Airflow для аналитика: шпаргалка
Что такое Airflow
Apache Airflow — оркестратор задач (workflow engine). Позволяет писать, запускать и мониторить data pipelines.
Используется для:
- Ежедневных ETL-джоб.
- Обновления дашбордов.
- Запуска ML-пайплайнов.
- Переноса данных между системами.
Ключевые концепции
DAG (Directed Acyclic Graph)
Pipeline задач. В Airflow каждый pipeline — это DAG.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG(
'daily_etl',
schedule_interval='0 3 * * *', # каждый день в 3 ночи
start_date=datetime(2026, 4, 1),
catchup=False
) as dag:
task1 = PythonOperator(task_id='extract', python_callable=extract_data)
task2 = PythonOperator(task_id='transform', python_callable=transform_data)
task3 = PythonOperator(task_id='load', python_callable=load_data)
task1 >> task2 >> task3Tasks
Отдельные шаги в DAG. Зависимости через >> или <<.
Operators
Строительные блоки задач:
- PythonOperator — вызов Python-функции.
- BashOperator — shell-команда.
- PostgresOperator — SQL в PostgreSQL.
- SnowflakeOperator — SQL в Snowflake.
- S3ToRedshiftOperator — перенос данных.
- EmailOperator — отправить email.
Scheduling
Cron-выражения:
schedule_interval='@daily' # каждый день в полночь
schedule_interval='@hourly' # каждый час
schedule_interval='0 3 * * *' # 3 ночи каждый день
schedule_interval='0 9 * * MON' # 9 утра каждый понедельник
schedule_interval=None # только вручнуюТипичный ETL DAG
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
def extract_users():
hook = PostgresHook('source_db')
df = hook.get_pandas_df('SELECT * FROM users WHERE updated_at > now() - interval \'1 day\'')
df.to_parquet('/tmp/users.parquet')
def load_to_warehouse():
hook = PostgresHook('warehouse_db')
# ... load logic
with DAG('users_etl', schedule_interval='@daily', start_date=datetime(2026, 4, 1)) as dag:
extract = PythonOperator(task_id='extract', python_callable=extract_users)
load = PythonOperator(task_id='load', python_callable=load_to_warehouse)
extract >> loadXCom — передача данных между задачами
def push_value(**context):
context['ti'].xcom_push(key='my_key', value=42)
def pull_value(**context):
value = context['ti'].xcom_pull(key='my_key', task_ids='push_task')
print(value)Только для маленьких данных (metadata). Большие датасеты — через S3 / file storage.
Попробовать силы на подобных вопросах проще всего в тренажёре Карьерник — прямо в Telegram, без регистрации через сайт.
Variables и Connections
Variables (UI / Admin)
Конфигурация вне кода:
from airflow.models import Variable
schedule = Variable.get('schedule_name')Connections
Доступы к БД, API, cloud:
hook = PostgresHook(postgres_conn_id='prod_db')Креды хранятся в Airflow Connections, не в коде.
Sensors
Ждут событие (файл появился, DAG завершился):
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_data',
filepath='/tmp/input.csv',
timeout=3600
)Задачи аналитика в Airflow
1. Daily SQL aggregates
from airflow.providers.postgres.operators.postgres import PostgresOperator
build_mart = PostgresOperator(
task_id='daily_mart',
sql="""
INSERT INTO mart_daily SELECT date, count(*) FROM events GROUP BY date;
"""
)2. Обновление дашбордов
После ETL — триггер обновления Metabase / Tableau кэша.
3. Проверка качества данных
def check_null_rate():
hook = PostgresHook('warehouse')
result = hook.get_first('SELECT COUNT(*) FILTER (WHERE email IS NULL) * 1.0 / COUNT(*) FROM users')
if result[0] > 0.1:
raise ValueError(f'Too many nulls: {result[0]}')4. Отправка отчётов
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
send_report = SlackWebhookOperator(
task_id='report',
message='Daily metrics ready: {{ var.value.dashboard_url }}'
)Мониторинг и debugging
Airflow UI
- Graph View — визуализация DAG.
- Tree View — история прогонов.
- Task Logs — stdout/stderr задачи.
- XCom — значения между задачами.
Retry и SLA
task = PythonOperator(
task_id='important',
python_callable=func,
retries=3,
retry_delay=timedelta(minutes=5),
sla=timedelta(hours=1)
)Alerts
from airflow import DAG
def slack_on_failure(context):
# Отправить в Slack
pass
with DAG(..., on_failure_callback=slack_on_failure):
...dbt + Airflow
Популярная комбинация:
- Airflow — оркестратор (когда запускать).
- dbt — transformation (что делать с SQL).
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
dbt_run = DbtCloudRunJobOperator(
task_id='dbt_run',
dbt_cloud_conn_id='dbt_cloud',
job_id=12345
)Пройти 30–50 задач по теме за вечер можно в Telegram-тренажёре. Это то, что отличает «знаю» от «уверенно отвечу на собесе».
Альтернативы
- Prefect — современнее, дружественнее.
- Dagster — акцент на data assets.
- Luigi — старый, уходит.
- Windmill — легче, self-hosted.
В enterprise 80% — Airflow. В современных стартапах — Prefect / Dagster.
Нужно ли аналитику
На junior — нет
Знать, что Airflow существует.
На middle — желательно
Читать код чужих DAG, дописывать простые задачи.
На senior / DA+DE — обязательно
Писать DAG, настраивать connections, мониторить.
Типичные ошибки
1. Большие данные через XCom
XCom не для датасетов. Только metadata.
2. Зависимость от external state
Задача использует текущую дату → не воспроизводимо. Используйте ds (execution_date).
3. Медленные DAG-и
Параллелизуйте независимые задачи. Дробите большие задачи на мелкие.
4. Секреты в коде
Никогда. Используйте Connections и Variables.
Читайте также
FAQ
Airflow на Python — обязательно?
DAG-и пишутся в Python. Нужен базовый Python.
Self-hosted или Airflow Cloud?
Self-hosted — контроль и бесплатно. Cloud (Astronomer, MWAA) — быстрый старт, плата.
Сколько стоит?
Open-source сам Airflow — бесплатно. Инфраструктура (сервер, БД, логи) — ресурсы.
Почему DAG не запускается?
Проверьте: schedule_interval, start_date, catchup=False, is_paused. В UI есть toggle on/off.