Airflow Sensors на собеседовании Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Зачем спрашивают на собесе DE
Sensors — стандартный инструмент для зависимостей в Airflow. На собесе DE: «отличие poke и reschedule», «deferrable operators», «зачем ExternalTaskSensor».
Что такое Sensor
Особый тип Operator. Ждёт, пока условие не станет true. После — task complete'ится.
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_data',
filepath='/data/incoming/file.csv',
timeout=3600,
poke_interval=60
)timeout=3600 — после часа failed. poke_interval=60 — проверка каждую минуту.
Mode: poke vs reschedule
Poke (default). Worker слот занят весь период sensor. Каждые poke_interval секунд — check. Между checks — слот занят, ждёт.
Минус. На длинных sensors (часы) — занимает воркеры зря.
Reschedule. Sensor проверяет → если false → освобождает слот → планируется снова через poke_interval. Между checks слот свободен.
sensor = FileSensor(
task_id='wait',
filepath='...',
mode='reschedule', # критично
poke_interval=300 # 5 min
)Reschedule стандарт для long-running sensors.
Лимит. Reschedule имеет накладные расходы на DB (каждый check — UPDATE). На очень частых poke — poke лучше.
Типы sensors
FileSensor. Файл существует.
ExternalTaskSensor. Другая task в другом DAG completed.
ExternalTaskSensor(
task_id='wait_for_upstream',
external_dag_id='upstream_dag',
external_task_id='final_task',
execution_delta=timedelta(minutes=30),
mode='reschedule'
)S3KeySensor / GCSObjectExistenceSensor. Объект в облаке.
SqlSensor. SQL query возвращает не пустой / true.
HttpSensor. HTTP endpoint отвечает.
HivePartitionSensor / NamedHivePartitionSensor. Партиция в Hive существует.
KafkaSensor. Topic / partition reached offset.
TimeDeltaSensor. Подождать N времени.
Deferrable operators
Современная альтернатива sensors (Airflow 2.2+).
Идея. Task передаётся в Triggerer — отдельный async-процесс. Воркер свободен. При наступлении условия — task возвращается в queue, обрабатывается.
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
S3KeySensor(
task_id='wait',
bucket_key='s3://bucket/file.csv',
deferrable=True,
poke_interval=60
)Преимущества:
- Один Triggerer обрабатывает тысячи waiting tasks.
- Не занимает worker slot.
- Низкая latency на event.
Минус. Не все providers поддерживают (с 2026 — большинство стандартных).
В современных setup'ах — deferrable вместо reschedule.
Частые ошибки
Poke mode на длинном sensor. Один task на 10 часов — 10 часов worker слот занят. Reschedule / deferrable.
Без timeout. Sensor ждёт вечно. Установи timeout.
Слишком частый poke_interval. Каждые 5 секунд — DB load. 60s+ обычно достаточно.
ExternalTaskSensor с execution_delta=0. Если DAGs запускаются одновременно — sensor не находит upstream task. Используй execution_delta.
Не использовать deferrable. Пропускаешь главное улучшение Airflow 2.2+.
SqlSensor с тяжёлым запросом. Каждый poke = exec. На big query — много нагрузки. Оптимизируй запрос или используй другой подход.
Связанные темы
- Airflow на собесе DE
- Airflow XCom для DE
- Идемпотентность пайплайна для DE
- dbt incremental models для DE
- Подготовка к собесу Data Engineer
FAQ
Reschedule или deferrable?
Deferrable — лучше (если provider поддерживает). Reschedule — старый fallback.
Sensor может рестартануться?
Да. Catchup, manual retry, scheduler restart — все варианты. Sensor должен быть idempotent.
Это официальная информация?
Нет. Статья основана на документации Airflow 2.7+.
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.