Генераторы и итераторы в Python для Data Engineer
Карьерник — Duolingo для аналитиков: 10 минут в день тренируй SQL, Python, A/B, статистику, метрики и ещё 3 темы собеса. 1500+ вопросов в Telegram-боте. Бесплатно.
Содержание:
Зачем спрашивают на собесе DE
DE постоянно работает с данными, которые не влезают в память. Прочитать 50 ГБ CSV в DataFrame и сразу OOM — не вариант. Генераторы и потоковая обработка — must-have. На собесе DE: «как обработать большой файл», «отличие генератора от списка», «что такое itertools.islice».
Главная боль без понимания — DE пишет pd.read_csv('huge.csv') и удивляется, почему джоб упал на 32 ГБ memory. С генератором или chunksize — работает на 200 МБ.
Iterator vs iterable: протокол
Iterable — объект, который можно перебрать (for x in ...). Реализует __iter__. Списки, словари, файлы, range, set — iterable.
Iterator — объект, реализующий __next__ и __iter__. На каждом вызове next() возвращает следующий элемент или поднимает StopIteration.
nums = [1, 2, 3]
it = iter(nums)
print(next(it)) # 1
print(next(it)) # 2
print(next(it)) # 3
print(next(it)) # StopIterationfor под капотом — это iter() + цикл next() до StopIteration.
Iterator одноразовый. После полного прохода новые next() всегда дают StopIteration.
Генераторы через yield
Функция с yield — генератор. На каждом yield возвращает значение и сохраняет состояние. На следующий next() — продолжает с этой точки.
def gen_evens(n):
for i in range(n):
if i % 2 == 0:
yield i
g = gen_evens(10)
print(list(g)) # [0, 2, 4, 6, 8]Свойства:
- O(1) memory — не накапливает результат.
- Lazy — значения вычисляются по запросу.
- Можно бесконечный (
while True: yield ...) — главное не материализовать.
yield from — делегирование к другому iterable.
def gen_a():
yield from [1, 2, 3]
yield from [4, 5, 6]
list(gen_a()) # [1, 2, 3, 4, 5, 6]Generator expression и list comprehension
Один и тот же синтаксис, разница в скобках:
# list comp — материализуется в памяти
nums = [x*x for x in range(10**7)] # ~80 MB
# gen expr — ленивый
nums = (x*x for x in range(10**7)) # ~200 байтИспользуй gen expr, когда нужно один раз пройти. Используй list comp, когда нужно индексировать или несколько раз обходить.
# суммирование без материализации
total = sum(x*x for x in range(10**7))
# любой next()
big = (line for line in open('huge.txt'))
first = next(big)itertools для DE
Стандартная библиотека для работы с итерациями.
islice — leniently взять подсписок:
from itertools import islice
with open('big.csv') as f:
head = list(islice(f, 5)) # первые 5 строк, без чтения файла целикомchain — конкатенация:
from itertools import chain
combined = chain(file1, file2, file3)groupby — группировка подряд идущих элементов:
from itertools import groupby
events = [('a', 1), ('a', 2), ('b', 3), ('b', 4), ('a', 5)]
for key, group in groupby(events, key=lambda x: x[0]):
print(key, list(group))
# a [(a,1), (a,2)]
# b [(b,3), (b,4)]
# a [(a,5)]Внимание: groupby группирует только подряд идущие. Для полной группировки — отсортируй сначала.
tee — разветвить итератор:
from itertools import tee
g1, g2 = tee(my_gen, 2)Аккуратно: tee буферизует — если потребление веток сильно расходится, память съесть можно.
accumulate — running totals:
from itertools import accumulate
list(accumulate([1, 2, 3, 4])) # [1, 3, 6, 10]Большой файл чанками
Построчно (CSV/JSONL). Файл — iterable строк. Не нужны хитрости.
with open('huge.jsonl') as f:
for line in f:
record = json.loads(line)
process(record)Чанки фиксированного размера.
def chunked(iterable, size):
it = iter(iterable)
while True:
chunk = list(islice(it, size))
if not chunk:
break
yield chunk
with open('big.csv') as f:
for chunk in chunked(f, 10000):
# bulk insert в БД
cur.executemany('INSERT ...', chunk)Дешёвая идиома для batch-вставки.
pandas.read_csv с chunksize:
for chunk_df in pd.read_csv('huge.csv', chunksize=100_000):
chunk_df.to_sql('table', con, if_exists='append', index=False)Чанк = DataFrame в 100k строк. Память контролируема.
Pipeline через генераторы
Композиция генераторов = data pipeline без промежуточной материализации.
def read_lines(path):
with open(path) as f:
for line in f:
yield line.rstrip()
def parse_json(lines):
for line in lines:
yield json.loads(line)
def filter_active(records):
for r in records:
if r.get('is_active'):
yield r
def to_db_rows(records):
for r in records:
yield (r['id'], r['email'], r['updated_at'])
# композиция
pipeline = to_db_rows(filter_active(parse_json(read_lines('users.jsonl'))))
with conn.cursor() as cur:
for chunk in chunked(pipeline, 5000):
cur.executemany('INSERT INTO users VALUES (%s, %s, %s)', chunk)Память O(chunk_size), не O(file_size). Каждый шаг — простая функция, легко тестировать.
Частые ошибки
list(generator) для проверки. Полная материализация — теряем выгоду генератора. Если нужно посчитать count — sum(1 for _ in gen).
Двойной проход по генератору. Iterator одноразовый. Если нужно дважды — либо tee, либо переподключайся к источнику.
Забыть про close на файле. При for line in open(...) файл не закрывается явно. Используй with.
groupby на неотсортированных данных. Группирует только подряд идущие. Без sorted() сначала результат удивит.
Хранить состояние в замыкании генератора. Сложно отлаживать. Лучше — class с __iter__ или явный объект.
Использовать generator там, где нужна индексация. g[5] не работает. list(g) материализует, потом индексируй.
Бесконечный генератор + list(). OOM. Защита: islice или take.
for line in f.readlines(). Читает весь файл. for line in f — потоково.
Связанные темы
- Bash и Unix для Data Engineer
- Pandas для DE: чанки
- Idempotentnost пайплайна для DE
- SQL для Data Engineer: собеседование
- Подготовка к собесу Data Engineer
FAQ
Чем отличается yield от return?
return завершает функцию. yield приостанавливает с сохранением состояния. Следующий next() продолжает с этой точки.
yield from зачем?
Делегирование итерации. yield from gen_b() равноценно for x in gen_b(): yield x, но плюс поддерживает send, throw, возвращаемое значение.
Generator expression медленнее списка?
На один проход — обычно одинаково. Главное преимущество — память. На multiple-pass — list, потому что повторно создавать gen-expr дороже.
pd.read_csv(chunksize=N) возвращает что?
TextFileReader — itertable, выдающий DataFrame по N строк. Ровно как генератор, можно for chunk_df in reader.
Как сделать генератор thread-safe?
По умолчанию — нет. Обернуть в lock или использовать queue.Queue для producer-consumer pattern.
Это официальная информация?
Нет. Статья основана на документации Python 3.12, PEP 255 (yield), PEP 380 (yield from).
Тренируйте Data Engineering — откройте тренажёр с 1500+ вопросами для собесов.