Как записать pandas DataFrame в базу
Базовый синтаксис
Стандартный способ — метод to_sql в паре с SQLAlchemy:
import pandas as pd
from sqlalchemy import create_engine
df = pd.read_csv('orders.csv')
engine = create_engine('postgresql://user:password@host:5432/mydb')
df.to_sql('orders', engine, if_exists='append', index=False)Через несколько секунд данные в базе. Проверяем в SQL:
SELECT COUNT(*) FROM orders;Параметры to_sql
if_exists управляет поведением, если таблица уже существует:
'fail'— бросить ошибку, не записать.'replace'— удалить и создать заново. Удобно для dev, опасно для prod.'append'— дописать в существующую. Наиболее частый use case.
index — писать ли pandas-индекс как колонку. Обычно False, чтобы не создавать лишний столбец.
dtype — явно задать типы в базе. Полезно, когда autodetect работает неправильно:
from sqlalchemy import Integer, String, DateTime, Numeric
df.to_sql('orders', engine, if_exists='append', index=False,
dtype={
'order_id': Integer(),
'user_id': Integer(),
'amount': Numeric(10, 2),
'created_at': DateTime(),
'status': String(20)
})Ускорение через chunksize
Для больших DataFrame прямая запись падает по timeout или памяти. Решение — писать пачками:
df.to_sql('orders', engine, if_exists='append', index=False,
chunksize=10000)Pandas разбивает DataFrame на куски по 10к строк и пишет их по отдельности. Для миллионов строк обязательно.
method='multi'
Стандартная запись делает один INSERT на строку. Для PostgreSQL и MySQL это медленно. Параметр method='multi' группирует вставки:
df.to_sql('orders', engine, if_exists='append', index=False,
chunksize=10000, method='multi')Это в несколько раз быстрее на больших объёмах. Использует multi-row INSERT, который обрабатывается базой эффективнее.
Тренироваться на таких вопросах можно в Telegram-боте Карьерник — там 1500+ задач с реальных собесов с разборами.
Максимальная скорость — COPY для PostgreSQL
Для огромных DataFrame самый быстрый способ — COPY через psycopg2:
import psycopg2
from io import StringIO
conn = psycopg2.connect(dsn='postgresql://...')
cursor = conn.cursor()
buffer = StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
cursor.copy_from(buffer, 'orders', sep=',', columns=df.columns.tolist())
conn.commit()Это в 10-50 раз быстрее to_sql. Для многомиллионных DataFrame — must have.
ClickHouse
ClickHouse имеет специфический протокол. Стандартный to_sql работает плохо. Лучше использовать clickhouse-driver:
from clickhouse_driver import Client
client = Client('localhost')
client.execute('INSERT INTO orders VALUES', df.to_dict('records'))Для больших DataFrame используйте execute_iter или пакетную вставку через HTTP interface.
Таблица должна существовать
to_sql с if_exists='append' падает, если таблицы нет. Создайте заранее:
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
amount NUMERIC(10, 2),
created_at TIMESTAMPTZ,
status VARCHAR(20)
);Или используйте if_exists='replace' для первого запуска (потом меняйте на append).
Типы данных
Pandas не всегда правильно конвертирует типы. Обычные проблемы:
Integers с NaN становятся float в pandas, потому что NaN — это float. В базе это может быть проблемой. Используйте nullable Int64:
df['user_id'] = df['user_id'].astype('Int64') # nullable integerDatetime без timezone может вставиться в TIMESTAMPTZ как UTC. Всегда явно указывайте tz, если таблица с timezone.
Object (pandas для смеси типов) — непредсказуемо в базе. Приведите к str перед записью:
df['description'] = df['description'].astype(str)Транзакции
По умолчанию to_sql делает транзакцию. Если одна строка падает, откатываются все в текущем chunk.
Для полного контроля используйте контекст менеджер:
with engine.begin() as conn:
df.to_sql('orders', conn, if_exists='append', index=False, chunksize=10000)Это гарантирует atomic insert всей операции. Если что-то сломалось — никаких partial inserts.
К слову, набить руку на таких кейсах удобно через тренажёр в Telegram — разбирайте по 10 вопросов в день, через 2 недели тема становится рефлексом.
Upsert (UPDATE если есть, INSERT если нет)
Стандартный to_sql не поддерживает upsert. Для этого пишите SQL вручную:
from sqlalchemy import text
with engine.connect() as conn:
for _, row in df.iterrows():
conn.execute(text("""
INSERT INTO orders (order_id, amount, status)
VALUES (:id, :amount, :status)
ON CONFLICT (order_id) DO UPDATE
SET amount = EXCLUDED.amount, status = EXCLUDED.status
"""), id=row['order_id'], amount=row['amount'], status=row['status'])Медленно. Для больших upsert используйте staging table + INSERT ... SELECT.
Staging table для upsert
Быстрый upsert миллионов строк:
# 1. Загружаем в temp-таблицу
df.to_sql('orders_staging', engine, if_exists='replace', index=False)
# 2. Upsert одним SQL-запросом
with engine.begin() as conn:
conn.execute(text("""
INSERT INTO orders (order_id, amount, status)
SELECT order_id, amount, status FROM orders_staging
ON CONFLICT (order_id) DO UPDATE
SET amount = EXCLUDED.amount, status = EXCLUDED.status
"""))
# 3. Чистим staging
conn.execute(text("DROP TABLE orders_staging"))Этот подход работает за минуты даже для миллионов строк.
Типичные ошибки
Запись на prod-базу без if_exists='append'. Случайно 'replace' убьёт таблицу со всеми данными. Ставьте safeguards в коде.
Забыть index=False. Создаётся лишний столбец 'index' с porядковыми номерами. Потом этот столбец никому не нужен, но удалить его — задача.
Игнорировать chunksize. DataFrame на 10M строк прямой записью тормозит на час. chunksize=50000 делает то же за минуты.
Неправильные типы. Float вместо integer, object вместо string. Создаёт проблемы для дальнейших запросов. Проверяйте DataFrame.dtypes перед записью.
Читайте также
FAQ
to_sql или COPY?
to_sql — проще. COPY — в разы быстрее. Для одноразовых задач хватает to_sql. Для production-пайплайнов берите COPY.
Upsert через pandas?
Прямо — нет. Через staging-таблицу — легко.
Сколько чанков для большого DataFrame?
chunksize 10-50к обычно хорошо. Мельче — медленно из-за overhead. Крупнее — memory issues.
Как ускорить совсем?
Parallel write через Dask. Или COPY. Или bulk insert через специализированный driver.