Airflow XCom на собеседовании Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Зачем спрашивают на собесе DE
XCom — стандартный механизм передачи данных между tasks в Airflow. На собесе DE: «как передать значение между tasks», «лимиты XCom», «зачем custom backend».
Что такое XCom
Cross-Communication. Хранилище small values для передачи между tasks. Хранится в Airflow metadata DB.
# task A:
def push_value(**context):
context['ti'].xcom_push(key='count', value=42)
# task B:
def pull_value(**context):
val = context['ti'].xcom_pull(task_ids='task_a', key='count')push и pull
push. Сохранить значение.
ti.xcom_push(key='my_key', value=data)pull. Получить значение.
ti.xcom_pull(task_ids='upstream_task', key='my_key')
ti.xcom_pull(task_ids=['t1', 't2'], key='my_key') # из несколькихAuto-push. Если PythonOperator возвращает значение — автоматически push'ится с key='return_value'.
def get_count():
return 42
# в downstream task:
val = ti.xcom_pull(task_ids='get_count_task') # 42TaskFlow API
Airflow 2.0+ — декораторы делают XCom прозрачным.
from airflow.decorators import task, dag
@task
def extract():
return {"count": 42, "data": [...]}
@task
def transform(data):
return data["count"] * 2
@dag(...)
def my_pipeline():
raw = extract()
result = transform(raw)Под капотом — XCom. Pythonic syntax, без boilerplate.
Лимиты и ограничения
Размер. XCom value сериализуется в JSON и хранится в metadata DB. Жёсткий лимит — max_xcom_value_size (по умолчанию 48KB в Postgres backend).
Что нельзя класть в XCom:
- Большие DataFrames / arrays.
- Файлы.
- Models, embeddings.
- Любое > 1 MB.
Что можно:
- ID, путь к файлу, URL.
- Маленькие конфиги.
- Status, summary.
- Скаляры, dicts.
Принцип. XCom — для метаданных, не данных. Данные — в S3 / DWH, передавай URI / path / row_count.
Custom XCom backend
Если нужно передавать большие объекты — custom backend.
from airflow.models.xcom import BaseXCom
class S3XComBackend(BaseXCom):
PREFIX = "s3://my-bucket/xcom/"
@staticmethod
def serialize_value(value):
if isinstance(value, pd.DataFrame):
key = f"{uuid4()}.parquet"
value.to_parquet(f"s3://my-bucket/xcom/{key}")
return BaseXCom.serialize_value(f"S3:{key}")
return BaseXCom.serialize_value(value)
@staticmethod
def deserialize_value(result):
value = BaseXCom.deserialize_value(result)
if isinstance(value, str) and value.startswith("S3:"):
return pd.read_parquet(f"s3://my-bucket/xcom/{value[3:]}")
return valueПолезно для DataFrames между tasks без boilerplate.
Антипаттерны
Передавать DataFrame через XCom. OOM на metadata DB. Большие данные — через S3 / shared FS, передавать только path.
XCom как sync mechanism. Tasks должны быть idempotent и не зависеть от runtime values сложным образом. Переиспользуй input data.
Sensitive в XCom. Видно через UI всем с access к Airflow. Используй Variables / Secrets.
Многоуровневое XCom passing. A → B → C → D через XCom — fragile. Лучше — каждая task читает source независимо.
XCom для миграций / DDL. XCom — runtime, не должен влиять на schema или structure.
Связанные темы
- Airflow на собесе DE
- Airflow шпаргалка для аналитика
- Идемпотентность пайплайна для DE
- dbt incremental models для DE
- Подготовка к собесу Data Engineer
FAQ
XCom хранится между DAG runs?
Да. По умолчанию — без TTL (хотя есть cleanup). Можно настроить xcom_purge_after_days.
Можно ли передавать XCom между DAGs?
Да, через ExternalTaskSensor + XCom pull с явным dag_id.
Это официальная информация?
Нет. Статья основана на документации Airflow 2.7+.
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.