Spark Catalyst и AQE на собеседовании Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Зачем спрашивают на собесе DE
Catalyst — мозг Spark SQL. AQE (Adaptive Query Execution) — главное улучшение Spark 3.x. На собесе DE: «как Catalyst оптимизирует JOIN», «зачем AQE», «что такое skew join optimization». Senior — нюансы physical plans, broadcast joins, codegen.
Главная боль без понимания — DE отключил AQE из-за непонимания, не получил скорость на skewed данных и думает, что Spark тормозит.
Catalyst: фазы оптимизации
Catalyst — extensible query optimizer Spark. На входе — SQL или DSL, на выходе — JVM-байткод.
Фазы:
SQL/DSL
↓
Unresolved Logical Plan ← после парсинга
↓ (analysis)
Resolved Logical Plan ← подтянули каталог: имена → колонки
↓ (logical optimization)
Optimized Logical Plan ← rule-based преобразования
↓ (physical planning)
Physical Plan ← выбор conkpetных алгоритмов (Sort vs Hash join)
↓ (code generation)
RDDs ← byte-code на JVMdf.explain(mode='extended') показывает все 4 этапа.
Logical optimization
Применяются правила преобразования (rule-based):
Predicate pushdown. Фильтр поднимается ближе к источнику.
filter(amount > 100) ↔ scan(orders) → scan(orders, push filter [amount > 100])В parquet это превращается в фильтрацию на уровне row group statistics — читаем меньше с диска.
Projection pruning. Если в SELECT нет колонки — она не читается.
SELECT user_id FROM orders → scan только user_id колонки в parquetConstant folding. 1 + 2 → 3 заранее.
Join reordering. Если возможно (cost-based) — поменять порядок join'ов для меньшего intermediate.
Predicate combination. a=1 AND a=2 упрощается / отсекается.
Subquery elimination. Простые подзапросы разворачиваются в join.
Все эти правила Spark знает «from the box» и применяет автоматически.
Physical planning
Logical plan → physical: выбираются конкретные операторы.
Joins:
BroadcastHashJoin— если одна сторона маленькая (≤spark.sql.autoBroadcastJoinThreshold, дефолт 10 МБ).SortMergeJoin— для больших равенств (=).ShuffleHashJoin— реже, для специфичных случаев.CartesianProduct— для cross join.BroadcastNestedLoopJoin— для условий, не подходящих под equi-join.
Aggregations:
HashAggregate— основной, быстрый.SortAggregate— fallback, если данные не помещаются в hash.
Cost-Based Optimizer (CBO). Использует статистику таблиц (ANALYZE TABLE ... COMPUTE STATISTICS). Выбирает join order и тип join. Включается через spark.sql.cbo.enabled=true.
Tungsten и codegen
Tungsten — оптимизация уровня памяти и CPU.
- Off-heap memory — меньше GC.
- Cache-aware computation.
- Whole-stage codegen — генерирует Java-код, объединяющий несколько операций в один цикл (без overhead на iterator pattern).
HashAggregate
└─ Project
└─ Filter
└─ ScanWhole-stage codegen → один Java-метод, где filter + project + aggregate комбинированы. Эффективность — близко к нативному коду.
В explain whole-stage выглядит как WholeStageCodegen (1) — обёртка над несколькими операторами.
AQE: что и зачем
Adaptive Query Execution — добавлено в Spark 3.0, по умолчанию on с 3.2+.
Идея. Catalyst строит план до выполнения. AQE может перепланировать план во время выполнения, опираясь на реальную статистику shuffle.
Главные оптимизации AQE:
1. Dynamic coalesce shuffle partitions. После shuffle Spark смотрит реальный размер партиций — слишком мелкие объединяет.
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB # дефолтБез AQE — 200 партиций (по spark.sql.shuffle.partitions), большинство мелкие → overhead.
2. Switch SortMergeJoin → BroadcastHashJoin. Если после фильтрации одна сторона стала достаточно маленькой — broadcast вместо merge.
3. Skew join optimization. Обнаруживает «горячие» партиции (skewed key с массой записей) и разбивает их на части, дублируя другую сторону.
spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MBВключение AQE:
spark.sql.adaptive.enabled = true # дефолт в 3.2+Что AQE НЕ умеет.
- Не оптимизирует план до начала выполнения (это Catalyst).
- Не помогает, если skew в исходных данных без shuffle (например, partition skew в Parquet).
Чтение плана через explain
df.explain() # physical plan
df.explain(mode='extended') # все 4 фазы
df.explain(mode='cost') # с оценками size/rows
df.explain(mode='formatted') # форматированный, удобныйКлючевые элементы:
Scan parquet [...]— чтение источника.Filter (amount > 100)— фильтр (если pushed — внутри Scan).Project [user_id, amount]— projection.Exchange hashpartitioning(user_id, 200)— shuffle.*(N) HashAggregate ...— звёздочка = whole-stage codegen группа N.BroadcastExchange— собрали маленькую сторону в broadcast.BroadcastHashJoin/SortMergeJoin— тип join.
AQE на runtime. AdaptiveSparkPlan ... оборачивает план; реальные изменения видны в Spark UI после выполнения.
Частые ошибки
Отключать AQE. Из-за слухов «AQE нестабилен» (это было в 3.0). С 3.2+ — production-ready, обычно даёт буст.
Проигнорировать broadcast threshold. Если справа 11 МБ, а threshold 10 — Spark сделает SortMergeJoin (медленно). Поднять threshold или явно broadcast(df).
Не запускать ANALYZE TABLE. CBO без статистики работает с дефолтами и часто выбирает плохой плана.
Огромный spark.sql.shuffle.partitions без AQE. 4000 партиций × несколько килобайт каждая = катастрофа. С AQE — coalesces автоматически.
Считать explain ленивым. explain() сам по себе не запускает action. Чтобы посмотреть фактическое поведение — Spark UI после show()/write().
Полагаться на CBO без actual stats. Без noscan правильного типа CBO считает только row count. Для full benefit — ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS ....
Использовать repartition бездумно. repartition вызывает shuffle. Если просто хочешь меньше партиций — coalesce (без shuffle, но без перебалансировки).
Тоэн-вью CBO как панацея. Spark CBO пока хуже Snowflake / BigQuery. Часто прямые подсказки (broadcast, repartition) лучше автоматики.
Связанные темы
- Spark RDD vs DataFrame для DE
- Spark shuffle и skew на собесе DE
- Spark на собесе DE
- EXPLAIN и план запроса для DE
- Подготовка к собесу Data Engineer
FAQ
Catalyst оптимизирует Python UDF?
Нет — UDF чёрный ящик. Catalyst push'ает фильтры до UDF, но не оптимизирует внутри.
AQE замедляет первый запуск?
Может — добавляет небольшой overhead на runtime planning. Но обычно benefit от skew/coalesce/broadcast switch перевешивает.
Когда нужен broadcast(df) явно?
Если auto-broadcast threshold не сработал (например, размер таблицы не оценён точно), но известно, что она маленькая. Hint: import org.apache.spark.sql.functions.broadcast; df1.join(broadcast(df2), ...).
Что такое Dynamic Partition Pruning (DPP)?
Spark 3+ — runtime определение, какие партиции читать в fact таблице на основании результата filter в dim таблице. Огромный буст в star schema.
Catalyst работает только в DataFrame?
Да. RDD-API не оптимизируется. Дополнительная мотивация перейти на DataFrame.
Это официальная информация?
Нет. Статья основана на документации Spark 3.x, статьях Databricks по AQE и codegen.
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.