Идемпотентность пайплайна для Data Engineer

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.

Зачем идемпотентность

Идемпотентность — свойство операции, при котором повторный запуск даёт тот же результат, что и первый. Запустил DAG за 2026-05-01 один раз — есть данные за день. Запустил тот же DAG за тот же день ещё пять раз — данные те же, не удвоились, не утроились.

Без идемпотентности любой ретрай — это лотерея. Airflow упал в 3 ночи на середине задачи, ретрай дописал к уже частично записанным данным дубли. На утро Slack: «у нас выручка x2». Это ситуация, в которой DE идёт чинить руками в проде, теряет два часа и получает претензию от продакта.

На собесе DE идемпотентность спрашивают почти всегда. Два уровня: (1) дать определение, (2) рассказать, как сделать идемпотентным конкретный пайплайн (SQL → DWH, Spark → S3, Kafka consumer → Postgres).

Принцип записи через перезапись окна

Главный паттерн — пайплайн должен уметь стирать свой выход за обрабатываемый интервал и записать его заново без повреждения других интервалов.

DAG запускается за день D:
1. DELETE FROM target WHERE date = D
2. INSERT INTO target SELECT ... WHERE date = D

Если DAG упал между DELETE и INSERT — есть пустота, видно по мониторингу, ретрай восстановит. Если DAG упал после INSERT — ретрай выполнит DELETE снова и впишет тот же результат. Дублей нет.

Альтернатива — INSERT ON CONFLICT DO UPDATE, но она требует уникального ключа и не решает кейс «вчера было 100 строк за день, сегодня та же логика даёт 95 — старые 5 надо снести». Перезапись окна снимает эти 5 автоматически.

Идемпотентность в SQL

Стратегия зависит от движка:

Postgres / Snowflake / BigQuery — INSERT ON CONFLICT (UPSERT):

INSERT INTO target (event_id, user_id, amount)
SELECT event_id, user_id, amount
FROM staging
WHERE event_date = '{{ ds }}'
ON CONFLICT (event_id) DO UPDATE
SET user_id = EXCLUDED.user_id,
    amount = EXCLUDED.amount;

Postgres / любой движок — DELETE + INSERT по партиции:

BEGIN;
DELETE FROM target WHERE event_date = '{{ ds }}';
INSERT INTO target SELECT * FROM staging WHERE event_date = '{{ ds }}';
COMMIT;

Транзакция важна: без неё DELETE может пройти, INSERT упасть — потеряем день.

ClickHouse — ReplacingMergeTree + версия:

CREATE TABLE target (
    event_id UInt64,
    user_id UInt64,
    amount Decimal(18, 2),
    version DateTime DEFAULT now()
) ENGINE = ReplacingMergeTree(version)
ORDER BY event_id;

При вставке дубля по event_id мерж оставит строку с большим version. Финальный SELECT — SELECT ... FROM target FINAL или argMax.

ClickHouse — INSERT INTO ... PARTITION + DROP PARTITION:

ALTER TABLE target DROP PARTITION '2026-05-01';
INSERT INTO target SELECT * FROM staging WHERE event_date = '2026-05-01';

Дёшево и атомарно для партиционированных по дате таблиц.

Spark / parquet — insertInto с overwrite=True по партиции:

spark.sql("SET spark.sql.sources.partitionOverwriteMode=dynamic")
df.write.mode("overwrite").partitionBy("event_date").saveAsTable("target")

С dynamic mode Spark перезаписывает только те партиции, в которые есть данные в DataFrame.

Идемпотентность в Airflow

Airflow построен вокруг идемпотентности — каждая задача имеет execution_date (или data_interval_start в 2.x), под которую её можно перезапустить.

Принципы:

  1. Никаких NOW() в SQL. Использовать макросы Jinja: {{ ds }} (дата запуска), {{ data_interval_start }} / {{ data_interval_end }}.
PostgresOperator(
    task_id='load_orders',
    sql="""
        DELETE FROM orders_daily WHERE event_date = '{{ ds }}';
        INSERT INTO orders_daily
        SELECT date('{{ ds }}'), COUNT(*), SUM(amount)
        FROM raw.orders
        WHERE created_at >= '{{ data_interval_start }}'
          AND created_at <  '{{ data_interval_end }}';
    """,
)

Если в SQL стоит NOW() - INTERVAL '1 day' — backfill за 100 дней назад вычислит сегодняшний минус один. Все 100 запусков напишут одно и то же. Классический баг.

  1. depends_on_past=False по умолчанию. Иначе backfill параллельно не запустишь.

  2. Catchup осознанно. catchup=True (default) при первом запуске пересчитает все периоды от start_date. Если не нужно — catchup=False.

  3. External resources — с retries и без побочных эффектов. Вызов API с дедупликацией по idempotency-key, не «увеличить счётчик».

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Идемпотентность в Spark и стриминге

Spark batch: записывать через partition overwrite (см. выше). Если пишем в S3/HDFS — путь должен быть детерминирован: s3://bucket/orders/dt=2026-05-01/. Ретрай удаляет директорию и пишет снова.

Spark Structured Streaming: включить checkpointLocation. Spark хранит offset Kafka и состояние агрегаций — при рестарте продолжает ровно с того места.

df.writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/checkpoints/orders") \
    .option("path", "/data/orders") \
    .trigger(processingTime="1 minute") \
    .start()

Без checkpointLocation — после рестарта стрим начнёт с latest и потеряет необработанные данные.

Kafka consumer + Postgres sink: at-least-once + idempotent UPSERT в Postgres. Kafka offset коммитится после успешного UPSERT, не до. Если упадём после UPSERT, но до commit offset — на ретрае придёт то же сообщение, UPSERT перезапишет ту же строку.

Backfill без дублей

Backfill — пересчёт пайплайна за прошлый период. Должен работать без ручных интервенций:

  1. Декомпозировать по партициям. Запускать N независимых тасков на N дней, не один большой DELETE FROM target WHERE date BETWEEN — это монолит, упадёт целиком.

  2. Параллельность ограничить. max_active_runs в DAG, чтобы 100-дневный backfill не задушил DWH.

  3. Логика без зависимости от текущего состояния. SQL не должен использовать MAX(created_at) FROM target для определения окна — тогда backfill старых дней увидит новый MAX и пропустит данные. Окно — параметр запуска.

  4. Тестовый прогон. Сначала backfill в _test-схему, диффнуть с продом по контрольным метрикам (count, sum по дню), потом в прод.

  5. Атомарность смены. Записать в target_new, переименовать target → target_old, target_new → target. Не править прод во время чтения.

Частые ошибки

NOW() или current_date в production-SQL. Любой backfill сломан. Только макросы Airflow или параметризованная дата.

INSERT без предварительного DELETE/UPSERT. Любой ретрай удваивает данные. Если поставили auto-retry в Airflow — половина прода в дублях через неделю.

Append-only таблица без бизнес-ключа. Невозможно дедуплицировать после обнаружения дубля. Минимум — surrogate key из (source_id, source_table, event_id) и ROW_NUMBER при чтении.

Пайплайн правит данные за «вчера и раньше». Граница окна должна быть жёсткой — event_date = '{{ ds }}', не event_date <= '{{ ds }}'. Иначе перезапись окна затрёт чужие интервалы.

Транзакция вокруг DELETE+INSERT забыта. При падении между ними получаем пустоту в production. Всегда BEGIN; DELETE; INSERT; COMMIT;.

Side-effects в Python-операторах. Отправил письмо клиенту через EmailOperator — ретрай отправит ещё раз. Внешние API — только с idempotency-key.

Связанные темы

FAQ

Идемпотентность и exactly-once — это одно и то же?

Нет. Exactly-once — гарантия от системы доставки (Kafka producer, Spark Streaming с checkpoint). Идемпотентность — свойство операции записи. На практике at-least-once + идемпотентный sink эквивалентны exactly-once и проще в реализации.

Как сделать идемпотентным пайплайн с агрегацией по уникальным юзерам за месяц?

Записывать результат как полную перезапись месячной партиции: DELETE FROM monthly_users WHERE month = '2026-05'; INSERT ... GROUP BY month;. Не пытаться накапливать COUNT(DISTINCT) инкрементально — это не аддитивная метрика, дубли при ретраях гарантированы.

Что делать с external API, у которого нет idempotency-key?

Слой между пайплайном и API: writeahead-лог запросов с уникальным id, проверка перед отправкой «уже отправляли?». Альтернатива — переключить вызов в режим upsert на стороне API (например, PUT вместо POST).

Идемпотентность нужна только в production?

Нужна везде, кроме одноразовых ad-hoc-скриптов. В dev — потому что разрабы постоянно перезапускают. В CI — потому что тесты прогоняются заново. В staging — потому что туда часто перезаливают данные.

Как тестировать идемпотентность в CI?

Прогнать пайплайн дважды с одним параметром, сравнить результат: count(*) и sum(хеши строк) должны совпасть. Это самый дешёвый smoke-тест на регрессию.

Это официальная информация?

Нет. Статья основана на публичной документации Airflow, Spark, dbt и общей практике дата-инжиниринга. Конкретные паттерны зависят от стека компании.


Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.