asyncio vs threading vs multiprocessing для Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Зачем спрашивают на собесе DE
Производительность пайплайна часто = правильный выбор concurrency-модели. На собесе DE: «как параллелить выгрузку из 100 API», «зачем asyncio», «когда multiprocessing». Senior — нюансы GIL, free-threaded Python (3.13t), shared memory.
Главная боль без понимания — DE взял multiprocessing.Pool для запросов к API (IO-bound), получил overhead от форка процессов и не выиграл по скорости. Или наоборот — попытался ускорить numpy через threading и встал в GIL.
GIL: главное ограничение Python
GIL (Global Interpreter Lock) — глобальная блокировка в CPython. Только один поток выполняет Python-байткод одновременно. Не зависит от числа ядер.
Что это значит:
- CPU-bound код в threading ≠ ускорение, потому что ядра простаивают.
- IO-bound код в threading ≈ ускорение, потому что во время ожидания GIL отпускается.
- C-extensions (NumPy, asyncio.sleep) часто отпускают GIL.
Python 3.13t (PEP 703) — экспериментальный free-threaded build без GIL. Доступен с 2024, в продакшене пока редкость.
threading: для IO-bound
Поток (threading.Thread) — лёгкий, разделяет память с остальными. Идеален для блокирующих IO-операций.
import threading
def fetch(url):
response = requests.get(url)
process(response)
threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads:
t.start()
for t in threads:
t.join()ThreadPoolExecutor — проще:
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=10) as ex:
results = list(ex.map(fetch, urls))Когда:
- HTTP-запросы (requests, urllib).
- Чтение/запись файлов.
- Запросы к БД через драйвер с blocking IO.
- 10-100 параллельных IO-операций.
Когда нет:
- CPU-bound (math, parsing) — упрётся в GIL.
- Тысячи параллельных задач — overhead на поток (~8 МБ stack).
multiprocessing: для CPU-bound
Каждый процесс — отдельный Python-интерпретатор с отдельной памятью и отдельным GIL. Параллелизм по ядрам реальный.
from concurrent.futures import ProcessPoolExecutor
def compute_heavy(x):
# CPU-bound вычисление
return sum(i*i for i in range(x))
with ProcessPoolExecutor(max_workers=8) as ex:
results = list(ex.map(compute_heavy, inputs))Особенности:
- Передача данных = pickle. Большие объекты дорого.
- Старт процесса медленный (фунт ms), особенно на macOS / Win (spawn).
- Shared memory через
multiprocessing.shared_memory,Value,Array,Manager. - Логирование сложнее (один файл — race conditions, использовать QueueHandler).
Когда:
- Pandas, numpy без GPU и без векторизации.
- Парсинг файлов CPU-intensive.
- Сжатие/расжатие.
- ML-инференс на CPU.
Когда нет:
- IO-bound — overhead процессов перевешивает.
- Маленькие задачи — overhead больше работы.
- Огромные данные между процессами — pickle становится bottleneck.
asyncio: для massive IO
Один поток, кооперативная многозадачность через await. Каждая корутина уступает контроль на IO.
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as r:
return await r.text()
async def main(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, u) for u in urls]
return await asyncio.gather(*tasks)
results = asyncio.run(main(urls))Свойства:
- Тысячи concurrent IO-операций — нормально.
- Один поток → нет переключений контекста OS, лёгкость.
- Требуется asyncio-aware библиотеки (aiohttp, asyncpg, aiokafka).
- Smешивание blocking-кода с asyncio = блокировка event loop.
Когда:
- 1000+ HTTP-запросов одновременно (parser сайтов, API-агрегатор).
- WebSocket / streaming.
- Высокая нагрузка с маленькими CPU-затратами.
Когда нет:
- CPU-bound — упрётся в один поток + GIL.
- Нет async-драйвера для БД/брокера — блокирует event loop.
Сравнительная таблица
| Свойство | threading | multiprocessing | asyncio |
|---|---|---|---|
| Ускоряет CPU-bound | Нет (GIL) | Да | Нет |
| Ускоряет IO-bound | Да | Да (но overhead) | Да (масштабнее) |
| Память | Общая | Изолированная | Общая |
| Стоимость задачи | ~8 МБ | ~ десятки MB | ~ КБ |
| Overhead на старт | низкий | высокий | низкий |
| Сложность кода | средняя | средняя | высокая |
| Отладка | средняя | сложная | сложная |
| Подходит для | 10-100 IO | CPU-задачи | 1000+ IO |
Применение в ETL
Сценарий 1: выгрузка из 50 API-эндпоинтов.
50 запросов IO-bound → ThreadPoolExecutor(50) или asyncio. Threading проще, asyncio быстрее на больших объёмах.
Сценарий 2: parsing 100 GB JSON-логов.
CPU-bound (json.loads тяжёлый) → multiprocessing на N ядер. Делим файлы между процессами, каждый процесс читает свой кусок.
Сценарий 3: load 1М сообщений в Postgres.
IO-bound (network + БД) → одноточечная вставка через bulk executemany или COPY. Параллелить почти бессмысленно — БД сама — bottleneck.
Сценарий 4: HTTP-скрейпер с rate-limit.
IO-bound с лимитами → asyncio + Semaphore. Контроль concurrency точечный.
Сценарий 5: Spark или Airflow.
Не тащи threading внутрь Spark-таска (он сам параллелится). В Airflow — ParallelTask, не self-multiprocessing.
Частые ошибки
multiprocessing для IO. Тяжёлый старт процессов перевешивает выгоду. Threading или asyncio быстрее.
threading для CPU. GIL съедает. Сравни — time будет почти одинаковый, чем больше потоков.
Mixing blocking + asyncio. time.sleep(1) внутри async-функции блокирует весь event loop. Используй await asyncio.sleep(1).
asyncio через requests. requests — синхронный. Замени на aiohttp / httpx async. Иначе loop встаёт.
Игнорировать if __name__ == '__main__'. На Windows и macOS multiprocessing требует main guard. Без него — бесконечный fork.
Не закрывать pool / executor. Использовать with или явно pool.close(); pool.join().
Сериализация через pickle. Объект должен быть picklable. Лямбды, локальные функции — нет. Используй модульные функции.
Не учитывать DB connection pool. На threading 50 потоков → 50 connections в Postgres → исчерпание max_connections. Используй pgbouncer или ограничь pool size.
asyncio для CPU-heavy. await на CPU-задаче не помогает. Вынеси в loop.run_in_executor (на ThreadPool / ProcessPool).
Связанные темы
- Генераторы и итераторы Python для DE
- Bash и Unix для Data Engineer
- Idempotentnost пайплайна для DE
- Airflow на собесе DE
- Подготовка к собесу Data Engineer
FAQ
Когда asyncio лучше threading?
При 1000+ одновременных IO. Threading на таком объёме упирается в overhead на поток (RAM, переключения OS). asyncio в одном потоке держит десятки тысяч корутин.
Может ли GIL мешать numpy?
Numpy большую часть времени работает в C-коде с отпущенным GIL. Поэтому threading с numpy часто работает хорошо. Чистый Python-код в numpy (например, apply) — упирается в GIL.
Что такое concurrent.futures?
Унифицированный API над threading и multiprocessing. ThreadPoolExecutor / ProcessPoolExecutor дают одинаковый интерфейс — легко менять модель.
asyncio.gather vs asyncio.as_completed?
gather — ждать всех, возвращает список в порядке аргументов. as_completed — yield-ит по мере готовности. Второе удобно, когда хочется обрабатывать первые ответы, не дожидаясь самых медленных.
Как ограничить число одновременных корутин?
asyncio.Semaphore(N) + async with sem: внутри корутины. Стандартный паттерн для rate-limit.
Это официальная информация?
Нет. Статья основана на документации Python 3.12 (asyncio, threading, multiprocessing) и PEP 703 (free-threaded).
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.