Async Python для аналитика: параллельные запросы и ETL

Зачем аналитику асинхронность

Аналитик часто пишет скрипты, которые обращаются к внешним источникам: REST API, несколько баз данных, загрузка большого количества файлов. Классический синхронный подход делает каждый запрос последовательно.

Представьте задачу: выгрузить данные по 1000 пользователям из CRM API. Каждый запрос занимает 200 мс. Последовательно — 200 секунд. С параллельными запросами — 5 секунд. Разница в 40 раз.

Asyncio — стандартная библиотека Python для асинхронного кода. Она позволяет одновременно держать много запросов «в воздухе», не блокируя поток. Для аналитика это означает быстрее, эффективнее, производительнее.

Не все задачи выигрывают от async. CPU-bound работа (тяжёлый расчёт в pandas, ML-обучение) не ускоряется. Async помогает только IO-bound задачам — сети, диск, базы.

Базовый пример

Синхронная версия загрузки нескольких URL:

import requests

def fetch(url):
    r = requests.get(url)
    return r.json()

urls = ['https://api.example.com/1', 'https://api.example.com/2', ...]
results = [fetch(url) for url in urls]

Если каждый запрос 200 мс, то 10 запросов — 2 секунды.

Async версия с aiohttp:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as r:
        return await r.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

results = asyncio.run(main())

Те же 10 запросов — 0.3 секунды (столько, сколько самый медленный). Все запросы идут параллельно.

Ключевые концепты

async def объявляет асинхронную функцию. Она возвращает coroutine, которая выполняется только при вызове через await или event loop.

await — ожидание результата corrоutine без блокировки потока. Во время ожидания Python может работать над другими задачами.

asyncio.gather — параллельный запуск нескольких corоutines. Возвращает список результатов, когда все завершились.

asyncio.run — entry point. Запускает event loop, выполняет async-функцию, закрывает loop.

Начинающих часто путает: нельзя вызвать async-функцию как обычную. fetch(url) не выполнит запрос — вернёт coroutine object. Нужно await fetch(url) внутри другой async-функции или asyncio.run(fetch(url)) в sync-коде.

Rate limiting

API обычно имеют ограничения — не более N запросов в секунду. Чистый asyncio.gather без контроля быстро получит 429 Too Many Requests.

Для ограничения используют Semaphore:

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as r:
            return await r.json()

async def main():
    sem = asyncio.Semaphore(10)  # максимум 10 одновременных
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, sem) for url in urls]
        return await asyncio.gather(*tasks)

Semaphore с лимитом 10 гарантирует, что одновременно выполняется не более 10 запросов. Остальные ждут в очереди.

Для точного rate limit (например, 100 req/sec) есть библиотеки aiolimiter:

from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(100, 1)  # 100 запросов за 1 секунду

async def fetch_rate_limited(session, url):
    async with limiter:
        async with session.get(url) as r:
            return await r.json()

Практика: выгрузка из API

Типичная задача — забрать все страницы из paginated API:

import asyncio
import aiohttp

async def fetch_page(session, page):
    url = f'https://api.example.com/events?page={page}'
    async with session.get(url, headers={'Authorization': 'Bearer TOKEN'}) as r:
        data = await r.json()
        return data['results']

async def fetch_all():
    async with aiohttp.ClientSession() as session:
        # Сначала узнаём общее число страниц
        async with session.get('https://api.example.com/events?page=1') as r:
            first = await r.json()
            total_pages = first['meta']['total_pages']

        # Параллельно скачиваем все страницы
        tasks = [fetch_page(session, p) for p in range(1, total_pages + 1)]
        all_results = await asyncio.gather(*tasks)

    # Flatten
    return [item for page_results in all_results for item in page_results]

data = asyncio.run(fetch_all())

Если API имеет 100 страниц по 200 мс каждая:

  • Sync: 20 секунд.
  • Async без лимита: 0.2-1 секунда (но API может забанить).
  • Async с лимитом 10: 2 секунды.

Оптимальный лимит — обычно указан в документации API.

Async с базами данных

Для PostgreSQL есть asyncpg — быстрый async-драйвер:

import asyncpg
import asyncio

async def fetch_user_stats(user_id):
    conn = await asyncpg.connect('postgresql://user:pass@host/db')
    row = await conn.fetchrow(
        'SELECT SUM(amount) FROM orders WHERE user_id = $1',
        user_id
    )
    await conn.close()
    return row

async def many_users(user_ids):
    tasks = [fetch_user_stats(uid) for uid in user_ids]
    return await asyncio.gather(*tasks)

Это даёт параллельные запросы к БД. Но осторожно с connection pool — если каждая задача создаёт соединение, быстро уперётесь в лимит connections.

Лучше pool:

async def main():
    pool = await asyncpg.create_pool('postgresql://...', min_size=5, max_size=20)
    # Используем pool для всех задач
    ...
    await pool.close()

Понимание асинхронного программирования становится полезным на middle-senior уровне аналитика, когда задачи вырастают из «скрипт на коленке» в production-пайплайны. В тренажёре Карьерник есть упражнения на современный Python-стек.

Error handling

asyncio.gather падает целиком, если одна задача упала. Чтобы продолжить с остальными:

results = await asyncio.gather(*tasks, return_exceptions=True)

# results может содержать exceptions вместо значений
for result in results:
    if isinstance(result, Exception):
        print(f'Error: {result}')
    else:
        process(result)

Это важно для production ETL — один сломанный endpoint не должен валить весь скрипт.

Concurrent vs parallel

Важно понимать разницу:

Concurrent (asyncio) — много задач переключаются в одном потоке. Подходит для IO-bound.

Parallel (multiprocessing) — задачи на разных CPU-ядрах. Подходит для CPU-bound.

Смешать можно: asyncio + ProcessPoolExecutor для вычислений внутри async-кода.

Когда asyncio не нужен

Если ваш скрипт:

  • Работает только с локальными файлами и pandas.
  • Делает один запрос к БД и считает что-то.
  • Сложные вычисления доминируют над сетью.

— asyncio не ускорит.

Ускорит:

  • 10+ запросов к API.
  • Множественная загрузка файлов.
  • Parallel обращение к нескольким источникам данных.

Ошибки начинающих

Смешивать async и sync. requests.get(url) внутри async def — блокирует весь event loop. Используйте aiohttp или wraps в asyncio.to_thread.

Забыть await. fetch(url) без await возвращает coroutine, не результат. Забытый await — самая частая ошибка.

Нагружать event loop CPU-задачами. Тяжёлый numpy-расчёт блокирует loop. Для CPU нужен executor.

Не закрывать ресурсы. Connections, sessions должны закрываться через async with или явно await .close().

Когда пробовать

Если ваш скрипт выгрузки из API работает 30+ секунд из-за сети — попробуйте переписать на aiohttp. Одна итерация обучения + переписывание — день работы. В результате скрипт ускоряется в 5-20 раз.

Для одноразовых задач это может быть оверкилл. Но для regular pipeline, который запускается каждый день, async окупается сразу.

Читайте также

FAQ

asyncio сложнее обычного Python?

Концептуально да. Event loop, coroutines, await — нужно понять. После первых 2-3 скриптов становится интуитивно.

Использовать в pandas-коде?

pandas — CPU-bound, асинхронность не даст выигрыша. Async полезен перед pandas (загрузить данные быстрее) или после (отправить результаты в несколько API параллельно).

aiohttp или httpx?

aiohttp — более старый и устоявшийся. httpx — современнее, поддерживает и sync и async одновременно. Для новых проектов можно httpx.

Threads vs async?

Threads тоже позволяют concurrency, но имеют overhead на переключение. Для 100+ одновременных запросов async эффективнее. Для 2-10 — можно threads, проще код.