python 之Postgres连接

       朋友们,如果你接触大数据,肯定离不开greenplum,需要你用python进行操作和处理,这里面需要封装很多函数,下面我封装一个较全的类,你们可以自己下载后修改后复用。

import pandas as pd
import psycopg2
import psycopg2.extras
from io import StringIO
import os


class GPDB:
    def __init__(self):
        self.dbname = "bigdata"
        self.user = "bigdata"
        self.password = "bigdata!"
        self.host = "localhost"
        self.port = "5432"

    def gp_connect(self):
        try:
            db = psycopg2.connect(dbname=self.dbname,
                                  user=self.user,
                                  password=self.password,
                                  host=self.host,
                                  port=self.port)
            return db
        except psycopg2.DatabaseError as e:
            print("could not connect to Greenplum server", e)

    def select_data(self, sql):
        conn = self.gp_connect()
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute(sql)
        data = cur.fetchall()
        conn.close()  # 关闭连接
        return data
    def truncate_table(self, sql):
        try:
            conn = self.gp_connect()
            cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            # Delete data row from table
            cur.execute(sql)
            conn.commit()
            cur.close()
            print("TRUNCATE TABLE")
        except Exception as e:
            raise e
        finally:
            cur.close()
            conn.close()
    def insert_df(self, tablename, df: pd.DataFrame):
        placeholders = ', '.join(['%s'] * df.shape[1])
        columns_str = ', '.join(df.columns)
        sql = "insert into {}({})values ({})".format(tablename, columns_str, placeholders)
        conn = self.gp_connect()
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.executemany(sql, df.values)

    def read_df_format(self, sql):
        conn = self.gp_connect()
        df = pd.read_sql(sql=sql, con=conn)
        return df

    def insert_copy_df(self, tablename, df: pd.DataFrame()):
        fp = StringIO()
        df.to_csv(fp, sep='&', index=False, header=False)
        value = fp.getvalue()
        conn = self.gp_connect()
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.copy_from(StringIO(value), tablename, sep="&",
                      columns=df.columns)
        conn.commit()
        cur.close()
        conn.close()

    def copy_from_file(self, table, df_path):
        """
        Here we are going save the dataframe on disk as
        a csv file, load the csv file
        and use copy_from() to copy it to the table
        """
        # Save the dataframe to disk
        f = open(df_path, 'r', encoding="utf8")
        conn = self.gp_connect()
        cursor = self.gp_connect().cursor()
        try:
            cursor.copy_from(f, table, sep="&")
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()
            return 1
        print("copy_from_file() done")
        cursor.close()

 上面是连接类,用的时候也非常方便,只需要创建连接,调用即可:

gpdb = GPDB()
gpd = gpdb.read_df_format("select * from demo")

批量数据插入可以用:

gpdb.insert_copy_df("daba.demo", df)

猜你喜欢

转载自blog.csdn.net/qq_23953717/article/details/130648028