朋友们，如果你从事大数据相关工作，很可能会用到 Greenplum，并需要用 Python 对它进行操作和处理，这就需要封装不少函数。下面我封装了一个功能较全的类，大家可以下载后按需修改、复用。
import contextlib
import os
from io import StringIO

import pandas as pd
import psycopg2
import psycopg2.extras
class GPDB:
    """Small helper around psycopg2 for querying and bulk-loading Greenplum.

    Each query/load method opens its own connection via ``gp_connect`` and
    closes it before returning, so an instance only stores connection
    settings. ``gp_connect`` itself returns an open connection that the
    caller is responsible for closing.
    """

    def __init__(self, dbname="bigdata", user="bigdata", password="bigdata!",
                 host="localhost", port="5432"):
        """Store connection settings.

        The defaults are the previously hard-coded values, so ``GPDB()`` keeps
        working unchanged; pass arguments to target a different database.
        """
        self.dbname = dbname
        self.user = user
        self.password = password
        self.host = host
        self.port = port

    def gp_connect(self):
        """Open and return a new psycopg2 connection using the stored settings.

        Raises:
            psycopg2.DatabaseError: if the connection cannot be established.
                The error is printed and then re-raised; returning ``None``
                here would only make callers fail later on ``None.cursor()``.
        """
        try:
            return psycopg2.connect(dbname=self.dbname,
                                    user=self.user,
                                    password=self.password,
                                    host=self.host,
                                    port=self.port)
        except psycopg2.DatabaseError as e:
            print("could not connect to Greenplum server", e)
            raise

    @contextlib.contextmanager
    def _cursor(self, cursor_factory=None):
        """Yield a cursor on a fresh connection; commit on success, roll back on error, always close."""
        conn = self.gp_connect()
        try:
            # ``with conn`` commits on normal exit and rolls back on an
            # exception, but it does not close the connection, hence the
            # surrounding try/finally. ``with cursor`` closes the cursor.
            with conn, conn.cursor(cursor_factory=cursor_factory) as cur:
                yield cur
        finally:
            conn.close()

    def select_data(self, sql):
        """Run *sql* and return all rows as a list of dict-like ``RealDictRow`` objects."""
        with self._cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(sql)
            return cur.fetchall()

    def truncate_table(self, sql):
        """Execute a data-removing statement (e.g. ``TRUNCATE``) and commit it.

        On failure the transaction is rolled back, the connection is closed
        and the exception propagates to the caller.
        """
        with self._cursor() as cur:
            cur.execute(sql)
        print("TRUNCATE TABLE")

    def insert_df(self, tablename, df: pd.DataFrame):
        """Insert every row of *df* into *tablename* with parameterized INSERTs and commit.

        Column names are taken from ``df.columns``. *tablename* and the column
        names are formatted into the SQL text unquoted, so they must come from
        a trusted source; row values are passed as query parameters.
        An empty frame is a no-op.
        """
        if df.empty:
            # Nothing to insert; also avoids generating ``insert into t()values ()``.
            return
        placeholders = ", ".join(["%s"] * df.shape[1])
        columns_str = ", ".join(map(str, df.columns))
        sql = "insert into {}({})values ({})".format(tablename, columns_str, placeholders)
        # itertuples yields Python/pandas scalars; rows from ``df.values`` hold
        # numpy scalars (e.g. numpy.int64) that psycopg2 cannot adapt.
        rows = list(df.itertuples(index=False, name=None))
        with self._cursor() as cur:
            # execute_batch sends the statements in fewer round trips than executemany.
            psycopg2.extras.execute_batch(cur, sql, rows)

    def insert_copy_df(self, tablename, df: pd.DataFrame):
        """Bulk-load *df* into *tablename* with ``COPY ... FROM STDIN`` and commit.

        The frame is written as '&'-separated text without index or header,
        and ``df.columns`` is used as the COPY column list. Missing values are
        written as ``\\N`` (COPY's default NULL marker in text format) so they
        are stored as NULL. Values containing '&', newlines or backslashes are
        not escaped for COPY's text format and can break the load.
        An empty frame is a no-op.
        """
        if df.empty:
            return
        buf = StringIO()
        df.to_csv(buf, sep="&", index=False, header=False, na_rep="\\N")
        buf.seek(0)
        columns_str = ", ".join(map(str, df.columns))
        # copy_expert instead of copy_from: since psycopg2 2.9 copy_from quotes
        # the table name, which breaks schema-qualified names like "daba.demo".
        copy_sql = "COPY {} ({}) FROM STDIN WITH DELIMITER '&'".format(tablename, columns_str)
        with self._cursor() as cur:
            cur.copy_expert(copy_sql, buf)

    def copy_from_file(self, table, df_path):
        """Load an '&'-delimited text file into *table* with ``COPY ... FROM STDIN``.

        The file at *df_path* is read as UTF-8 and streamed to the server on a
        single connection, which is committed on success and always closed.

        Returns:
            ``None`` on success; ``1`` if the load fails (the error is printed
            and the transaction rolled back).
        """
        with open(df_path, "r", encoding="utf8") as f:
            try:
                with self._cursor() as cur:
                    # copy_expert keeps schema-qualified table names working on
                    # psycopg2 >= 2.9, where copy_from quotes the table name.
                    cur.copy_expert("COPY {} FROM STDIN WITH DELIMITER '&'".format(table), f)
            except Exception as error:
                print("Error: %s" % error)
                return 1
        print("copy_from_file() done")
上面就是封装好的连接类，使用起来也非常方便：只需创建一个 GPDB 实例，再调用相应的方法即可：
# Create a client with the default connection settings and load the result
# of a query into a DataFrame. It is named ``df`` so that the bulk-insert
# example below, which passes ``df``, refers to a defined DataFrame.
gpdb = GPDB()
df = gpdb.read_df_format("select * from demo")
批量插入数据可以使用 insert_copy_df（其中 df 是列名与目标表字段对应的 DataFrame）：
# df must be a DataFrame whose column names exist in the target table.
gpdb.insert_copy_df(tablename="daba.demo", df=df)