Spark shuffle и skew на собеседовании Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Что такое shuffle
Shuffle — перераспределение данных между worker-ами Spark по новому ключу. Это самая дорогая операция в Spark. Происходит, когда данные на разных узлах нужно собрать вместе по какому-то ключу.
Этапы shuffle:
- Map side: каждый task по входной партиции хеширует ключи, пишет данные на локальный диск, разделённые по target-партициям
- Network: target task запрашивает свои данные с других executors → сетевая передача
- Reduce side: target task читает свои данные, объединяет, сортирует (если нужно)
Стоимость shuffle:
- Запись на диск (worker'ы пишут результаты map-стадии локально)
- Сериализация/десериализация
- Сетевая передача (обычно бутылочное горлышко)
- Чтение с диска
- Memory pressure → spill в диск
На реальных задачах 70–90% времени Spark-job — shuffle. Оптимизация = минимизация shuffle.
Когда происходит shuffle
Гарантированный shuffle:
groupByKey/groupBy(DataFrame API)reduceByKey(но менее болезненный, так как агрегация на map-side)joinбез broadcastrepartition(N)distinctorderBy/sort- Window functions с
partitionBy
Без shuffle (narrow transformation):
select,filter,withColumnmap,flatMap,mapPartitionsunioncoalesce(N)(если уменьшаем партиции — без shuffle)
Идея оптимизации: объединять трансформации до shuffle, фильтровать как можно раньше.
# плохо: shuffle до фильтра
df.repartition(200, 'user_id').filter('event_date = ...')
# хорошо
df.filter('event_date = ...').repartition(200, 'user_id')Skew и его признаки
Skew — неравномерное распределение данных по партициям после shuffle. Один partition содержит 90% данных, остальные простаивают.
Откуда берётся: распределение бизнес-данных не равномерно. Один user_id = 10М событий (бот, технический аккаунт), остальные — десятки. После hash-партиционирования по user_id один partition получает все эти 10М, остальные — небольшие пачки.
Признаки skew:
- В Spark UI вкладка Stages: один task длится 10 минут, остальные секунды — это skew
- В Stage details —
Shuffle Read Size: одна партиция 50 ГБ, остальные 100 МБ - Memory pressure / OOM на одном executor
- Job не масштабируется при добавлении executors
Самый частый кейс на собесе: «Spark JOIN двух таблиц, одна сторона небольшая, всё равно медленно. Почему?» Ответ: «Возможно skew по join-ключу. Один ключ имеет миллионы строк в большой таблице, после shuffle он попадает в один partition, и одна задача делает всю работу».
Salting для решения skew
Salt — добавление случайного хвоста к ключу для равномерного распределения.
from pyspark.sql.functions import concat, lit, rand, floor, col
# до salting: skew по user_id
joined = events.join(users, 'user_id')
# salting
N_SALTS = 10
events_salted = events.withColumn(
'user_salt', concat(col('user_id'), lit('_'), floor(rand() * N_SALTS))
)
# users надо «продублировать» под все salt-варианты
import pyspark.sql.functions as F
salts = spark.range(N_SALTS).withColumnRenamed('id', 'salt_n')
users_salted = users.crossJoin(salts).withColumn(
'user_salt', concat(col('user_id'), lit('_'), col('salt_n'))
)
joined = events_salted.join(users_salted, 'user_salt')Идея: «горячий» ключ распределяется на N партиций. Маленькая сторона дублируется ×N. После join результат корректен, нагрузка распределена.
Цена: маленькая сторона раздувается ×N. Если N=10 и users — 1 ГБ, после salting — 10 ГБ. Но JOIN параллелизуется.
Адаптивный salting: salt только для горячих ключей. Сначала найти топ-100 ключей по count, для них salt применить, для остальных — обычный JOIN, потом UNION.
AQE: автоматический skew handling
Adaptive Query Execution (Spark 3.0+) — оптимизации на лету. Главные фичи для shuffle:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")Skew join handling: AQE определяет skewed partitions (по медиане размера), разбивает их на меньшие, дублирует «маленькую» сторону JOIN — то же самое, что ручной salting, но автоматом.
Coalesce shuffle partitions: AQE мерджит маленькие партиции после shuffle, чтобы не запускать таски для партиций по 1 МБ.
Dynamic broadcast join: если после filter одна сторона JOIN оказалась маленькой — AQE меняет SortMergeJoin на BroadcastHashJoin.
С AQE большинство skew-проблем решается без ручного salting. Но AQE — не панацея: на сильном skew (один ключ — терабайт) AQE не справится.
Broadcast joins
BroadcastHashJoin — мелкая сторона рассылается на все workers и хранится в памяти. JOIN без shuffle.
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), on='user_id')Когда применять:
- Маленькая сторона помещается в memory (<= 100 МБ — комфортно, до 1 ГБ — с настройкой)
- Часто join factов с dimensions
Порог автоматического broadcast:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m") # дефолт 10MBМинусы:
- Обе стороны должны влезть на каждый executor
- Дороже на старте (рассылка), но один раз
Не использовать на:
- Очень большие таблицы (driver OOM)
- Outer join с большой стороной NULL (broadcast не помогает)
Частые ошибки
coalesce(1) для записи одного файла на финале. Принудительно сводит весь датафрейм на один task. Если данных много — OOM или часы. Лучше coalesce(N) с разумным N или partition-write с post-processing.
repartition с дефолтным числом. repartition(col) использует spark.sql.shuffle.partitions (200). Если данные после shuffle — 100 ГБ, 200 партиций по 500 МБ — мало для параллелизма.
Игнорировать AQE. В Spark 3+ AQE даёт +20–50% к скорости почти бесплатно. Включать.
Salting без понимания. N=2 на сильном skew не поможет. N=100 — раздует маленькую сторону в 100 раз. Подбирать по размеру горячих ключей.
Hash JOIN на disk-spilled данных. Если shuffle памяти не хватает, спарк проливается в диск. Если skew → несколько ГБ spill на одном executor → аварийно медленно. Решать через salting или увеличение spark.executor.memory.
Filter после JOIN. df1.join(df2).filter(...) — JOIN всю кашу, потом фильтрует. Filter как можно раньше.
Игнорировать Spark UI. Skew, OOM, slow stages — всё видно в UI. Без анализа UI оптимизация — пальцем в небо.
Связанные темы
- Spark на собеседовании DE
- Партиционирование в ClickHouse
- Подготовка к собесу Data Engineer
- Idempotency пайплайна для DE
- Parquet vs ORC vs Avro для DE
FAQ
Сколько shuffle partitions ставить?
Дефолт 200 редко оптимален. Эвристика: total_shuffle_size / 128MB. На 100ГБ shuffle — 800 партиций. С AQE coalesce можно ставить с запасом — Spark смерджит маленькие сам.
repartition или coalesce?
repartition(N) — shuffle, может увеличить и уменьшить партиции. coalesce(N) — без shuffle, только уменьшение, дешевле, но без перераспределения. Для уменьшения файла — coalesce. Для распределения skew — repartition.
Что такое spill?
Когда shuffle-данные не помещаются в памяти, Spark пишет их во временный файл на диск. Видно в UI как «Spill (Memory)» / «Spill (Disk)». Сильный spill — признак нехватки памяти.
Можно ли отключить shuffle полностью?
Нет, если задача требует группировки/JOIN. Можно использовать broadcast (без shuffle) или предварительно репартиционировать данные при write — следующее чтение будет с правильной партицией без shuffle.
Bucket join — это что?
Если две таблицы записаны с одинаковым bucketing по ключу, JOIN не требует shuffle. Реализуется через df.write.bucketBy(N, 'user_id'). Реже используется на S3, чаще в Hive.
Это официальная информация?
Нет. Статья основана на документации Apache Spark 3.x, докладах Databricks и опыте практик. Конкретные оптимизации зависят от версий и стека.
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.