asyncio vs threading vs multiprocessing для Data Engineer

Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.

Зачем спрашивают на собесе DE

Производительность пайплайна часто = правильный выбор concurrency-модели. На собесе DE: «как параллелить выгрузку из 100 API», «зачем asyncio», «когда multiprocessing». Senior — нюансы GIL, free-threaded Python (3.13t), shared memory.

Главная боль без понимания — DE взял multiprocessing.Pool для запросов к API (IO-bound), получил overhead от форка процессов и не выиграл по скорости. Или наоборот — попытался ускорить numpy через threading и встал в GIL.

GIL: главное ограничение Python

GIL (Global Interpreter Lock) — глобальная блокировка в CPython. Только один поток выполняет Python-байткод одновременно. Не зависит от числа ядер.

Что это значит:

  • CPU-bound код в threading ≠ ускорение, потому что ядра простаивают.
  • IO-bound код в threading ≈ ускорение, потому что во время ожидания GIL отпускается.
  • C-extensions (NumPy, asyncio.sleep) часто отпускают GIL.

Python 3.13t (PEP 703) — экспериментальный free-threaded build без GIL. Доступен с 2024, в продакшене пока редкость.

threading: для IO-bound

Поток (threading.Thread) — лёгкий, разделяет память с остальными. Идеален для блокирующих IO-операций.

import threading

def fetch(url):
    response = requests.get(url)
    process(response)

threads = [threading.Thread(target=fetch, args=(u,)) for u in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()

ThreadPoolExecutor — проще:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=10) as ex:
    results = list(ex.map(fetch, urls))

Когда:

  • HTTP-запросы (requests, urllib).
  • Чтение/запись файлов.
  • Запросы к БД через драйвер с blocking IO.
  • 10-100 параллельных IO-операций.

Когда нет:

  • CPU-bound (math, parsing) — упрётся в GIL.
  • Тысячи параллельных задач — overhead на поток (~8 МБ stack).

multiprocessing: для CPU-bound

Каждый процесс — отдельный Python-интерпретатор с отдельной памятью и отдельным GIL. Параллелизм по ядрам реальный.

from concurrent.futures import ProcessPoolExecutor

def compute_heavy(x):
    # CPU-bound вычисление
    return sum(i*i for i in range(x))

with ProcessPoolExecutor(max_workers=8) as ex:
    results = list(ex.map(compute_heavy, inputs))

Особенности:

  • Передача данных = pickle. Большие объекты дорого.
  • Старт процесса медленный (фунт ms), особенно на macOS / Win (spawn).
  • Shared memory через multiprocessing.shared_memory, Value, Array, Manager.
  • Логирование сложнее (один файл — race conditions, использовать QueueHandler).

Когда:

  • Pandas, numpy без GPU и без векторизации.
  • Парсинг файлов CPU-intensive.
  • Сжатие/расжатие.
  • ML-инференс на CPU.

Когда нет:

  • IO-bound — overhead процессов перевешивает.
  • Маленькие задачи — overhead больше работы.
  • Огромные данные между процессами — pickle становится bottleneck.

asyncio: для massive IO

Один поток, кооперативная многозадачность через await. Каждая корутина уступает контроль на IO.

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as r:
        return await r.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, u) for u in urls]
        return await asyncio.gather(*tasks)

results = asyncio.run(main(urls))

Свойства:

  • Тысячи concurrent IO-операций — нормально.
  • Один поток → нет переключений контекста OS, лёгкость.
  • Требуется asyncio-aware библиотеки (aiohttp, asyncpg, aiokafka).
  • Smешивание blocking-кода с asyncio = блокировка event loop.

Когда:

  • 1000+ HTTP-запросов одновременно (parser сайтов, API-агрегатор).
  • WebSocket / streaming.
  • Высокая нагрузка с маленькими CPU-затратами.

Когда нет:

  • CPU-bound — упрётся в один поток + GIL.
  • Нет async-драйвера для БД/брокера — блокирует event loop.
Готовься к собесу аналитика как в Duolingo
10 минут в день — SQL, Python, A/B, метрики. 1700+ вопросов в Telegram
Открыть Карьерник в Telegram

Сравнительная таблица

Свойство threading multiprocessing asyncio
Ускоряет CPU-bound Нет (GIL) Да Нет
Ускоряет IO-bound Да Да (но overhead) Да (масштабнее)
Память Общая Изолированная Общая
Стоимость задачи ~8 МБ ~ десятки MB ~ КБ
Overhead на старт низкий высокий низкий
Сложность кода средняя средняя высокая
Отладка средняя сложная сложная
Подходит для 10-100 IO CPU-задачи 1000+ IO

Применение в ETL

Сценарий 1: выгрузка из 50 API-эндпоинтов.

50 запросов IO-bound → ThreadPoolExecutor(50) или asyncio. Threading проще, asyncio быстрее на больших объёмах.

Сценарий 2: parsing 100 GB JSON-логов.

CPU-bound (json.loads тяжёлый) → multiprocessing на N ядер. Делим файлы между процессами, каждый процесс читает свой кусок.

Сценарий 3: load 1М сообщений в Postgres.

IO-bound (network + БД) → одноточечная вставка через bulk executemany или COPY. Параллелить почти бессмысленно — БД сама — bottleneck.

Сценарий 4: HTTP-скрейпер с rate-limit.

IO-bound с лимитами → asyncio + Semaphore. Контроль concurrency точечный.

Сценарий 5: Spark или Airflow.

Не тащи threading внутрь Spark-таска (он сам параллелится). В Airflow — ParallelTask, не self-multiprocessing.

Частые ошибки

multiprocessing для IO. Тяжёлый старт процессов перевешивает выгоду. Threading или asyncio быстрее.

threading для CPU. GIL съедает. Сравни — time будет почти одинаковый, чем больше потоков.

Mixing blocking + asyncio. time.sleep(1) внутри async-функции блокирует весь event loop. Используй await asyncio.sleep(1).

asyncio через requests. requests — синхронный. Замени на aiohttp / httpx async. Иначе loop встаёт.

Игнорировать if __name__ == '__main__'. На Windows и macOS multiprocessing требует main guard. Без него — бесконечный fork.

Не закрывать pool / executor. Использовать with или явно pool.close(); pool.join().

Сериализация через pickle. Объект должен быть picklable. Лямбды, локальные функции — нет. Используй модульные функции.

Не учитывать DB connection pool. На threading 50 потоков → 50 connections в Postgres → исчерпание max_connections. Используй pgbouncer или ограничь pool size.

asyncio для CPU-heavy. await на CPU-задаче не помогает. Вынеси в loop.run_in_executor (на ThreadPool / ProcessPool).

Связанные темы

FAQ

Когда asyncio лучше threading?

При 1000+ одновременных IO. Threading на таком объёме упирается в overhead на поток (RAM, переключения OS). asyncio в одном потоке держит десятки тысяч корутин.

Может ли GIL мешать numpy?

Numpy большую часть времени работает в C-коде с отпущенным GIL. Поэтому threading с numpy часто работает хорошо. Чистый Python-код в numpy (например, apply) — упирается в GIL.

Что такое concurrent.futures?

Унифицированный API над threading и multiprocessing. ThreadPoolExecutor / ProcessPoolExecutor дают одинаковый интерфейс — легко менять модель.

asyncio.gather vs asyncio.as_completed?

gather — ждать всех, возвращает список в порядке аргументов. as_completed — yield-ит по мере готовности. Второе удобно, когда хочется обрабатывать первые ответы, не дожидаясь самых медленных.

Как ограничить число одновременных корутин?

asyncio.Semaphore(N) + async with sem: внутри корутины. Стандартный паттерн для rate-limit.

Это официальная информация?

Нет. Статья основана на документации Python 3.12 (asyncio, threading, multiprocessing) и PEP 703 (free-threaded).


Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.