Airflow для аналитика: шпаргалка

Что такое Airflow

Apache Airflow — оркестратор задач (workflow engine). Позволяет писать, запускать и мониторить data pipelines.

Используется для:

  • Ежедневных ETL-джоб.
  • Обновления дашбордов.
  • Запуска ML-пайплайнов.
  • Переноса данных между системами.

Ключевые концепции

DAG (Directed Acyclic Graph)

Pipeline задач. В Airflow каждый pipeline — это DAG.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    'daily_etl',
    schedule_interval='0 3 * * *',  # каждый день в 3 ночи
    start_date=datetime(2026, 4, 1),
    catchup=False
) as dag:
    task1 = PythonOperator(task_id='extract', python_callable=extract_data)
    task2 = PythonOperator(task_id='transform', python_callable=transform_data)
    task3 = PythonOperator(task_id='load', python_callable=load_data)

    task1 >> task2 >> task3

Tasks

Отдельные шаги в DAG. Зависимости через >> или <<.

Operators

Строительные блоки задач:

  • PythonOperator — вызов Python-функции.
  • BashOperator — shell-команда.
  • PostgresOperator — SQL в PostgreSQL.
  • SnowflakeOperator — SQL в Snowflake.
  • S3ToRedshiftOperator — перенос данных.
  • EmailOperator — отправить email.

Scheduling

Cron-выражения:

schedule_interval='@daily'         # каждый день в полночь
schedule_interval='@hourly'        # каждый час
schedule_interval='0 3 * * *'      # 3 ночи каждый день
schedule_interval='0 9 * * MON'    # 9 утра каждый понедельник
schedule_interval=None             # только вручную

Типичный ETL DAG

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator

def extract_users():
    hook = PostgresHook('source_db')
    df = hook.get_pandas_df('SELECT * FROM users WHERE updated_at > now() - interval \'1 day\'')
    df.to_parquet('/tmp/users.parquet')

def load_to_warehouse():
    hook = PostgresHook('warehouse_db')
    # ... load logic

with DAG('users_etl', schedule_interval='@daily', start_date=datetime(2026, 4, 1)) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_users)
    load = PythonOperator(task_id='load', python_callable=load_to_warehouse)

    extract >> load

XCom — передача данных между задачами

def push_value(**context):
    context['ti'].xcom_push(key='my_key', value=42)

def pull_value(**context):
    value = context['ti'].xcom_pull(key='my_key', task_ids='push_task')
    print(value)

Только для маленьких данных (metadata). Большие датасеты — через S3 / file storage.

Попробовать силы на подобных вопросах проще всего в тренажёре Карьерник — прямо в Telegram, без регистрации через сайт.

Variables и Connections

Variables (UI / Admin)

Конфигурация вне кода:

from airflow.models import Variable
schedule = Variable.get('schedule_name')

Connections

Доступы к БД, API, cloud:

hook = PostgresHook(postgres_conn_id='prod_db')

Креды хранятся в Airflow Connections, не в коде.

Sensors

Ждут событие (файл появился, DAG завершился):

from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_data',
    filepath='/tmp/input.csv',
    timeout=3600
)

Задачи аналитика в Airflow

1. Daily SQL aggregates

from airflow.providers.postgres.operators.postgres import PostgresOperator

build_mart = PostgresOperator(
    task_id='daily_mart',
    sql="""
    INSERT INTO mart_daily SELECT date, count(*) FROM events GROUP BY date;
    """
)

2. Обновление дашбордов

После ETL — триггер обновления Metabase / Tableau кэша.

3. Проверка качества данных

def check_null_rate():
    hook = PostgresHook('warehouse')
    result = hook.get_first('SELECT COUNT(*) FILTER (WHERE email IS NULL) * 1.0 / COUNT(*) FROM users')
    if result[0] > 0.1:
        raise ValueError(f'Too many nulls: {result[0]}')

4. Отправка отчётов

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

send_report = SlackWebhookOperator(
    task_id='report',
    message='Daily metrics ready: {{ var.value.dashboard_url }}'
)

Мониторинг и debugging

Airflow UI

  • Graph View — визуализация DAG.
  • Tree View — история прогонов.
  • Task Logs — stdout/stderr задачи.
  • XCom — значения между задачами.

Retry и SLA

task = PythonOperator(
    task_id='important',
    python_callable=func,
    retries=3,
    retry_delay=timedelta(minutes=5),
    sla=timedelta(hours=1)
)

Alerts

from airflow import DAG

def slack_on_failure(context):
    # Отправить в Slack
    pass

with DAG(..., on_failure_callback=slack_on_failure):
    ...

dbt + Airflow

Популярная комбинация:

  • Airflow — оркестратор (когда запускать).
  • dbt — transformation (что делать с SQL).
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

dbt_run = DbtCloudRunJobOperator(
    task_id='dbt_run',
    dbt_cloud_conn_id='dbt_cloud',
    job_id=12345
)

Пройти 30–50 задач по теме за вечер можно в Telegram-тренажёре. Это то, что отличает «знаю» от «уверенно отвечу на собесе».

Альтернативы

  • Prefect — современнее, дружественнее.
  • Dagster — акцент на data assets.
  • Luigi — старый, уходит.
  • Windmill — легче, self-hosted.

В enterprise 80% — Airflow. В современных стартапах — Prefect / Dagster.

Нужно ли аналитику

На junior — нет

Знать, что Airflow существует.

На middle — желательно

Читать код чужих DAG, дописывать простые задачи.

На senior / DA+DE — обязательно

Писать DAG, настраивать connections, мониторить.

Типичные ошибки

1. Большие данные через XCom

XCom не для датасетов. Только metadata.

2. Зависимость от external state

Задача использует текущую дату → не воспроизводимо. Используйте ds (execution_date).

3. Медленные DAG-и

Параллелизуйте независимые задачи. Дробите большие задачи на мелкие.

4. Секреты в коде

Никогда. Используйте Connections и Variables.

Читайте также

FAQ

Airflow на Python — обязательно?

DAG-и пишутся в Python. Нужен базовый Python.

Self-hosted или Airflow Cloud?

Self-hosted — контроль и бесплатно. Cloud (Astronomer, MWAA) — быстрый старт, плата.

Сколько стоит?

Open-source сам Airflow — бесплатно. Инфраструктура (сервер, БД, логи) — ресурсы.

Почему DAG не запускается?

Проверьте: schedule_interval, start_date, catchup=False, is_paused. В UI есть toggle on/off.