Как записать pandas DataFrame в базу

Базовый синтаксис

Стандартный способ — метод to_sql в паре с SQLAlchemy:

import pandas as pd
from sqlalchemy import create_engine

df = pd.read_csv('orders.csv')

engine = create_engine('postgresql://user:password@host:5432/mydb')
df.to_sql('orders', engine, if_exists='append', index=False)

Через несколько секунд данные в базе. Проверяем в SQL:

SELECT COUNT(*) FROM orders;

Параметры to_sql

if_exists управляет поведением, если таблица уже существует:

  • 'fail' — бросить ошибку, не записать.
  • 'replace' — удалить и создать заново. Удобно для dev, опасно для prod.
  • 'append' — дописать в существующую. Наиболее частый use case.

index — писать ли pandas-индекс как колонку. Обычно False, чтобы не создавать лишний столбец.

dtype — явно задать типы в базе. Полезно, когда autodetect работает неправильно:

from sqlalchemy import Integer, String, DateTime, Numeric

df.to_sql('orders', engine, if_exists='append', index=False,
          dtype={
              'order_id': Integer(),
              'user_id': Integer(),
              'amount': Numeric(10, 2),
              'created_at': DateTime(),
              'status': String(20)
          })

Ускорение через chunksize

Для больших DataFrame прямая запись падает по timeout или памяти. Решение — писать пачками:

df.to_sql('orders', engine, if_exists='append', index=False,
          chunksize=10000)

Pandas разбивает DataFrame на куски по 10к строк и пишет их по отдельности. Для миллионов строк обязательно.

method='multi'

Стандартная запись делает один INSERT на строку. Для PostgreSQL и MySQL это медленно. Параметр method='multi' группирует вставки:

df.to_sql('orders', engine, if_exists='append', index=False,
          chunksize=10000, method='multi')

Это в несколько раз быстрее на больших объёмах. Использует multi-row INSERT, который обрабатывается базой эффективнее.

Тренироваться на таких вопросах можно в Telegram-боте Карьерник — там 1500+ задач с реальных собесов с разборами.

Максимальная скорость — COPY для PostgreSQL

Для огромных DataFrame самый быстрый способ — COPY через psycopg2:

import psycopg2
from io import StringIO

conn = psycopg2.connect(dsn='postgresql://...')
cursor = conn.cursor()

buffer = StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)

cursor.copy_from(buffer, 'orders', sep=',', columns=df.columns.tolist())
conn.commit()

Это в 10-50 раз быстрее to_sql. Для многомиллионных DataFrame — must have.

ClickHouse

ClickHouse имеет специфический протокол. Стандартный to_sql работает плохо. Лучше использовать clickhouse-driver:

from clickhouse_driver import Client

client = Client('localhost')
client.execute('INSERT INTO orders VALUES', df.to_dict('records'))

Для больших DataFrame используйте execute_iter или пакетную вставку через HTTP interface.

Таблица должна существовать

to_sql с if_exists='append' падает, если таблицы нет. Создайте заранее:

CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount NUMERIC(10, 2),
    created_at TIMESTAMPTZ,
    status VARCHAR(20)
);

Или используйте if_exists='replace' для первого запуска (потом меняйте на append).

Типы данных

Pandas не всегда правильно конвертирует типы. Обычные проблемы:

Integers с NaN становятся float в pandas, потому что NaN — это float. В базе это может быть проблемой. Используйте nullable Int64:

df['user_id'] = df['user_id'].astype('Int64')  # nullable integer

Datetime без timezone может вставиться в TIMESTAMPTZ как UTC. Всегда явно указывайте tz, если таблица с timezone.

Object (pandas для смеси типов) — непредсказуемо в базе. Приведите к str перед записью:

df['description'] = df['description'].astype(str)

Транзакции

По умолчанию to_sql делает транзакцию. Если одна строка падает, откатываются все в текущем chunk.

Для полного контроля используйте контекст менеджер:

with engine.begin() as conn:
    df.to_sql('orders', conn, if_exists='append', index=False, chunksize=10000)

Это гарантирует atomic insert всей операции. Если что-то сломалось — никаких partial inserts.

К слову, набить руку на таких кейсах удобно через тренажёр в Telegram — разбирайте по 10 вопросов в день, через 2 недели тема становится рефлексом.

Upsert (UPDATE если есть, INSERT если нет)

Стандартный to_sql не поддерживает upsert. Для этого пишите SQL вручную:

from sqlalchemy import text

with engine.connect() as conn:
    for _, row in df.iterrows():
        conn.execute(text("""
            INSERT INTO orders (order_id, amount, status)
            VALUES (:id, :amount, :status)
            ON CONFLICT (order_id) DO UPDATE
            SET amount = EXCLUDED.amount, status = EXCLUDED.status
        """), id=row['order_id'], amount=row['amount'], status=row['status'])

Медленно. Для больших upsert используйте staging table + INSERT ... SELECT.

Staging table для upsert

Быстрый upsert миллионов строк:

# 1. Загружаем в temp-таблицу
df.to_sql('orders_staging', engine, if_exists='replace', index=False)

# 2. Upsert одним SQL-запросом
with engine.begin() as conn:
    conn.execute(text("""
        INSERT INTO orders (order_id, amount, status)
        SELECT order_id, amount, status FROM orders_staging
        ON CONFLICT (order_id) DO UPDATE
        SET amount = EXCLUDED.amount, status = EXCLUDED.status
    """))

    # 3. Чистим staging
    conn.execute(text("DROP TABLE orders_staging"))

Этот подход работает за минуты даже для миллионов строк.

Типичные ошибки

Запись на prod-базу без if_exists='append'. Случайно 'replace' убьёт таблицу со всеми данными. Ставьте safeguards в коде.

Забыть index=False. Создаётся лишний столбец 'index' с porядковыми номерами. Потом этот столбец никому не нужен, но удалить его — задача.

Игнорировать chunksize. DataFrame на 10M строк прямой записью тормозит на час. chunksize=50000 делает то же за минуты.

Неправильные типы. Float вместо integer, object вместо string. Создаёт проблемы для дальнейших запросов. Проверяйте DataFrame.dtypes перед записью.

Читайте также

FAQ

to_sql или COPY?

to_sql — проще. COPY — в разы быстрее. Для одноразовых задач хватает to_sql. Для production-пайплайнов берите COPY.

Upsert через pandas?

Прямо — нет. Через staging-таблицу — легко.

Сколько чанков для большого DataFrame?

chunksize 10-50к обычно хорошо. Мельче — медленно из-за overhead. Крупнее — memory issues.

Как ускорить совсем?

Parallel write через Dask. Или COPY. Или bulk insert через специализированный driver.