Spark Catalyst и AQE на собеседовании Data Engineer

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.

Зачем спрашивают на собесе DE

Catalyst — мозг Spark SQL. AQE (Adaptive Query Execution) — главное улучшение Spark 3.x. На собесе DE: «как Catalyst оптимизирует JOIN», «зачем AQE», «что такое skew join optimization». Senior — нюансы physical plans, broadcast joins, codegen.

Главная боль без понимания — DE отключил AQE из-за непонимания, не получил скорость на skewed данных и думает, что Spark тормозит.

Catalyst: фазы оптимизации

Catalyst — extensible query optimizer Spark. На входе — SQL или DSL, на выходе — JVM-байткод.

Фазы:

SQL/DSL
   ↓
Unresolved Logical Plan       ← после парсинга
   ↓ (analysis)
Resolved Logical Plan         ← подтянули каталог: имена → колонки
   ↓ (logical optimization)
Optimized Logical Plan        ← rule-based преобразования
   ↓ (physical planning)
Physical Plan                 ← выбор conkpetных алгоритмов (Sort vs Hash join)
   ↓ (code generation)
RDDs                          ← byte-code на JVM

df.explain(mode='extended') показывает все 4 этапа.

Logical optimization

Применяются правила преобразования (rule-based):

Predicate pushdown. Фильтр поднимается ближе к источнику.

filter(amount > 100) ↔ scan(orders) → scan(orders, push filter [amount > 100])

В parquet это превращается в фильтрацию на уровне row group statistics — читаем меньше с диска.

Projection pruning. Если в SELECT нет колонки — она не читается.

SELECT user_id FROM orders → scan только user_id колонки в parquet

Constant folding. 1 + 2 → 3 заранее.

Join reordering. Если возможно (cost-based) — поменять порядок join'ов для меньшего intermediate.

Predicate combination. a=1 AND a=2 упрощается / отсекается.

Subquery elimination. Простые подзапросы разворачиваются в join.

Все эти правила Spark знает «from the box» и применяет автоматически.

Physical planning

Logical plan → physical: выбираются конкретные операторы.

Joins:

  • BroadcastHashJoin — если одна сторона маленькая (≤ spark.sql.autoBroadcastJoinThreshold, дефолт 10 МБ).
  • SortMergeJoin — для больших равенств (=).
  • ShuffleHashJoin — реже, для специфичных случаев.
  • CartesianProduct — для cross join.
  • BroadcastNestedLoopJoin — для условий, не подходящих под equi-join.

Aggregations:

  • HashAggregate — основной, быстрый.
  • SortAggregate — fallback, если данные не помещаются в hash.

Cost-Based Optimizer (CBO). Использует статистику таблиц (ANALYZE TABLE ... COMPUTE STATISTICS). Выбирает join order и тип join. Включается через spark.sql.cbo.enabled=true.

Tungsten и codegen

Tungsten — оптимизация уровня памяти и CPU.

  • Off-heap memory — меньше GC.
  • Cache-aware computation.
  • Whole-stage codegen — генерирует Java-код, объединяющий несколько операций в один цикл (без overhead на iterator pattern).
HashAggregate
└─ Project
   └─ Filter
      └─ Scan

Whole-stage codegen → один Java-метод, где filter + project + aggregate комбинированы. Эффективность — близко к нативному коду.

В explain whole-stage выглядит как WholeStageCodegen (1) — обёртка над несколькими операторами.

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

AQE: что и зачем

Adaptive Query Execution — добавлено в Spark 3.0, по умолчанию on с 3.2+.

Идея. Catalyst строит план до выполнения. AQE может перепланировать план во время выполнения, опираясь на реальную статистику shuffle.

Главные оптимизации AQE:

1. Dynamic coalesce shuffle partitions. После shuffle Spark смотрит реальный размер партиций — слишком мелкие объединяет.

spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB  # дефолт

Без AQE — 200 партиций (по spark.sql.shuffle.partitions), большинство мелкие → overhead.

2. Switch SortMergeJoin → BroadcastHashJoin. Если после фильтрации одна сторона стала достаточно маленькой — broadcast вместо merge.

3. Skew join optimization. Обнаруживает «горячие» партиции (skewed key с массой записей) и разбивает их на части, дублируя другую сторону.

spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB

Включение AQE:

spark.sql.adaptive.enabled = true   # дефолт в 3.2+

Что AQE НЕ умеет.

  • Не оптимизирует план до начала выполнения (это Catalyst).
  • Не помогает, если skew в исходных данных без shuffle (например, partition skew в Parquet).

Чтение плана через explain

df.explain()                  # physical plan
df.explain(mode='extended')   # все 4 фазы
df.explain(mode='cost')       # с оценками size/rows
df.explain(mode='formatted')  # форматированный, удобный

Ключевые элементы:

  • Scan parquet [...] — чтение источника.
  • Filter (amount > 100) — фильтр (если pushed — внутри Scan).
  • Project [user_id, amount] — projection.
  • Exchange hashpartitioning(user_id, 200) — shuffle.
  • *(N) HashAggregate ... — звёздочка = whole-stage codegen группа N.
  • BroadcastExchange — собрали маленькую сторону в broadcast.
  • BroadcastHashJoin / SortMergeJoin — тип join.

AQE на runtime. AdaptiveSparkPlan ... оборачивает план; реальные изменения видны в Spark UI после выполнения.

Частые ошибки

Отключать AQE. Из-за слухов «AQE нестабилен» (это было в 3.0). С 3.2+ — production-ready, обычно даёт буст.

Проигнорировать broadcast threshold. Если справа 11 МБ, а threshold 10 — Spark сделает SortMergeJoin (медленно). Поднять threshold или явно broadcast(df).

Не запускать ANALYZE TABLE. CBO без статистики работает с дефолтами и часто выбирает плохой плана.

Огромный spark.sql.shuffle.partitions без AQE. 4000 партиций × несколько килобайт каждая = катастрофа. С AQE — coalesces автоматически.

Считать explain ленивым. explain() сам по себе не запускает action. Чтобы посмотреть фактическое поведение — Spark UI после show()/write().

Полагаться на CBO без actual stats. Без noscan правильного типа CBO считает только row count. Для full benefit — ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS ....

Использовать repartition бездумно. repartition вызывает shuffle. Если просто хочешь меньше партиций — coalesce (без shuffle, но без перебалансировки).

Тоэн-вью CBO как панацея. Spark CBO пока хуже Snowflake / BigQuery. Часто прямые подсказки (broadcast, repartition) лучше автоматики.

Связанные темы

FAQ

Catalyst оптимизирует Python UDF?

Нет — UDF чёрный ящик. Catalyst push'ает фильтры до UDF, но не оптимизирует внутри.

AQE замедляет первый запуск?

Может — добавляет небольшой overhead на runtime planning. Но обычно benefit от skew/coalesce/broadcast switch перевешивает.

Когда нужен broadcast(df) явно?

Если auto-broadcast threshold не сработал (например, размер таблицы не оценён точно), но известно, что она маленькая. Hint: import org.apache.spark.sql.functions.broadcast; df1.join(broadcast(df2), ...).

Что такое Dynamic Partition Pruning (DPP)?

Spark 3+ — runtime определение, какие партиции читать в fact таблице на основании результата filter в dim таблице. Огромный буст в star schema.

Catalyst работает только в DataFrame?

Да. RDD-API не оптимизируется. Дополнительная мотивация перейти на DataFrame.

Это официальная информация?

Нет. Статья основана на документации Spark 3.x, статьях Databricks по AQE и codegen.


Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.