一种是导入sqlalchemy包,另一种是导入psycopg2包。
具体用法如下(此处以postgre数据库举例)
第一种:
具体用法如下(此处以postgre数据库举例)
第一种:
# 导入包 from sqlalchemy import create_engine import pandas as pd from string import Template engine = create_engine("oracle://user:pwd@***:***/racdb", echo=False) # 初始化引擎 engine = create_engine('postgresql+psycopg2://' + pg_username + ':' + pg_password + '@' + pg_host + ':' + str( pg_port) + '/' + pg_database) query_sql = """ select * from $arg1 """ query_sql = Template(query_sql) # template方法 df = pd.read_sql_query(query_sql .substitute(arg1=tablename),engine) # 配合pandas的方法读取数据库值 # 配合pandas的to_sql方法使用十分方便(dataframe对象直接入库) df.to_sql(table, engine, if_exists='replace', index=False) #覆盖入库 df.to_sql(table, engine, if_exists='append', index=False) #增量入库
注意:上述df.to_sql的方法实在是太慢太慢了,千万的数据chunksize设置为万,上传了5个小时 郁闷。查资料后得知以下方法:速度极快!!!!!
def write_to_table(df, table_name, if_exists='fail'): import io import pandas as pd from sqlalchemy import create_engine db_engine = create_engine('postgresql://***:***@***:***/***')# 初始化引擎 string_data_io = io.StringIO() df.to_csv(string_data_io, sep='|', index=False) pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine) table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df, index=False, if_exists=if_exists,schema = 'goods_code') table.create() string_data_io.seek(0) string_data_io.readline() # remove header with db_engine.connect() as connection: with connection.connection.cursor() as cursor: copy_cmd = "COPY goods_code.%s FROM STDIN HEADER DELIMITER '|' CSV" %table_name cursor.copy_expert(copy_cmd, string_data_io) connection.connection.commit()