Apache Spark на собеседовании Data Engineer
Содержание:
Что спрашивают
Spark на собесе DE — это про понимание того, что происходит под капотом. Запускать df.groupBy().count() умеет любой джун. Объяснить, почему этот запрос выполняется 30 минут на 1 ТБ данных — задача middle+.
Базовые темы:
- RDD vs DataFrame vs Dataset
- Lazy evaluation, transformations vs actions
- Партиционирование данных и shuffle
- Catalyst optimizer и Adaptive Query Execution (AQE)
- Joins: broadcast, shuffle, sort-merge
- Caching, persist стратегии
- UDF и их производительность
RDD vs DataFrame
RDD — низкоуровневый API. Distributed collection, lazy. Программист сам выбирает, как сериализовать, как распределять, какие операции применять.
DataFrame — высокоуровневый API на основе RDD. Знает schema, оптимизируется через Catalyst, использует columnar формат для in-memory.
Dataset (Scala/Java) — типизированный DataFrame. В PySpark — нет, есть только DataFrame.
На собесе спросят: «Когда использовать RDD?» Ответ:
- Кастомные преобразования, которые нельзя выразить в DataFrame API
- Тонкое управление сериализацией
- Legacy-код
В 99% случаев DataFrame быстрее, потому что Catalyst применяет оптимизации (predicate pushdown, projection pruning, реупорядочивание JOIN).
Shuffle, партиции, skew
Shuffle — перераспределение данных между worker-ами. Происходит при groupBy, join, repartition, distinct. Это самая дорогая операция в Spark.
Почему shuffle медленный:
- Запись на диск на каждом worker
- Передача по сети между worker-ами
- Чтение с диска на принимающей стороне
Partitions — единицы параллелизма. Один partition = одна задача. Дефолт — 200 (spark.sql.shuffle.partitions).
Skew — неравномерное распределение данных по partitions. Один partition = 90% данных, остальные простаивают. Признаки:
- Один таск выполняется на порядки дольше остальных
- В Spark UI видны несбалансированные shuffle reads
Лечится:
- Repartition по другому ключу — если skew по join key, добавить salt
- AQE skew join handling — Spark 3+ автоматически разбивает большой partition
- Broadcast одной из сторон, если она маленькая
Salting (добавление случайного хвоста к ключу для равномерного распределения):
from pyspark.sql.functions import concat, lit, rand, floor
df_salted = df.withColumn('user_salted', concat(df.user_id, lit('_'), floor(rand() * 10)))Catalyst и AQE
Catalyst — оптимизатор запросов в Spark SQL. Превращает DataFrame DAG в физический план через серию правил:
- Logical plan analysis (resolve column names)
- Logical plan optimization (predicate pushdown, projection pruning)
- Physical planning (выбор оператора: BroadcastJoin vs SortMergeJoin)
- Code generation
На собесе спросят: «Как посмотреть план запроса?» Ответ: df.explain(True) или df.explain(mode='extended').
AQE (Adaptive Query Execution) — runtime-оптимизации в Spark 3+. Включается:
spark.conf.set("spark.sql.adaptive.enabled", "true")Что делает:
- Динамически меняет shuffle partitions после фактических данных
- Преобразует SortMergeJoin в BroadcastJoin, если одна сторона оказалась маленькой
- Разбивает skewed partitions
Joins: broadcast vs shuffle
SortMergeJoin (default для больших данных) — сортирует обе стороны по ключу, потом мержит. Дорогой, требует shuffle.
BroadcastHashJoin — мелкая сторона рассылается на все worker-ы и хранится в памяти. Быстрый, но требует чтобы маленькая таблица помещалась в память.
from pyspark.sql.functions import broadcast
result = big_df.join(broadcast(small_df), on='user_id')Порог автоматического broadcast — spark.sql.autoBroadcastJoinThreshold (дефолт 10 МБ).
ShuffleHashJoin — старый вариант, редко используется в Spark 3+.
На собесе типичный кейс: «Join 1 ТБ events и 100 МБ users. Какой join выберется?» Ответ: BroadcastHashJoin, потому что users — 100 МБ, помещается в дефолтный порог.
Оптимизация Spark-job
Топ-5 проверок при медленной job:
1. Партиционирование
Слишком много мелких partitions = overhead на запуск тасков. Слишком мало = недостаточный параллелизм. Норма — 100 МБ–1 ГБ на partition. Регулируется repartition(N) или coalesce(N).
2. Predicate pushdown
Фильтрация должна быть как можно раньше. Если используете Parquet — фильтр по партиции работает на уровне чтения. Проверьте df.explain — должно быть PushedFilters: [...].
3. Кеширование
Если DataFrame используется 2+ раза — df.cache() или df.persist(StorageLevel.MEMORY_AND_DISK). Без cache Spark пересчитывает весь DAG каждый раз.
4. UDF производительность
Python UDF в PySpark — медленные, потому что данные сериализуются между JVM и Python. Альтернативы:
- Использовать встроенные функции
pyspark.sql.functions - Использовать pandas UDF (быстрее обычного Python UDF в 10-100 раз)
- Писать UDF на Scala и использовать через PySpark
5. Skew
См. раздел про shuffle. AQE решает большинство кейсов автоматически.
Частые ошибки
Использовать collect() на больших данных. collect() тянет весь датафрейм на driver. На 1 ТБ данных driver упадёт. Альтернатива — take(N) или write в файл.
Игнорировать cache() для повторно используемых DF. Без кеша Spark пересчитывает с самого начала.
Не различать transformations и actions. Lazy evaluation — это не магия. До action ничего не выполняется. Если count() идёт долго — это первое выполнение полного DAG, не баг.
Использовать большие Python UDF. Pandas UDF или нативные функции почти всегда быстрее.
Запускать Spark на маленьких данных. Для < 10 ГБ часто быстрее обычный pandas. Spark оправдан с миллионами строк.
FAQ
PySpark vs Scala Spark — что учить?
PySpark — стандарт в РФ. Scala — для глубокой оптимизации UDF и legacy-кодов.
Spark vs Trino/Presto — когда что?
Spark — для batch-обработки и transformations. Trino/Presto — для интерактивных запросов на больших данных. Не альтернативы, а разные инструменты.
Сколько Spark-практики нужно для собеса?
Минимум 2-3 проекта с реальной обработкой данных. На собесе спросят, как дебажили медленную job.
Это официальная информация?
Нет. Статья основана на публичных источниках и опыте кандидатов.