Airflow Sensors на собеседовании Data Engineer

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.

Зачем спрашивают на собесе DE

Sensors — стандартный инструмент для зависимостей в Airflow. На собесе DE: «отличие poke и reschedule», «deferrable operators», «зачем ExternalTaskSensor».

Что такое Sensor

Особый тип Operator. Ждёт, пока условие не станет true. После — task complete'ится.

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_data',
    filepath='/data/incoming/file.csv',
    timeout=3600,
    poke_interval=60
)

timeout=3600 — после часа failed. poke_interval=60 — проверка каждую минуту.

Mode: poke vs reschedule

Poke (default). Worker слот занят весь период sensor. Каждые poke_interval секунд — check. Между checks — слот занят, ждёт.

Минус. На длинных sensors (часы) — занимает воркеры зря.

Reschedule. Sensor проверяет → если false → освобождает слот → планируется снова через poke_interval. Между checks слот свободен.

sensor = FileSensor(
    task_id='wait',
    filepath='...',
    mode='reschedule',          # критично
    poke_interval=300            # 5 min
)

Reschedule стандарт для long-running sensors.

Лимит. Reschedule имеет накладные расходы на DB (каждый check — UPDATE). На очень частых poke — poke лучше.

Типы sensors

FileSensor. Файл существует.

ExternalTaskSensor. Другая task в другом DAG completed.

ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_dag',
    external_task_id='final_task',
    execution_delta=timedelta(minutes=30),
    mode='reschedule'
)

S3KeySensor / GCSObjectExistenceSensor. Объект в облаке.

SqlSensor. SQL query возвращает не пустой / true.

HttpSensor. HTTP endpoint отвечает.

HivePartitionSensor / NamedHivePartitionSensor. Партиция в Hive существует.

KafkaSensor. Topic / partition reached offset.

TimeDeltaSensor. Подождать N времени.

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Deferrable operators

Современная альтернатива sensors (Airflow 2.2+).

Идея. Task передаётся в Triggerer — отдельный async-процесс. Воркер свободен. При наступлении условия — task возвращается в queue, обрабатывается.

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

S3KeySensor(
    task_id='wait',
    bucket_key='s3://bucket/file.csv',
    deferrable=True,
    poke_interval=60
)

Преимущества:

  • Один Triggerer обрабатывает тысячи waiting tasks.
  • Не занимает worker slot.
  • Низкая latency на event.

Минус. Не все providers поддерживают (с 2026 — большинство стандартных).

В современных setup'ах — deferrable вместо reschedule.

Частые ошибки

Poke mode на длинном sensor. Один task на 10 часов — 10 часов worker слот занят. Reschedule / deferrable.

Без timeout. Sensor ждёт вечно. Установи timeout.

Слишком частый poke_interval. Каждые 5 секунд — DB load. 60s+ обычно достаточно.

ExternalTaskSensor с execution_delta=0. Если DAGs запускаются одновременно — sensor не находит upstream task. Используй execution_delta.

Не использовать deferrable. Пропускаешь главное улучшение Airflow 2.2+.

SqlSensor с тяжёлым запросом. Каждый poke = exec. На big query — много нагрузки. Оптимизируй запрос или используй другой подход.

Связанные темы

FAQ

Reschedule или deferrable?

Deferrable — лучше (если provider поддерживает). Reschedule — старый fallback.

Sensor может рестартануться?

Да. Catchup, manual retry, scheduler restart — все варианты. Sensor должен быть idempotent.

Это официальная информация?

Нет. Статья основана на документации Airflow 2.7+.


Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.