Как настроить первый Airflow DAG
Зачем аналитику писать DAG
На уровне middle+ аналитик часто должен писать и поддерживать свои pipeline. Не нужно быть дата-инженером, но базовый DAG для ежедневной выгрузки или обновления витрины — это обычно ваша работа.
Преимущества — вы контролируете свой pipeline. Если нужно изменить логику или добавить новый отчёт, не нужно ждать, пока DE найдёт время.
Структура DAG
DAG в Airflow — это Python-скрипт, который описывает, какие задачи нужно выполнять и в каком порядке. Каркас выглядит так:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'tagir',
'depends_on_past': False,
'email_on_failure': True,
'email': ['team@company.com'],
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
with DAG(
'daily_user_metrics',
default_args=default_args,
description='Ежедневный расчёт пользовательских метрик',
schedule_interval='0 4 * * *', # каждый день в 4 утра
start_date=datetime(2026, 4, 1),
catchup=False,
tags=['analytics', 'metrics']
) as dag:
# задачи
passdefault_args — общие настройки для всех задач в DAG. schedule_interval в cron-формате. catchup=False — не запускать задачи за прошлое, только с текущей даты.
Первая задача
Простая Python-задача:
from airflow.operators.python import PythonOperator
def extract_users():
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@host/db')
df = pd.read_sql('SELECT * FROM users WHERE created_at::date = current_date', engine)
df.to_parquet('/tmp/users_today.parquet')
print(f'Extracted {len(df)} rows')
extract_task = PythonOperator(
task_id='extract_users',
python_callable=extract_users,
dag=dag
)Функция extract_users — это то, что будет выполняться. PythonOperator оборачивает её в Airflow task.
Зависимости между задачами
DAG имеет смысл, когда задач несколько и они зависят друг от друга:
from airflow.operators.python import PythonOperator
def extract(): ...
def transform(): ...
def load(): ...
extract_task = PythonOperator(task_id='extract', python_callable=extract)
transform_task = PythonOperator(task_id='transform', python_callable=transform)
load_task = PythonOperator(task_id='load', python_callable=load)
extract_task >> transform_task >> load_task>> означает «сначала выполнить левую, потом правую». Это классический ETL-pipeline. Airflow запустит extract, дождётся завершения, запустит transform, затем load.
Для параллельных задач:
extract_task >> [transform_a, transform_b] >> load_taskПосле extract параллельно запускаются transform_a и transform_b. Load стартует, только когда обе завершатся.
Тренироваться на таких вопросах можно в Telegram-боте Карьерник — там 1500+ задач с реальных собесов с разборами.
Использование XCom для передачи данных
XCom — механизм обмена небольшими данными между задачами:
def extract(**context):
count = 1000 # реальный extract
context['ti'].xcom_push(key='row_count', value=count)
def transform(**context):
count = context['ti'].xcom_pull(key='row_count', task_ids='extract')
print(f'Processing {count} rows')XCom только для метаданных (числа, ID, имена файлов). Не передавайте через него большие DataFrame — только ссылки на файлы в S3 или другом storage.
Работа с базой через Connection
Правильный способ хранить credentials — Connections в Airflow UI. В коде:
from airflow.providers.postgres.hooks.postgres import PostgresHook
def extract_from_db():
hook = PostgresHook(postgres_conn_id='warehouse')
df = hook.get_pandas_df("SELECT * FROM users WHERE created_at::date = CURRENT_DATE")
return len(df)Credentials в Airflow UI → Admin → Connections. Хранятся securely, не в коде.
SQL-задачи через PostgresOperator
Для чистых SQL-задач без Python:
from airflow.providers.postgres.operators.postgres import PostgresOperator
refresh_mart = PostgresOperator(
task_id='refresh_daily_mart',
postgres_conn_id='warehouse',
sql="""
INSERT INTO mart_daily_metrics
SELECT CURRENT_DATE AS day,
COUNT(DISTINCT user_id) AS dau,
SUM(amount) AS revenue
FROM events
WHERE event_time::date = CURRENT_DATE - INTERVAL '1 day';
""",
dag=dag
)Проще Python-задачи, если просто нужно выполнить SQL.
Планирование (schedule_interval)
В cron-формате или через preset:
schedule_interval='@daily' # каждый день в полночь
schedule_interval='@hourly' # каждый час
schedule_interval='@weekly' # каждый понедельник в полночь
schedule_interval='0 4 * * *' # каждый день в 4 утра
schedule_interval='0 9 * * MON' # по понедельникам в 9
schedule_interval='0 */6 * * *' # каждые 6 часов
schedule_interval=None # только вручнуюУчтите: время по timezone сервера Airflow (обычно UTC). Настройка Moscow time нужна явно в конфиге.
Retries и alerts
Задачи могут падать — сеть, база, временные проблемы. Airflow автоматически retry через настройку:
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True, # 5, 10, 20 минут
'email_on_failure': True,
'email': ['alerts@company.com']
}Если 3 попытки провалились, Airflow отправляет email. Для Slack можно настроить callback:
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
def slack_on_failure(context):
task_id = context['task_instance'].task_id
dag_id = context['dag'].dag_id
msg = f'DAG {dag_id}, task {task_id} failed!'
SlackWebhookOperator(
task_id='slack_alert',
http_conn_id='slack',
message=msg
).execute(context)
default_args['on_failure_callback'] = slack_on_failureК слову, набить руку на таких кейсах удобно через тренажёр в Telegram — разбирайте по 10 вопросов в день, через 2 недели тема становится рефлексом.
Отладка
Первый запуск DAG чаще всего падает. Чек-лист отладки:
В UI смотрите Graph View — какая задача упала. Кликаете → Logs. Там stack trace.
Для теста без scheduler используйте airflow tasks test:
airflow tasks test daily_user_metrics extract_users 2026-04-15Это запускает конкретную задачу для указанной execution_date. Логи идут сразу в stdout, удобно для дебага.
Типовые ошибки новичка
Использовать pandas для загрузки миллионов строк через XCom. XCom хранится в БД Airflow, большие данные ломают её.
Hardcode credentials в коде. Всегда через Connections.
Неправильный schedule. start_date=datetime.now() проблематично — Airflow пропустит первый запуск. Используйте фиксированную дату в прошлом.
catchup=True случайно. Запустит задачи за все прошлые execution_date, завалит кластер.
Забыть provide_context=True в старых версиях для PythonOperator. В Airflow 2+ это уже не нужно.
Структура файлов
Организация кода в проекте Airflow:
dags/
├── daily_metrics.py # DAG-скрипт
├── utils/
│ ├── __init__.py
│ ├── db_helpers.py # переиспользуемые функции
│ └── validators.py
└── sql/
├── daily_mart.sql # SQL-скрипты
└── retention.sqlSQL-файлы можно подключать через PostgresOperator:
refresh_mart = PostgresOperator(
task_id='refresh',
sql='sql/daily_mart.sql', # относительно dags folder
postgres_conn_id='warehouse'
)Читайте также
FAQ
Что лучше — DAG в файле или через Airflow UI?
Всегда в файле, под git. UI — только для monitoring и ручного trigger.
Сколько задач в одном DAG?
Обычно 5-20. Больше — разбивайте на несколько DAG.
DAG падает — с чего начать?
Log задачи в UI. Потом execution_date, connection setup, dependencies версий.
Можно ли через Airflow запускать Python-скрипт с shell-командой?
Да, через BashOperator. Но лучше PythonOperator с чистым Python — легче debug.