Spark broadcast joins на собеседовании Data Engineer

Прокачай SQL для собеса
500+ задач по SQL: оконные функции, JOIN, CTE — с разбором каждой
Тренировать SQL в Telegram

Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.

Зачем спрашивают на собесе DE

Broadcast join — главный приём ускорения Spark joins. На собесе DE: «когда broadcast», «threshold», «как заставить Spark broadcast».

Broadcast Hash Join

Идея. Маленькая таблица (B) копируется (broadcast'ится) на каждый executor. Каждый executor джойнит свой кусок большой таблицы (A) с локальной копией B.

Преимущество. Нет shuffle большой таблицы. Один shuffle (broadcast = O(N executors × |B|)) вместо двух.

A (huge) ─── stays on executors
B (small) ── copied to all executors
                  ↓
         each executor: A.partition ⨝ B

В EXPLAIN: BroadcastHashJoin + BroadcastExchange.

Auto-broadcast threshold

spark.sql.autoBroadcastJoinThreshold = 10MB  (по умолчанию)

Если Spark оценил, что одна сторона ≤ threshold — auto-broadcast.

Поднять для больших dim-таблиц:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 200 * 1024 * 1024)  # 200MB

Внимание. Spark считает size in-memory (после декомпрессии и десериализации). 100MB Parquet в памяти — может быть 500MB. Threshold должен учитывать.

Disable auto-broadcast:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

Иногда нужно если оценка неверна.

Broadcast hint

Если оценка размера неверна / threshold не подошёл — явный hint.

from pyspark.sql.functions import broadcast

result = large_df.join(broadcast(small_df), "key")

В Spark SQL:

SELECT /*+ BROADCAST(b) */ ...
FROM a JOIN b ON a.key = b.key;

Spark гарантированно сделает broadcast (если возможно).

Прокачай SQL для собеса
500+ задач по SQL: оконные функции, JOIN, CTE — с разбором каждой
Тренировать SQL в Telegram

Когда нет смысла

Маленькая сторона на самом деле большая. Broadcast 1GB — driver OOM при сборке + network overhead. Threshold ~ 100-500MB зависит от cluster.

Right outer / full outer. Семантика broadcast не позволяет делать full outer (нужны строки с обеих сторон).

# Right outer + broadcast(left) — не работает
# Spark fallback на SortMergeJoin

Many joins подряд. Broadcast одной маленькой dim — OK. Broadcast 5 dim последовательно — память накапливается на executor.

Skewed key. Если в большой таблице 95% строк имеют один ключ — broadcast не помогает, только AQE skew handling.

AQE и dynamic broadcast

С Spark 3 + AQE (Adaptive Query Execution) — Spark может runtime переключиться на broadcast после shuffle, если оценил, что одна сторона теперь маленькая.

spark.sql.adaptive.enabled = true
spark.sql.adaptive.localShuffleReader.enabled = true

Например, после фильтрации одна сторона стала 5MB → AQE заменит SortMergeJoin → BroadcastHashJoin.

Частые ошибки

Broadcast большой таблицы. Driver OOM при сборе на драйвер перед отправкой.

Hint без понимания. broadcast() не работает чудом — если реально 5GB, упадёт.

Игнорировать stats. Spark оценивает по ANALYZE TABLE ... COMPUTE STATISTICS. Без stats — оценка через row count + бритая avg.

Broadcast обеих сторон. Бессмысленно — фор какой-то одной side. Spark выберет автоматически.

Не учитывать сериализацию. Java serializer медленнее. Kryo быстрее, но не дефолт.

Связанные темы

FAQ

Сколько data можно broadcast?

Driver memory должна вместить + executor memory тоже. На обычной 8GB executor — broadcast до 1-2GB ок. Лучше < 500MB.

Это официальная информация?

Нет. Статья основана на документации Spark 3.x.


Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.