Spark shuffle и skew на собеседовании Data Engineer

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.

Что такое shuffle

Shuffle — перераспределение данных между worker-ами Spark по новому ключу. Это самая дорогая операция в Spark. Происходит, когда данные на разных узлах нужно собрать вместе по какому-то ключу.

Этапы shuffle:

  1. Map side: каждый task по входной партиции хеширует ключи, пишет данные на локальный диск, разделённые по target-партициям
  2. Network: target task запрашивает свои данные с других executors → сетевая передача
  3. Reduce side: target task читает свои данные, объединяет, сортирует (если нужно)

Стоимость shuffle:

  • Запись на диск (worker'ы пишут результаты map-стадии локально)
  • Сериализация/десериализация
  • Сетевая передача (обычно бутылочное горлышко)
  • Чтение с диска
  • Memory pressure → spill в диск

На реальных задачах 70–90% времени Spark-job — shuffle. Оптимизация = минимизация shuffle.

Когда происходит shuffle

Гарантированный shuffle:

  • groupByKey / groupBy (DataFrame API)
  • reduceByKey (но менее болезненный, так как агрегация на map-side)
  • join без broadcast
  • repartition(N)
  • distinct
  • orderBy / sort
  • Window functions с partitionBy

Без shuffle (narrow transformation):

  • select, filter, withColumn
  • map, flatMap, mapPartitions
  • union
  • coalesce(N) (если уменьшаем партиции — без shuffle)

Идея оптимизации: объединять трансформации до shuffle, фильтровать как можно раньше.

# плохо: shuffle до фильтра
df.repartition(200, 'user_id').filter('event_date = ...')

# хорошо
df.filter('event_date = ...').repartition(200, 'user_id')

Skew и его признаки

Skew — неравномерное распределение данных по партициям после shuffle. Один partition содержит 90% данных, остальные простаивают.

Откуда берётся: распределение бизнес-данных не равномерно. Один user_id = 10М событий (бот, технический аккаунт), остальные — десятки. После hash-партиционирования по user_id один partition получает все эти 10М, остальные — небольшие пачки.

Признаки skew:

  • В Spark UI вкладка Stages: один task длится 10 минут, остальные секунды — это skew
  • В Stage details — Shuffle Read Size: одна партиция 50 ГБ, остальные 100 МБ
  • Memory pressure / OOM на одном executor
  • Job не масштабируется при добавлении executors

Самый частый кейс на собесе: «Spark JOIN двух таблиц, одна сторона небольшая, всё равно медленно. Почему?» Ответ: «Возможно skew по join-ключу. Один ключ имеет миллионы строк в большой таблице, после shuffle он попадает в один partition, и одна задача делает всю работу».

Salting для решения skew

Salt — добавление случайного хвоста к ключу для равномерного распределения.

from pyspark.sql.functions import concat, lit, rand, floor, col

# до salting: skew по user_id
joined = events.join(users, 'user_id')

# salting
N_SALTS = 10
events_salted = events.withColumn(
    'user_salt', concat(col('user_id'), lit('_'), floor(rand() * N_SALTS))
)
# users надо «продублировать» под все salt-варианты
import pyspark.sql.functions as F
salts = spark.range(N_SALTS).withColumnRenamed('id', 'salt_n')
users_salted = users.crossJoin(salts).withColumn(
    'user_salt', concat(col('user_id'), lit('_'), col('salt_n'))
)
joined = events_salted.join(users_salted, 'user_salt')

Идея: «горячий» ключ распределяется на N партиций. Маленькая сторона дублируется ×N. После join результат корректен, нагрузка распределена.

Цена: маленькая сторона раздувается ×N. Если N=10 и users — 1 ГБ, после salting — 10 ГБ. Но JOIN параллелизуется.

Адаптивный salting: salt только для горячих ключей. Сначала найти топ-100 ключей по count, для них salt применить, для остальных — обычный JOIN, потом UNION.

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

AQE: автоматический skew handling

Adaptive Query Execution (Spark 3.0+) — оптимизации на лету. Главные фичи для shuffle:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Skew join handling: AQE определяет skewed partitions (по медиане размера), разбивает их на меньшие, дублирует «маленькую» сторону JOIN — то же самое, что ручной salting, но автоматом.

Coalesce shuffle partitions: AQE мерджит маленькие партиции после shuffle, чтобы не запускать таски для партиций по 1 МБ.

Dynamic broadcast join: если после filter одна сторона JOIN оказалась маленькой — AQE меняет SortMergeJoin на BroadcastHashJoin.

С AQE большинство skew-проблем решается без ручного salting. Но AQE — не панацея: на сильном skew (один ключ — терабайт) AQE не справится.

Broadcast joins

BroadcastHashJoin — мелкая сторона рассылается на все workers и хранится в памяти. JOIN без shuffle.

from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), on='user_id')

Когда применять:

  • Маленькая сторона помещается в memory (<= 100 МБ — комфортно, до 1 ГБ — с настройкой)
  • Часто join factов с dimensions

Порог автоматического broadcast:

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")  # дефолт 10MB

Минусы:

  • Обе стороны должны влезть на каждый executor
  • Дороже на старте (рассылка), но один раз

Не использовать на:

  • Очень большие таблицы (driver OOM)
  • Outer join с большой стороной NULL (broadcast не помогает)

Частые ошибки

coalesce(1) для записи одного файла на финале. Принудительно сводит весь датафрейм на один task. Если данных много — OOM или часы. Лучше coalesce(N) с разумным N или partition-write с post-processing.

repartition с дефолтным числом. repartition(col) использует spark.sql.shuffle.partitions (200). Если данные после shuffle — 100 ГБ, 200 партиций по 500 МБ — мало для параллелизма.

Игнорировать AQE. В Spark 3+ AQE даёт +20–50% к скорости почти бесплатно. Включать.

Salting без понимания. N=2 на сильном skew не поможет. N=100 — раздует маленькую сторону в 100 раз. Подбирать по размеру горячих ключей.

Hash JOIN на disk-spilled данных. Если shuffle памяти не хватает, спарк проливается в диск. Если skew → несколько ГБ spill на одном executor → аварийно медленно. Решать через salting или увеличение spark.executor.memory.

Filter после JOIN. df1.join(df2).filter(...) — JOIN всю кашу, потом фильтрует. Filter как можно раньше.

Игнорировать Spark UI. Skew, OOM, slow stages — всё видно в UI. Без анализа UI оптимизация — пальцем в небо.

Связанные темы

FAQ

Сколько shuffle partitions ставить?

Дефолт 200 редко оптимален. Эвристика: total_shuffle_size / 128MB. На 100ГБ shuffle — 800 партиций. С AQE coalesce можно ставить с запасом — Spark смерджит маленькие сам.

repartition или coalesce?

repartition(N) — shuffle, может увеличить и уменьшить партиции. coalesce(N) — без shuffle, только уменьшение, дешевле, но без перераспределения. Для уменьшения файла — coalesce. Для распределения skew — repartition.

Что такое spill?

Когда shuffle-данные не помещаются в памяти, Spark пишет их во временный файл на диск. Видно в UI как «Spill (Memory)» / «Spill (Disk)». Сильный spill — признак нехватки памяти.

Можно ли отключить shuffle полностью?

Нет, если задача требует группировки/JOIN. Можно использовать broadcast (без shuffle) или предварительно репартиционировать данные при write — следующее чтение будет с правильной партицией без shuffle.

Bucket join — это что?

Если две таблицы записаны с одинаковым bucketing по ключу, JOIN не требует shuffle. Реализуется через df.write.bucketBy(N, 'user_id'). Реже используется на S3, чаще в Hive.

Это официальная информация?

Нет. Статья основана на документации Apache Spark 3.x, докладах Databricks и опыте практик. Конкретные оптимизации зависят от версий и стека.


Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.