通过 Python 调用 COPY 方法,快速将数据写入 PostgreSQL(PG)库

1、在PG库中创建表

-- Target table for the bulk load. "id" is a BIGSERIAL column, so it is
-- generated by the database; only column1/column2 have to be supplied.
CREATE TABLE sip_sip.tablename (
    id      BIGSERIAL   NOT NULL,
    column1 VARCHAR(32),
    column2 VARCHAR(32)
);

2、清洗数据,并整理成如下格式:

每行两个字段,以英文逗号分隔,即“IP地址,IP地址”的形式,例如:1.1.1.1,2.2.2.2(注意:下文代码中的 pd.read_csv 默认把文件第一行当作列名)

3、将文件上传到 Linux 服务器上

4、编写 Python 代码

# coding=utf-8
import io
import sys
import datetime
import pandas as pd
from sqlalchemy import create_engine

class Copy_Data2PG:
    """Bulk-load a pandas DataFrame into PostgreSQL with ``COPY ... FROM STDIN``.

    The instance owns a SQLAlchemy engine built from the connection settings
    in ``__init__`` and the name of the table that ``main`` loads into.
    """

    # Schema that holds the target table.
    DEFAULT_SCHEMA = u'sip_sip'
    # Port used when the configured value is not numeric (e.g. a placeholder).
    DEFAULT_PORT = 5432
    # File that ``main`` loads when no path is given.
    DEFAULT_CSV_PATH = '/dist/MonitoringBlackIP/Monitor_history_abroad2telecom_ip_data/11.txt'

    def __init__(self, table_name):
        """Store the target table name and create the connection engine.

        Args:
            table_name: Name of the table (inside the schema) to load into.
        """
        self.table_name = table_name
        config = {
            "pg_config": {
                "host": "xxxx",
                "user": "xxxx",
                "password": "xxxx",
                "port": "xxxx",
                "database": "xxxx"
            }
        }
        postgresql_url = self._build_url(config["pg_config"])

        # Connection pool settings: pool_size=10, max_overflow=20.
        self.sip_black_data_engine = create_engine(
            postgresql_url, pool_size=10, max_overflow=20)

    @classmethod
    def _build_url(cls, pg_config):
        """Return the SQLAlchemy URL for the given connection settings.

        The configured ``port`` is used when it is numeric; otherwise the
        URL falls back to ``DEFAULT_PORT`` so that a config whose port was
        left as a placeholder keeps connecting to 5432.
        """
        port = str(pg_config.get("port", "")).strip()
        if not port.isdigit():
            port = str(cls.DEFAULT_PORT)
        return 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
            pg_config["user"], pg_config["password"], pg_config["host"],
            port, pg_config["database"])

    @staticmethod
    def _quote_ident(name):
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    @classmethod
    def _build_copy_sql(cls, schema, table_name, columns):
        """Return the COPY statement that loads ``columns`` into the table."""
        target = "{}.{}".format(cls._quote_ident(schema),
                                cls._quote_ident(table_name))
        column_list = ", ".join(cls._quote_ident(col) for col in columns)
        # No HEADER option: the CSV buffer is produced without a header row.
        return "COPY {} ({}) FROM STDIN WITH (FORMAT csv, DELIMITER ',')".format(
            target, column_list)

    @staticmethod
    def _frame_to_csv_buffer(df):
        """Serialize ``df`` to an in-memory CSV (no header, no index)."""
        # to_csv produces text, so a text buffer is the matching target.
        buffer = io.StringIO()
        df.to_csv(buffer, sep=',', index=False, header=False)
        buffer.seek(0)
        return buffer

    def write_to_table(self, df, table_name, sip_black_data_engine,
                       if_exists=u'append', schema=DEFAULT_SCHEMA):
        """Write ``df`` into ``schema.table_name`` using PostgreSQL COPY.

        The table is created from the DataFrame's columns when needed
        (subject to ``if_exists``), then all rows are streamed in a single
        COPY and committed. Only the DataFrame's columns are listed in the
        COPY, so any other column of the table must have a default.

        Args:
            df: DataFrame whose column names match the target columns.
            table_name: Target table name, without the schema.
            sip_black_data_engine: SQLAlchemy engine bound to the database.
            if_exists: 'append', 'replace' or 'fail', as in
                ``DataFrame.to_sql``.
            schema: Schema that contains the target table.

        Raises:
            ValueError: If ``if_exists`` is 'fail' and the table exists.
        """
        # Writing zero rows creates the table (or applies ``if_exists``)
        # through the public pandas API without inserting any data.
        df.head(0).to_sql(table_name, sip_black_data_engine, schema=schema,
                          if_exists=if_exists, index=False)
        if df.empty:
            return

        copy_cmd = self._build_copy_sql(
            schema, table_name, [str(col) for col in df.columns])
        csv_buffer = self._frame_to_csv_buffer(df)

        # copy_expert is a psycopg2 cursor method, so use the raw DBAPI
        # connection rather than a SQLAlchemy Connection.
        raw_connection = sip_black_data_engine.raw_connection()
        try:
            with raw_connection.cursor() as cursor:
                cursor.copy_expert(copy_cmd, csv_buffer)
            raw_connection.commit()
        except Exception:
            raw_connection.rollback()
            raise
        finally:
            # Hand the connection back to the pool.
            raw_connection.close()

    def main(self, csv_path=DEFAULT_CSV_PATH):
        """Read the comma-separated file at ``csv_path`` and load it.

        Args:
            csv_path: File to load; its first line is read as column names.
        """
        result_dataframe = pd.read_csv(csv_path, delimiter=',')

        self.write_to_table(result_dataframe, self.table_name, self.sip_black_data_engine)

if __name__ == "__main__":
    # Script entry point: load the sample file into the table named below.
    Copy_Data2PG("history_abroad2telecom_ip_data").main()

执行代码:

可查看数据:

数据全部很快导入了。

记得点个赞哦!

猜你喜欢

转载自blog.csdn.net/Cincinnati_De/article/details/84976990