Spark broadcast joins на собеседовании Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Зачем спрашивают на собесе DE
Broadcast join — главный приём ускорения Spark joins. На собесе DE: «когда broadcast», «threshold», «как заставить Spark broadcast».
Broadcast Hash Join
Идея. Маленькая таблица (B) копируется (broadcast'ится) на каждый executor. Каждый executor джойнит свой кусок большой таблицы (A) с локальной копией B.
Преимущество. Нет shuffle большой таблицы. Один shuffle (broadcast = O(N executors × |B|)) вместо двух.
A (huge) ─── stays on executors
B (small) ── copied to all executors
↓
each executor: A.partition ⨝ BВ EXPLAIN: BroadcastHashJoin + BroadcastExchange.
Auto-broadcast threshold
spark.sql.autoBroadcastJoinThreshold = 10MB (по умолчанию)Если Spark оценил, что одна сторона ≤ threshold — auto-broadcast.
Поднять для больших dim-таблиц:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 200 * 1024 * 1024) # 200MBВнимание. Spark считает size in-memory (после декомпрессии и десериализации). 100MB Parquet в памяти — может быть 500MB. Threshold должен учитывать.
Disable auto-broadcast:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)Иногда нужно если оценка неверна.
Broadcast hint
Если оценка размера неверна / threshold не подошёл — явный hint.
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")В Spark SQL:
SELECT /*+ BROADCAST(b) */ ...
FROM a JOIN b ON a.key = b.key;Spark гарантированно сделает broadcast (если возможно).
Когда нет смысла
Маленькая сторона на самом деле большая. Broadcast 1GB — driver OOM при сборке + network overhead. Threshold ~ 100-500MB зависит от cluster.
Right outer / full outer. Семантика broadcast не позволяет делать full outer (нужны строки с обеих сторон).
# Right outer + broadcast(left) — не работает
# Spark fallback на SortMergeJoinMany joins подряд. Broadcast одной маленькой dim — OK. Broadcast 5 dim последовательно — память накапливается на executor.
Skewed key. Если в большой таблице 95% строк имеют один ключ — broadcast не помогает, только AQE skew handling.
AQE и dynamic broadcast
С Spark 3 + AQE (Adaptive Query Execution) — Spark может runtime переключиться на broadcast после shuffle, если оценил, что одна сторона теперь маленькая.
spark.sql.adaptive.enabled = true
spark.sql.adaptive.localShuffleReader.enabled = trueНапример, после фильтрации одна сторона стала 5MB → AQE заменит SortMergeJoin → BroadcastHashJoin.
Частые ошибки
Broadcast большой таблицы. Driver OOM при сборе на драйвер перед отправкой.
Hint без понимания. broadcast() не работает чудом — если реально 5GB, упадёт.
Игнорировать stats. Spark оценивает по ANALYZE TABLE ... COMPUTE STATISTICS. Без stats — оценка через row count + бритая avg.
Broadcast обеих сторон. Бессмысленно — фор какой-то одной side. Spark выберет автоматически.
Не учитывать сериализацию. Java serializer медленнее. Kryo быстрее, но не дефолт.
Связанные темы
- Spark RDD vs DataFrame для DE
- Spark Catalyst и AQE для DE
- Spark shuffle и skew на собесе DE
- Anti и semi joins для DE
- Подготовка к собесу Data Engineer
FAQ
Сколько data можно broadcast?
Driver memory должна вместить + executor memory тоже. На обычной 8GB executor — broadcast до 1-2GB ок. Лучше < 500MB.
Это официальная информация?
Нет. Статья основана на документации Spark 3.x.
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.