Идемпотентность пайплайна для Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Зачем идемпотентность
Идемпотентность — свойство операции, при котором повторный запуск даёт тот же результат, что и первый. Запустил DAG за 2026-05-01 один раз — есть данные за день. Запустил тот же DAG за тот же день ещё пять раз — данные те же, не удвоились, не утроились.
Без идемпотентности любой ретрай — это лотерея. Airflow упал в 3 ночи на середине задачи, ретрай дописал к уже частично записанным данным дубли. На утро Slack: «у нас выручка x2». Это ситуация, в которой DE идёт чинить руками в проде, теряет два часа и получает претензию от продакта.
На собесе DE идемпотентность спрашивают почти всегда. Два уровня: (1) дать определение, (2) рассказать, как сделать идемпотентным конкретный пайплайн (SQL → DWH, Spark → S3, Kafka consumer → Postgres).
Принцип записи через перезапись окна
Главный паттерн — пайплайн должен уметь стирать свой выход за обрабатываемый интервал и записать его заново без повреждения других интервалов.
DAG запускается за день D:
1. DELETE FROM target WHERE date = D
2. INSERT INTO target SELECT ... WHERE date = DЕсли DAG упал между DELETE и INSERT — есть пустота, видно по мониторингу, ретрай восстановит. Если DAG упал после INSERT — ретрай выполнит DELETE снова и впишет тот же результат. Дублей нет.
Альтернатива — INSERT ON CONFLICT DO UPDATE, но она требует уникального ключа и не решает кейс «вчера было 100 строк за день, сегодня та же логика даёт 95 — старые 5 надо снести». Перезапись окна снимает эти 5 автоматически.
Идемпотентность в SQL
Стратегия зависит от движка:
Postgres / Snowflake / BigQuery — INSERT ON CONFLICT (UPSERT):
INSERT INTO target (event_id, user_id, amount)
SELECT event_id, user_id, amount
FROM staging
WHERE event_date = '{{ ds }}'
ON CONFLICT (event_id) DO UPDATE
SET user_id = EXCLUDED.user_id,
amount = EXCLUDED.amount;Postgres / любой движок — DELETE + INSERT по партиции:
BEGIN;
DELETE FROM target WHERE event_date = '{{ ds }}';
INSERT INTO target SELECT * FROM staging WHERE event_date = '{{ ds }}';
COMMIT;Транзакция важна: без неё DELETE может пройти, INSERT упасть — потеряем день.
ClickHouse — ReplacingMergeTree + версия:
CREATE TABLE target (
event_id UInt64,
user_id UInt64,
amount Decimal(18, 2),
version DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(version)
ORDER BY event_id;При вставке дубля по event_id мерж оставит строку с большим version. Финальный SELECT — SELECT ... FROM target FINAL или argMax.
ClickHouse — INSERT INTO ... PARTITION + DROP PARTITION:
ALTER TABLE target DROP PARTITION '2026-05-01';
INSERT INTO target SELECT * FROM staging WHERE event_date = '2026-05-01';Дёшево и атомарно для партиционированных по дате таблиц.
Spark / parquet — insertInto с overwrite=True по партиции:
spark.sql("SET spark.sql.sources.partitionOverwriteMode=dynamic")
df.write.mode("overwrite").partitionBy("event_date").saveAsTable("target")С dynamic mode Spark перезаписывает только те партиции, в которые есть данные в DataFrame.
Идемпотентность в Airflow
Airflow построен вокруг идемпотентности — каждая задача имеет execution_date (или data_interval_start в 2.x), под которую её можно перезапустить.
Принципы:
- Никаких
NOW()в SQL. Использовать макросы Jinja:{{ ds }}(дата запуска),{{ data_interval_start }}/{{ data_interval_end }}.
PostgresOperator(
task_id='load_orders',
sql="""
DELETE FROM orders_daily WHERE event_date = '{{ ds }}';
INSERT INTO orders_daily
SELECT date('{{ ds }}'), COUNT(*), SUM(amount)
FROM raw.orders
WHERE created_at >= '{{ data_interval_start }}'
AND created_at < '{{ data_interval_end }}';
""",
)Если в SQL стоит NOW() - INTERVAL '1 day' — backfill за 100 дней назад вычислит сегодняшний минус один. Все 100 запусков напишут одно и то же. Классический баг.
depends_on_past=Falseпо умолчанию. Иначе backfill параллельно не запустишь.Catchup осознанно.
catchup=True(default) при первом запуске пересчитает все периоды отstart_date. Если не нужно —catchup=False.External resources — с retries и без побочных эффектов. Вызов API с дедупликацией по idempotency-key, не «увеличить счётчик».
Идемпотентность в Spark и стриминге
Spark batch: записывать через partition overwrite (см. выше). Если пишем в S3/HDFS — путь должен быть детерминирован: s3://bucket/orders/dt=2026-05-01/. Ретрай удаляет директорию и пишет снова.
Spark Structured Streaming: включить checkpointLocation. Spark хранит offset Kafka и состояние агрегаций — при рестарте продолжает ровно с того места.
df.writeStream \
.format("parquet") \
.option("checkpointLocation", "/checkpoints/orders") \
.option("path", "/data/orders") \
.trigger(processingTime="1 minute") \
.start()Без checkpointLocation — после рестарта стрим начнёт с latest и потеряет необработанные данные.
Kafka consumer + Postgres sink: at-least-once + idempotent UPSERT в Postgres. Kafka offset коммитится после успешного UPSERT, не до. Если упадём после UPSERT, но до commit offset — на ретрае придёт то же сообщение, UPSERT перезапишет ту же строку.
Backfill без дублей
Backfill — пересчёт пайплайна за прошлый период. Должен работать без ручных интервенций:
Декомпозировать по партициям. Запускать N независимых тасков на N дней, не один большой DELETE FROM target WHERE date BETWEEN — это монолит, упадёт целиком.
Параллельность ограничить.
max_active_runsв DAG, чтобы 100-дневный backfill не задушил DWH.Логика без зависимости от текущего состояния. SQL не должен использовать
MAX(created_at) FROM targetдля определения окна — тогда backfill старых дней увидит новый MAX и пропустит данные. Окно — параметр запуска.Тестовый прогон. Сначала backfill в
_test-схему, диффнуть с продом по контрольным метрикам (count, sum по дню), потом в прод.Атомарность смены. Записать в
target_new, переименоватьtarget → target_old,target_new → target. Не править прод во время чтения.
Частые ошибки
NOW() или current_date в production-SQL. Любой backfill сломан. Только макросы Airflow или параметризованная дата.
INSERT без предварительного DELETE/UPSERT. Любой ретрай удваивает данные. Если поставили auto-retry в Airflow — половина прода в дублях через неделю.
Append-only таблица без бизнес-ключа. Невозможно дедуплицировать после обнаружения дубля. Минимум — surrogate key из (source_id, source_table, event_id) и ROW_NUMBER при чтении.
Пайплайн правит данные за «вчера и раньше». Граница окна должна быть жёсткой — event_date = '{{ ds }}', не event_date <= '{{ ds }}'. Иначе перезапись окна затрёт чужие интервалы.
Транзакция вокруг DELETE+INSERT забыта. При падении между ними получаем пустоту в production. Всегда BEGIN; DELETE; INSERT; COMMIT;.
Side-effects в Python-операторах. Отправил письмо клиенту через EmailOperator — ретрай отправит ещё раз. Внешние API — только с idempotency-key.
Связанные темы
- Airflow на собеседовании DE
- Kafka на собеседовании DE
- Spark на собеседовании DE
- dbt на собеседовании DE
- Подготовка к собесу Data Engineer
FAQ
Идемпотентность и exactly-once — это одно и то же?
Нет. Exactly-once — гарантия от системы доставки (Kafka producer, Spark Streaming с checkpoint). Идемпотентность — свойство операции записи. На практике at-least-once + идемпотентный sink эквивалентны exactly-once и проще в реализации.
Как сделать идемпотентным пайплайн с агрегацией по уникальным юзерам за месяц?
Записывать результат как полную перезапись месячной партиции: DELETE FROM monthly_users WHERE month = '2026-05'; INSERT ... GROUP BY month;. Не пытаться накапливать COUNT(DISTINCT) инкрементально — это не аддитивная метрика, дубли при ретраях гарантированы.
Что делать с external API, у которого нет idempotency-key?
Слой между пайплайном и API: writeahead-лог запросов с уникальным id, проверка перед отправкой «уже отправляли?». Альтернатива — переключить вызов в режим upsert на стороне API (например, PUT вместо POST).
Идемпотентность нужна только в production?
Нужна везде, кроме одноразовых ad-hoc-скриптов. В dev — потому что разрабы постоянно перезапускают. В CI — потому что тесты прогоняются заново. В staging — потому что туда часто перезаливают данные.
Как тестировать идемпотентность в CI?
Прогнать пайплайн дважды с одним параметром, сравнить результат: count(*) и sum(хеши строк) должны совпасть. Это самый дешёвый smoke-тест на регрессию.
Это официальная информация?
Нет. Статья основана на публичной документации Airflow, Spark, dbt и общей практике дата-инжиниринга. Конкретные паттерны зависят от стека компании.
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.