Как настроить первый Airflow DAG

Зачем аналитику писать DAG

На уровне middle+ аналитик часто должен писать и поддерживать свои pipeline. Не нужно быть дата-инженером, но базовый DAG для ежедневной выгрузки или обновления витрины — это обычно ваша работа.

Преимущества — вы контролируете свой pipeline. Если нужно изменить логику или добавить новый отчёт, не нужно ждать, пока DE найдёт время.

Структура DAG

DAG в Airflow — это Python-скрипт, который описывает, какие задачи нужно выполнять и в каком порядке. Каркас выглядит так:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'tagir',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['team@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'daily_user_metrics',
    default_args=default_args,
    description='Ежедневный расчёт пользовательских метрик',
    schedule_interval='0 4 * * *',  # каждый день в 4 утра
    start_date=datetime(2026, 4, 1),
    catchup=False,
    tags=['analytics', 'metrics']
) as dag:

    # задачи

    pass

default_args — общие настройки для всех задач в DAG. schedule_interval в cron-формате. catchup=False — не запускать задачи за прошлое, только с текущей даты.

Первая задача

Простая Python-задача:

from airflow.operators.python import PythonOperator

def extract_users():
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('postgresql://user:pass@host/db')
    df = pd.read_sql('SELECT * FROM users WHERE created_at::date = current_date', engine)
    df.to_parquet('/tmp/users_today.parquet')
    print(f'Extracted {len(df)} rows')

extract_task = PythonOperator(
    task_id='extract_users',
    python_callable=extract_users,
    dag=dag
)

Функция extract_users — это то, что будет выполняться. PythonOperator оборачивает её в Airflow task.

Зависимости между задачами

DAG имеет смысл, когда задач несколько и они зависят друг от друга:

from airflow.operators.python import PythonOperator

def extract(): ...
def transform(): ...
def load(): ...

extract_task = PythonOperator(task_id='extract', python_callable=extract)
transform_task = PythonOperator(task_id='transform', python_callable=transform)
load_task = PythonOperator(task_id='load', python_callable=load)

extract_task >> transform_task >> load_task

>> означает «сначала выполнить левую, потом правую». Это классический ETL-pipeline. Airflow запустит extract, дождётся завершения, запустит transform, затем load.

Для параллельных задач:

extract_task >> [transform_a, transform_b] >> load_task

После extract параллельно запускаются transform_a и transform_b. Load стартует, только когда обе завершатся.

Тренироваться на таких вопросах можно в Telegram-боте Карьерник — там 1500+ задач с реальных собесов с разборами.

Использование XCom для передачи данных

XCom — механизм обмена небольшими данными между задачами:

def extract(**context):
    count = 1000  # реальный extract
    context['ti'].xcom_push(key='row_count', value=count)

def transform(**context):
    count = context['ti'].xcom_pull(key='row_count', task_ids='extract')
    print(f'Processing {count} rows')

XCom только для метаданных (числа, ID, имена файлов). Не передавайте через него большие DataFrame — только ссылки на файлы в S3 или другом storage.

Работа с базой через Connection

Правильный способ хранить credentials — Connections в Airflow UI. В коде:

from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_from_db():
    hook = PostgresHook(postgres_conn_id='warehouse')
    df = hook.get_pandas_df("SELECT * FROM users WHERE created_at::date = CURRENT_DATE")
    return len(df)

Credentials в Airflow UI → Admin → Connections. Хранятся securely, не в коде.

SQL-задачи через PostgresOperator

Для чистых SQL-задач без Python:

from airflow.providers.postgres.operators.postgres import PostgresOperator

refresh_mart = PostgresOperator(
    task_id='refresh_daily_mart',
    postgres_conn_id='warehouse',
    sql="""
        INSERT INTO mart_daily_metrics
        SELECT CURRENT_DATE AS day,
            COUNT(DISTINCT user_id) AS dau,
            SUM(amount) AS revenue
        FROM events
        WHERE event_time::date = CURRENT_DATE - INTERVAL '1 day';
    """,
    dag=dag
)

Проще Python-задачи, если просто нужно выполнить SQL.

Планирование (schedule_interval)

В cron-формате или через preset:

schedule_interval='@daily'          # каждый день в полночь
schedule_interval='@hourly'         # каждый час
schedule_interval='@weekly'         # каждый понедельник в полночь
schedule_interval='0 4 * * *'       # каждый день в 4 утра
schedule_interval='0 9 * * MON'     # по понедельникам в 9
schedule_interval='0 */6 * * *'     # каждые 6 часов
schedule_interval=None              # только вручную

Учтите: время по timezone сервера Airflow (обычно UTC). Настройка Moscow time нужна явно в конфиге.

Retries и alerts

Задачи могут падать — сеть, база, временные проблемы. Airflow автоматически retry через настройку:

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,  # 5, 10, 20 минут
    'email_on_failure': True,
    'email': ['alerts@company.com']
}

Если 3 попытки провалились, Airflow отправляет email. Для Slack можно настроить callback:

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def slack_on_failure(context):
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    msg = f'DAG {dag_id}, task {task_id} failed!'
    SlackWebhookOperator(
        task_id='slack_alert',
        http_conn_id='slack',
        message=msg
    ).execute(context)

default_args['on_failure_callback'] = slack_on_failure

К слову, набить руку на таких кейсах удобно через тренажёр в Telegram — разбирайте по 10 вопросов в день, через 2 недели тема становится рефлексом.

Отладка

Первый запуск DAG чаще всего падает. Чек-лист отладки:

В UI смотрите Graph View — какая задача упала. Кликаете → Logs. Там stack trace.

Для теста без scheduler используйте airflow tasks test:

airflow tasks test daily_user_metrics extract_users 2026-04-15

Это запускает конкретную задачу для указанной execution_date. Логи идут сразу в stdout, удобно для дебага.

Типовые ошибки новичка

Использовать pandas для загрузки миллионов строк через XCom. XCom хранится в БД Airflow, большие данные ломают её.

Hardcode credentials в коде. Всегда через Connections.

Неправильный schedule. start_date=datetime.now() проблематично — Airflow пропустит первый запуск. Используйте фиксированную дату в прошлом.

catchup=True случайно. Запустит задачи за все прошлые execution_date, завалит кластер.

Забыть provide_context=True в старых версиях для PythonOperator. В Airflow 2+ это уже не нужно.

Структура файлов

Организация кода в проекте Airflow:

dags/
├── daily_metrics.py        # DAG-скрипт
├── utils/
│   ├── __init__.py
│   ├── db_helpers.py       # переиспользуемые функции
│   └── validators.py
└── sql/
    ├── daily_mart.sql      # SQL-скрипты
    └── retention.sql

SQL-файлы можно подключать через PostgresOperator:

refresh_mart = PostgresOperator(
    task_id='refresh',
    sql='sql/daily_mart.sql',  # относительно dags folder
    postgres_conn_id='warehouse'
)

Читайте также

FAQ

Что лучше — DAG в файле или через Airflow UI?

Всегда в файле, под git. UI — только для monitoring и ручного trigger.

Сколько задач в одном DAG?

Обычно 5-20. Больше — разбивайте на несколько DAG.

DAG падает — с чего начать?

Log задачи в UI. Потом execution_date, connection setup, dependencies версий.

Можно ли через Airflow запускать Python-скрипт с shell-командой?

Да, через BashOperator. Но лучше PythonOperator с чистым Python — легче debug.