Airflow XCom на собеседовании Data Engineer

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.

Зачем спрашивают на собесе DE

XCom — стандартный механизм передачи данных между tasks в Airflow. На собесе DE: «как передать значение между tasks», «лимиты XCom», «зачем custom backend».

Что такое XCom

Cross-Communication. Хранилище small values для передачи между tasks. Хранится в Airflow metadata DB.

# task A:
def push_value(**context):
    context['ti'].xcom_push(key='count', value=42)

# task B:
def pull_value(**context):
    val = context['ti'].xcom_pull(task_ids='task_a', key='count')

push и pull

push. Сохранить значение.

ti.xcom_push(key='my_key', value=data)

pull. Получить значение.

ti.xcom_pull(task_ids='upstream_task', key='my_key')
ti.xcom_pull(task_ids=['t1', 't2'], key='my_key')  # из нескольких

Auto-push. Если PythonOperator возвращает значение — автоматически push'ится с key='return_value'.

def get_count():
    return 42

# в downstream task:
val = ti.xcom_pull(task_ids='get_count_task')   # 42

TaskFlow API

Airflow 2.0+ — декораторы делают XCom прозрачным.

from airflow.decorators import task, dag

@task
def extract():
    return {"count": 42, "data": [...]}

@task
def transform(data):
    return data["count"] * 2

@dag(...)
def my_pipeline():
    raw = extract()
    result = transform(raw)

Под капотом — XCom. Pythonic syntax, без boilerplate.

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Лимиты и ограничения

Размер. XCom value сериализуется в JSON и хранится в metadata DB. Жёсткий лимит — max_xcom_value_size (по умолчанию 48KB в Postgres backend).

Что нельзя класть в XCom:

  • Большие DataFrames / arrays.
  • Файлы.
  • Models, embeddings.
  • Любое > 1 MB.

Что можно:

  • ID, путь к файлу, URL.
  • Маленькие конфиги.
  • Status, summary.
  • Скаляры, dicts.

Принцип. XCom — для метаданных, не данных. Данные — в S3 / DWH, передавай URI / path / row_count.

Custom XCom backend

Если нужно передавать большие объекты — custom backend.

from airflow.models.xcom import BaseXCom

class S3XComBackend(BaseXCom):
    PREFIX = "s3://my-bucket/xcom/"

    @staticmethod
    def serialize_value(value):
        if isinstance(value, pd.DataFrame):
            key = f"{uuid4()}.parquet"
            value.to_parquet(f"s3://my-bucket/xcom/{key}")
            return BaseXCom.serialize_value(f"S3:{key}")
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result):
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, str) and value.startswith("S3:"):
            return pd.read_parquet(f"s3://my-bucket/xcom/{value[3:]}")
        return value

Полезно для DataFrames между tasks без boilerplate.

Антипаттерны

Передавать DataFrame через XCom. OOM на metadata DB. Большие данные — через S3 / shared FS, передавать только path.

XCom как sync mechanism. Tasks должны быть idempotent и не зависеть от runtime values сложным образом. Переиспользуй input data.

Sensitive в XCom. Видно через UI всем с access к Airflow. Используй Variables / Secrets.

Многоуровневое XCom passing. A → B → C → D через XCom — fragile. Лучше — каждая task читает source независимо.

XCom для миграций / DDL. XCom — runtime, не должен влиять на schema или structure.

Связанные темы

FAQ

XCom хранится между DAG runs?

Да. По умолчанию — без TTL (хотя есть cleanup). Можно настроить xcom_purge_after_days.

Можно ли передавать XCom между DAGs?

Да, через ExternalTaskSensor + XCom pull с явным dag_id.

Это официальная информация?

Нет. Статья основана на документации Airflow 2.7+.


Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.