1、在 PostgreSQL(PG)库中创建表
-- Target table for the bulk load, created in the sip_sip schema.
-- NOTE(review): `tablename` looks like a placeholder; the Python script in this
-- document loads into "history_abroad2telecom_ip_data" — confirm the real name.
create table sip_sip.tablename
(
-- bigserial: 64-bit integer filled from an implicit sequence when no value is supplied.
id bigserial not null,
-- Two text columns of at most 32 characters each
-- (presumably the two IP addresses of each input row — TODO confirm).
column1 varchar(32),
column2 varchar(32)
)
2、清洗数据,整理成如下格式:
每行为"IP地址,IP地址",即两个 IP 地址之间用英文逗号分隔,例如:1.1.1.1,2.2.2.2(注意:下面的代码用 pd.read_csv 的默认参数读取文件,首行会被当作列名。)
3、上传文件到 Linux 服务器上
4、编写 Python 代码
# coding=utf-8
import io
import sys
import datetime
import pandas as pd
from sqlalchemy import create_engine
class Copy_Data2PG:
    """Bulk-load a CSV file into a PostgreSQL table with ``COPY ... FROM STDIN``.

    The DataFrame read from the CSV file is serialised back to CSV in memory
    and streamed to the server through psycopg2's ``cursor.copy_expert``.
    """

    def __init__(self, table_name):
        """Remember the target table and build the SQLAlchemy engine.

        Args:
            table_name: Name of the table to load.
        """
        self.table_name = table_name
        # Placeholder connection settings: replace every "xxxx" before running.
        config = {
            "pg_config": {
                "host": "xxxx",
                "user": "xxxx",
                "password": "xxxx",
                "port": "xxxx",
                "database": "xxxx"
            }
        }
        pg_config = config["pg_config"]
        # Use the configured port; the URL used to hard-code 5432 and silently
        # ignore the "port" entry, so it must now hold the real port number.
        postgresql_url = 'postgresql+psycopg2://{}:{}@{}:{}/{}'.format(
            pg_config["user"],
            pg_config["password"],
            pg_config["host"],
            pg_config["port"],
            pg_config["database"],
        )
        # Initialise the pooled engine with pool_size=10, max_overflow=20.
        self.sip_black_data_engine = create_engine(
            postgresql_url, pool_size=10, max_overflow=20)

    @staticmethod
    def _quote_ident(name):
        """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
        return '"{}"'.format(str(name).replace('"', '""'))

    @staticmethod
    def _frame_to_csv_buffer(df):
        """Serialise *df* to header-less, comma-separated CSV in a binary buffer.

        The header is omitted here, so the COPY command must not skip one.
        Rendering to text first and encoding explicitly avoids relying on
        pandas accepting a binary handle in ``to_csv``.
        """
        csv_text = df.to_csv(sep=',', index=False, header=False)
        return io.BytesIO(csv_text.encode('utf-8'))

    @classmethod
    def _build_copy_command(cls, schema, table_name, columns):
        """Build the ``COPY ... FROM STDIN`` statement for the given target.

        Args:
            schema: Schema that contains the table.
            table_name: Target table name.
            columns: Iterable of column names, in the order of the CSV fields.

        Returns:
            The COPY statement as a string, with every identifier quoted.
        """
        column_list = ','.join(cls._quote_ident(col) for col in columns)
        return "COPY {}.{} ({}) FROM STDIN DELIMITER ',' CSV".format(
            cls._quote_ident(schema), cls._quote_ident(table_name), column_list)

    def write_to_table(self, df, table_name, sip_black_data_engine,
                       if_exists=u'append', schema=u'sip_sip'):
        """Write *df* into the PostgreSQL table via COPY.

        Args:
            df: DataFrame whose columns name the target table's columns.
            table_name: Target table name.
            sip_black_data_engine: SQLAlchemy engine used for the load.
            if_exists: Passed to pandas' ``SQLTable``; defaults to ``'append'``.
            schema: Schema used both for the table setup and for the COPY
                target; defaults to ``'sip_sip'``.
        """
        db_engine = sip_black_data_engine
        # NOTE(review): pandasSQL_builder / SQLTable live under pd.io.sql and
        # may change between pandas versions — verify against the installed one.
        pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine)
        table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                                   index=False, if_exists=if_exists, schema=schema)
        table.create()

        csv_buffer = self._frame_to_csv_buffer(df)
        # Same schema as the SQLTable above, and the DataFrame's own columns
        # instead of a hard-coded placeholder schema and column list.
        copy_cmd = self._build_copy_command(schema, table_name, df.columns)
        with db_engine.connect() as connection:
            # copy_expert is a psycopg2 cursor method, so use the raw DBAPI
            # connection that sits underneath the SQLAlchemy connection.
            with connection.connection.cursor() as cursor:
                cursor.copy_expert(copy_cmd, csv_buffer)
            connection.connection.commit()

    def main(self, csv_path='/dist/MonitoringBlackIP/Monitor_history_abroad2telecom_ip_data/11.txt'):
        """Read the CSV file and load it into ``self.table_name``.

        Args:
            csv_path: Path of the comma-separated input file; defaults to the
                path this script was written for.
        """
        # read_csv with default arguments takes the first line as column names,
        # and those names become the COPY column list.
        # NOTE(review): if the file has no header line, its first data row is
        # consumed as column names — confirm the file format.
        result_dataframe = pd.read_csv(csv_path, delimiter=',')
        self.write_to_table(result_dataframe, self.table_name, self.sip_black_data_engine)
if __name__ == "__main__":
    # Script entry point: load the CSV file into the named table.
    table_name = "history_abroad2telecom_ip_data"
    Copy_Data2PG(table_name).main()
执行代码:
可查看数据:
全部数据很快就导入完成了。
记得点个赞哦!