programing

SQLALchemy를 사용하여 Pandas DataFrame 대량 삽입

topblog 2023. 9. 20. 20:03
반응형

SQLALchemy를 사용하여 Pandas DataFrame 대량 삽입

좀 큰 팬더 DataFrame이 있는데 SQL Alchemy를 통해 Microsoft SQL Server에 업로드하기 위해 새로운 대량 SQL 매핑을 사용하고자 합니다.방법은 좋지만 느립니다.

코드를 쓰는 데 어려움을 겪고 있어요

이 기능을 팬더 데이터 프레임으로 전달하고 싶습니다.table, 내가 부르는 스키마 이름schema, 그리고 제가 부르는 테이블 이름은name. 이상적으로 함수는 테이블이 이미 존재하는 경우 1.) 삭제합니다. 2.) 새 테이블을 만듭니다. 3.) 맵퍼를 만들고 4.) 맵퍼와 팬더 데이터를 사용하여 대량 삽입합니다.3부에서 막혔어요.

제 암호는 여기 있습니다.기본 키로 매퍼 기능을 작동시키는 방법이 고민입니다.기본 키는 필요 없지만 맵퍼 기능은 필요합니다.

통찰력 감사합니다.

from sqlalchemy import create_engine Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e,reflect=True,schema=schema)
    Base = declarative_base(bind=e,metadata=m)
    t = Table(name,m)
    m.remove(t)
    t.drop(checkfirst=True)
    sqld = SQLDatabase(e, schema=schema,meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e,tables=[sqlt])    
    class MyClass(Base):
        return
    mapper(MyClass, sqlt)    

    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return

저는 와 비슷한 문제에 부딪혔습니다.pd.to_sql데이터를 업로드하는 데 몇 시간이 걸립니다.아래 코드 벌크는 몇초만에 동일한 데이터를 삽입하였습니다.

from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace =True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,
);'''
cursor.execute(command)
connection.commit()
     
#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")    
connection.commit()
cur.close()

그 때쯤 답변이 왔을 수도 있지만, 저는 이 사이트에서 다른 답변을 대조하고 SQLAlchemy의 문서와 일치함으로써 해결책을 찾았습니다.

  1. 테이블이 db1에 이미 존재해야 합니다. 인덱스가 auto_increment on으로 설정되어 있습니다.
  2. Class Current는 CSV에서 가져온 데이터 프레임 및 db1의 테이블과 일치해야 합니다.

이것이 누가 여기에 오든 간에 판다와 에스큐엘 화학을 빨리 혼합하고 싶어하는 데 도움이 되기를 바랍니다.

from urllib import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd


# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()

#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc.
class Current(Base):
    __tablename__ = 'tableName'

    id = Column(Integer, primary_key=True)
    Date = Column(String(500))
    Type = Column(String(500))
    Value = Column(Numeric())

    def __repr__(self):
        return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)

# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'

# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# The orient='records' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks.
listToWrite = df_to_be_written.to_dict(orient='records')

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

# Open the session
Session = sessionmaker(bind=engine)
session = Session()

# Inser the dataframe into the database in one bulk
conn.execute(table.insert(), listToWrite)

# Commit the changes
session.commit()

# Close the session
session.close()

@ansonw 답변 기준:

def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
    # Create Table
    df[:0].to_sql(table, engine, if_exists=if_exists)

    # Prepare data
    output = cStringIO.StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    cursor.copy_from(output, table, sep=sep, null='')
    connection.commit()
    cursor.close()

4분이 아니라 5초만에 20000줄을 삽입합니다.

Pandas 0.25.1에는 다중 삽입을 수행하는 매개 변수가 있으므로 더 이상 SQLAlchemy로 이 문제를 해결할 필요가 없습니다.

세트method='multi'부를 때pandas.DataFrame.to_sql.

이 예에서는 다음과 같습니다.df.to_sql(table, schema=schema, con=e, index=False, if_exists='replace', method='multi')

여기 문서에서 출처를 알 수

레드시프트로 이것만 테스트했다는 것을 주목할 필요가 있습니다.제가 이 답변을 업데이트할 수 있도록 다른 데이터베이스에서 어떻게 진행되는지 알려주시기 바랍니다.

이것은 I/O 작업량이 많기 때문에 다중처리.dummy를 통해 python threading module을 사용할 수도 있습니다.덕분에 일이 빨라졌습니다.

import math
from multiprocessing.dummy import Pool as ThreadPool

...

def insert_df(df, *args, **kwargs):
    nworkers = 4

    chunksize = math.floor(df.shape[0] / nworkers)
    chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)]
    chunks.append((chunksize * nworkers, df.shape[0]))
    pool = ThreadPool(nworkers)

    def worker(chunk):
        i, j = chunk
        df.iloc[i:j, :].to_sql(*args, **kwargs)

    pool.map(worker, chunks)
    pool.close()
    pool.join()


....

insert_df(df, "foo_bar", engine, if_exists='append')

여기 간단한 방법이 있습니다.

SQL 데이터베이스 연결용 드라이버 다운로드

리눅스 및 Mac OS의 경우:

https://learn.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-2017

Windows의 경우:

https://www.microsoft.com/en-us/download/details.aspx?id=56567

연결 만들기

from sqlalchemy import create_engine 
import urllib
server = '*****'
database = '********'
username = '**********'
password = '*********'

params = urllib.parse.quote_plus(
'DRIVER={ODBC Driver 17 for SQL Server};'+ 
'SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password) 

engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params) 

#Checking Connection 
connected = pd.io.sql._is_sqlalchemy_connectable(engine)

print(connected)   #Output is True if connection established successfully

자료삽입

df.to_sql('Table_Name', con=engine, if_exists='append', index=False)


"""
if_exists: {'fail', 'replace', 'append'}, default 'fail'
     fail: If table exists, do nothing.
     replace: If table exists, drop it, recreate it, and insert data.
     append: If table exists, insert data. Create if does not exist.
"""

기록이 많은 경우

# limit based on sp_prepexec parameter count
tsql_chunksize = 2097 // len(bd_pred_score_100.columns)
# cap at 1000 (limit for number of rows inserted by table-value constructor)
tsql_chunksize = 1000 if tsql_chunksize > 1000 else tsql_chunksize
print(tsql_chunksize)


df.to_sql('table_name', con = engine, if_exists = 'append', index= False, chunksize=tsql_chunksize)

PS: 필요에 따라 파라미터를 변경할 수 있습니다.

dataframe pandas dataframe 하고, postgres 를 bulk insert 를합니다를 합니다.COPY my_table FROM ...

import io

import pandas as pd
from sqlalchemy import create_engine

def write_to_table(df, db_engine, schema, table_name, if_exists='fail'):
    string_data_io = io.StringIO()
    df.to_csv(string_data_io, sep='|', index=False)
    pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
    table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                               index=False, if_exists=if_exists, schema=schema)
    table.create()
    string_data_io.seek(0)
    string_data_io.readline()  # remove header
    with db_engine.connect() as connection:
        with connection.connection.cursor() as cursor:
            copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER '|' CSV" % (schema, table_name)
            cursor.copy_expert(copy_cmd, string_data_io)
        connection.connection.commit()

위의 솔루션을 구현하려는 저와 같은 사람들을 위해:

이제 Pandas 0.24.0에 대량으로 삽입되는 chunksize 및 method='multi' 옵션이 있는 to_sql이 있습니다...

이를 통해 cx_Oracle과 SQLAchemy를 사용하여 Oracle Database에 연결할 수 있었습니다.

import sqlalchemy
import cx_Oracle
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String
from sqlalchemy.orm import sessionmaker
import pandas as pd

# credentials
username = "username"
password = "password"
connectStr = "connection:/string"
tableName = "tablename"

t0 = time.time()

# connection
dsn = cx_Oracle.makedsn('host','port',service_name='servicename')

Base = declarative_base()

class LANDMANMINERAL(Base):
    __tablename__ = 'tablename'

    DOCUMENTNUM = Column(String(500), primary_key=True)
    DOCUMENTTYPE = Column(String(500))
    FILENUM = Column(String(500))
    LEASEPAYOR = Column(String(500))
    LEASESTATUS = Column(String(500))
    PROSPECT = Column(String(500))
    SPLIT = Column(String(500))
    SPLITSTATUS = Column(String(500))

engine = create_engine('oracle+cx_oracle://%s:%s@%s' % (username, password, dsn))
conn = engine.connect()  

Base.metadata.bind = engine

# Creating the session

DBSession = sessionmaker(bind=engine)

session = DBSession()

# Bulk insertion
data = pd.read_csv('data.csv')
lists = data.to_dict(orient='records')


table = sqlalchemy.Table('landmanmineral', Base.metadata, autoreload=True)
conn.execute(table.insert(), lists)

session.commit()

session.close() 

print("time taken %8.8f seconds" % (time.time() - t0) )

아래 코드가 도움이 될 것 같습니다. 695,000K 레코드를 로드하는 동안 동일한 문제에 직면했습니다.

방법 로드하기 전에 테이블 잘라내기

with engine.begin() as conn:
     conn.execute(sa.text("TRUNCATE TABLE <schama.table>")

참고:- engine= 대상 서버, sa for("sa"로 sqalchemy 가져오기)

table_name = "<destination_table>"
df.to_sql(table_name, engine, schema = 'schema', if_exists = 'replace', index=False)

요구 사항에 따라 추가/교체 여부에 따라 다름

이 DB 를 Redshift 는 Redshift Postgres 의합니다 Postgres 로를 하여 일부 COPY FROM아니면copy_from()작동하지 않습니다.싸이캅2.프로그래밍 오류: redshift에서 copy_from redshift를 시도할 때 발생한 "stdin" 오류 또는 "stdin"에 가까운 구문 오류

Redshift로의 INSERT 속도를 높이는 방법은 파일 수집 또는 Odo를 사용하는 것입니다.

참조:
오도 http://odo.pydata.org/en/latest/perf.html 정보

://
빨간색 이동 COPY(S3 파일에서)
://

언급URL : https://stackoverflow.com/questions/31997859/bulk-insert-a-pandas-dataframe-using-sqlalchemy

반응형