Async Python для аналитика: параллельные запросы и ETL

Проверь себя · 1/3разбор после ответа
Какой результат даст выражение set([1, 2, 2, 3, 3])?

Зачем аналитику асинхронность

Аналитик часто пишет скрипты, которые обращаются к внешним источникам: REST API, несколько баз данных, загрузка большого количества файлов. Классический синхронный подход делает каждый запрос последовательно.

Представьте задачу: выгрузить данные по 1000 пользователям из CRM API. Каждый запрос занимает 200 мс. Последовательно — 200 секунд. С параллельными запросами — 5 секунд. Разница в 40 раз.

Asyncio — стандартная библиотека Python для асинхронного кода. Она позволяет одновременно держать много запросов «в воздухе», не блокируя поток. Для аналитика это означает быстрее, эффективнее, производительнее.

Не все задачи выигрывают от async. CPU-bound работа (тяжёлый расчёт в pandas, обучение ML) не ускоряется. Async помогает только IO-bound задачам — сеть, диск, базы.

Базовый пример

Синхронная версия загрузки нескольких URL:

import requests

def fetch(url):
    r = requests.get(url)
    return r.json()

urls = ['https://api.example.com/1', 'https://api.example.com/2', ...]
results = [fetch(url) for url in urls]

Если каждый запрос 200 мс, то 10 запросов — 2 секунды.

Async версия с aiohttp:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as r:
        return await r.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    return results

results = asyncio.run(main())

Те же 10 запросов — 0.3 секунды (столько, сколько самый медленный). Все запросы идут параллельно.

Ключевые концепции

async def объявляет асинхронную функцию. Она возвращает корутину, которая выполняется только при вызове через await или event loop.

await — ожидание результата корутины без блокировки потока. Во время ожидания Python может работать над другими задачами.

asyncio.gather — параллельный запуск нескольких корутин. Возвращает список результатов, когда все завершились.

asyncio.run — точка входа. Запускает event loop, выполняет async-функцию, закрывает loop.

Новичков часто путает: нельзя вызвать async-функцию как обычную. fetch(url) не выполнит запрос — вернёт объект-корутину. Нужно await fetch(url) внутри другой async-функции или asyncio.run(fetch(url)) в синхронном коде.

Ограничение частоты запросов

API обычно имеют ограничения — не более N запросов в секунду. Чистый asyncio.gather без контроля быстро получит 429 Too Many Requests.

Для ограничения используют Semaphore:

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as r:
            return await r.json()

async def main():
    sem = asyncio.Semaphore(10)  # максимум 10 одновременных
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, sem) for url in urls]
        return await asyncio.gather(*tasks)

Semaphore с лимитом 10 гарантирует, что одновременно выполняется не более 10 запросов. Остальные ждут в очереди.

Для точного лимита частоты (например, 100 запросов в секунду) есть библиотека aiolimiter:

from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(100, 1)  # 100 запросов за 1 секунду

async def fetch_rate_limited(session, url):
    async with limiter:
        async with session.get(url) as r:
            return await r.json()

Практика: выгрузка из API

Типичная задача — забрать все страницы из API с пагинацией:

import asyncio
import aiohttp

async def fetch_page(session, page):
    url = f'https://api.example.com/events?page={page}'
    async with session.get(url, headers={'Authorization': 'Bearer TOKEN'}) as r:
        data = await r.json()
        return data['results']

async def fetch_all():
    async with aiohttp.ClientSession() as session:
        # Сначала узнаём общее число страниц
        async with session.get('https://api.example.com/events?page=1') as r:
            first = await r.json()
            total_pages = first['meta']['total_pages']

        # Параллельно скачиваем все страницы
        tasks = [fetch_page(session, p) for p in range(1, total_pages + 1)]
        all_results = await asyncio.gather(*tasks)

    # Разворачиваем в плоский список
    return [item for page_results in all_results for item in page_results]

data = asyncio.run(fetch_all())

Если у API 100 страниц по 200 мс каждая:

  • Синхронно: 20 секунд.
  • Async без лимита: 0.2-1 секунда (но API может забанить).
  • Async с лимитом 10: 2 секунды.

Оптимальный лимит — обычно указан в документации API.

Async с базами данных

Для PostgreSQL есть asyncpg — быстрый async-драйвер:

import asyncpg
import asyncio

async def fetch_user_stats(user_id):
    conn = await asyncpg.connect('postgresql://user:pass@host/db')
    row = await conn.fetchrow(
        'SELECT SUM(amount) FROM orders WHERE user_id = $1',
        user_id
    )
    await conn.close()
    return row

async def many_users(user_ids):
    tasks = [fetch_user_stats(uid) for uid in user_ids]
    return await asyncio.gather(*tasks)

Это даёт параллельные запросы к БД. Но осторожно с пулом соединений — если каждая задача создаёт соединение, быстро уперётесь в лимит подключений.

Лучше пул:

async def main():
    pool = await asyncpg.create_pool('postgresql://...', min_size=5, max_size=20)
    # Используем pool для всех задач
    ...
    await pool.close()

Понимание асинхронного программирования становится полезным на middle-senior уровне аналитика, когда задачи вырастают из «скрипт на коленке» в продовые пайплайны. В тренажёре Карьерник есть упражнения на современный Python-стек.

Закрепи Python для аналитика
200+ задач по pandas, numpy и работе с данными — с разборами
Тренировать Python в Telegram

Обработка ошибок

asyncio.gather падает целиком, если одна задача упала. Чтобы продолжить с остальными:

results = await asyncio.gather(*tasks, return_exceptions=True)

# results может содержать исключения вместо значений
for result in results:
    if isinstance(result, Exception):
        print(f'Error: {result}')
    else:
        process(result)

Это важно для продового ETL — один сломанный endpoint не должен валить весь скрипт.

Concurrent vs parallel

Важно понимать разницу:

Concurrent (asyncio) — много задач переключаются в одном потоке. Подходит для IO-bound.

Parallel (multiprocessing) — задачи на разных ядрах CPU. Подходит для CPU-bound.

Можно смешать: asyncio + ProcessPoolExecutor для вычислений внутри async-кода.

Когда asyncio не нужен

Если ваш скрипт:

  • Работает только с локальными файлами и pandas.
  • Делает один запрос к БД и считает что-то.
  • Сложные вычисления доминируют над сетью.

— asyncio не ускорит.

Ускорит:

  • 10+ запросов к API.
  • Множественная загрузка файлов.
  • Параллельное обращение к нескольким источникам данных.

Типичные ошибки новичков

Смешивать async и sync. requests.get(url) внутри async def — блокирует весь event loop. Используйте aiohttp или оборачивайте в asyncio.to_thread.

Забыть await. fetch(url) без await возвращает корутину, а не результат. Забытый await — самая частая ошибка.

Нагружать event loop CPU-задачами. Тяжёлый расчёт в NumPy блокирует loop. Для CPU нужен executor.

Не закрывать ресурсы. Соединения и сессии должны закрываться через async with или явный await .close().

Когда пробовать

Если ваш скрипт выгрузки из API работает 30+ секунд из-за сети — попробуйте переписать на aiohttp. Одна итерация обучения + переписывание — день работы. В результате скрипт ускоряется в 5-20 раз.

Для одноразовых задач это может быть избыточно. Но для регулярного пайплайна, который запускается каждый день, async окупается сразу.

Читайте также

FAQ

asyncio сложнее обычного Python?

Концептуально да. Event loop, корутины, await — нужно понять. После первых 2-3 скриптов становится интуитивно.

Использовать в pandas-коде?

Pandas — CPU-bound, асинхронность не даст выигрыша. Async полезен перед pandas (загрузить данные быстрее) или после (отправить результаты в несколько API параллельно).

aiohttp или httpx?

aiohttp — более старый и устоявшийся. httpx — современнее, поддерживает sync и async одновременно. Для новых проектов можно httpx.

Потоки vs async?

Потоки тоже позволяют конкурентность, но имеют накладные расходы на переключение. Для 100+ одновременных запросов async эффективнее. Для 2-10 — можно потоки, проще код.