# 需求:对两个数据集进行 KS 检验,计算中位数、方差、标准差等统计量,最后将结果输出到数据库中。
import psycopg2
import os
import pandas as pd
from scipy.stats import ks_2samp
import numpy as np
from sqlalchemy import create_engine
# 用来操作数据库的类
class GPCommand(object):
    """Small helper around a psycopg2 connection used to read the order data.

    Typical use: call ``connectGp()``, then ``select_data()``, then
    ``closeMysql()``.
    """

    def __init__(self):
        """Store the (placeholder) connection settings."""
        self.hostname = 'ip'
        self.username = 'name'
        self.password = 'password'
        self.database = 'database'
        # Set by connectGp(); kept as None until then so that closeMysql()
        # is safe to call even when no connection was ever opened.
        self.connect = None
        self.cursor = None

    def connectGp(self):
        """Open the database connection and create a cursor.

        Returns:
            ``'con_successful'`` on success, otherwise the string
            ``'con_error'`` followed by a description of the failure.
        """
        try:
            self.connect = psycopg2.connect(
                host=self.hostname,
                user=self.username,
                password=self.password,
                dbname=self.database,
            )
            self.cursor = self.connect.cursor()
            print("connect gp successful."+'\n' + '数据库连接成功')
            return 'con_successful'
        except psycopg2.Error:
            # Imported here because the module does not import ``sys`` at the
            # top; without it this handler itself failed with NameError.
            import sys
            error = 'Failed to setup Postgres environment.\n{0}'.format(sys.exc_info())
            print('connect gp error.'+'\n' + '数据库连接失败')
            return 'con_error' + error

    def closeMysql(self):
        """Close the cursor and the connection, if they are open.

        Despite the name, this closes the psycopg2 connection created by
        ``connectGp()``. Calling it without an open connection, or calling it
        twice, is harmless.
        """
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.connect is not None:
            self.connect.close()
            self.connect = None
        print("数据库已关闭")

    def select_data(self):
        """Run the two order queries and return their rows.

        Returns:
            A tuple ``(rows1, rows2)`` of ``fetchall()`` results.
            ``rows1`` rows are ``(order_no, cust_no, item_num)`` for orders
            confirmed on or after 2015-01-01.
            ``rows2`` rows are ``(cust_no, order_no, order_num, item_num)``
            restricted to customers with more than 5 orders in that period.

        On any error the exception is printed and the process is terminated
        immediately via ``os._exit``.
        """
        try:
            select_sql1 = (
                "select distinct a.order_no,a.cust_no,b.item_num from dw_edw.edw_ord_fct_order_info a "
                "left join (select order_no,count(distinct item_no) as item_num "
                "from dw_edw.edw_ord_fct_order_items_info group by order_no)b on a.order_no=b.order_no where a.customer_confirm_date>='2015-01-01 00:00:00' "
            )
            self.cursor.execute(select_sql1)
            rows1 = self.cursor.fetchall()

            select_sql2 = (
                "select t1.*,t2.order_num,t3.item_num from( select distinct cust_no,order_no from dw_edw.edw_ord_fct_order_info "
                "where customer_confirm_date>='2015-01-01 00:00:00' )t1 left join ( select cust_no,count(distinct order_no) as order_num "
                "from dw_edw.edw_ord_fct_order_info where customer_confirm_date>='2015-01-01 00:00:00' group by cust_no )t2 on t1.cust_no=t2.cust_no "
                "left join (select order_no,count(distinct item_no) as item_num from dw_edw.edw_ord_fct_order_items_info group by order_no)t3 "
                "on t1.order_no=t3.order_no where t2.order_num>5 "
            )
            self.cursor.execute(select_sql2)
            rows2 = self.cursor.fetchall()
            return rows1, rows2
        except Exception as e:
            print(e)
            # Exit with a non-zero status so the failure is visible to the
            # caller of the script (status 0 would report success).
            os._exit(1)
def kt(df1, df2, x):
    """Two-sample KS test of one df2 customer against all of df1.

    Args:
        df1: DataFrame with an ``item_num1`` column (the reference sample).
        df2: DataFrame with ``cust_no2`` and ``item_num2`` columns.
        x: Positional index of the customer within the grouped ``cust_no2``
            values of ``df2``.

    Returns:
        A tuple ``(cust_no, pvalue)`` where ``cust_no`` is the selected
        customer and ``pvalue`` is the KS p-value of ``df1['item_num1']``
        versus that customer's ``item_num2`` values.
    """
    # One row per distinct customer, in groupby order; pick the x-th one.
    per_customer = df2.groupby('cust_no2').count().reset_index()
    customer = per_customer.iloc[x]['cust_no2']

    # All item counts that belong to the selected customer.
    customer_items = df2.loc[df2['cust_no2'] == customer, 'item_num2']

    outcome = ks_2samp(df1['item_num1'], customer_items)
    return customer, outcome.pvalue
def insert_data(data):
    """Append *data* to the table ``dw_ana.market_sales_precision_ks_p_test``.

    Args:
        data: Object providing ``to_sql`` (a pandas DataFrame in this script).

    Errors raised while writing are printed rather than propagated, so a
    failed insert does not abort the caller.
    """
    engine = create_engine('postgresql://name:password@ip:port/database')
    try:
        data.to_sql(
            'market_sales_precision_ks_p_test',
            schema='dw_ana',
            con=engine,
            index=False,
            if_exists='append',
        )
    except Exception as e:
        print(e)
    finally:
        # Release the engine's pooled connections whether or not the write
        # succeeded.
        engine.dispose()
def main():
    """Fetch the order data, run the per-customer KS tests and store the p-values.

    Steps:
      1. Connect and read both result sets via ``GPCommand.select_data()``.
      2. Build one DataFrame per result set and compute per-customer
         mean / variance / standard deviation / median of the item counts.
      3. For every customer in the second data set, run ``kt`` against the
         first data set.
      4. Write the ``(cust_no, p)`` pairs to the database via ``insert_data``.

    Returns early, without doing any work, if the connection cannot be
    established.
    """
    gpCommand = GPCommand()
    if gpCommand.connectGp() != 'con_successful':
        # connectGp() has already printed the failure; nothing can be queried.
        return

    # The connection is only needed for the fetch, so release it right away
    # and make sure it is released even if the fetch raises.
    try:
        rows1, rows2 = gpCommand.select_data()
    finally:
        gpCommand.closeMysql()

    # ---- Descriptive statistics (mean, variance, std, median) ----
    # rows1 columns: (order_no, cust_no, item_num)
    df1 = pd.DataFrame({
        'cust_no1': [row[1] for row in rows1],
        'item_num1': [row[2] for row in rows1],
    })
    grouped1 = df1.groupby('cust_no1')
    # NOTE(review): these statistics are computed but never persisted or
    # returned; wire them into the output or drop them.
    res1_mean1 = grouped1.mean()
    res1_var1 = grouped1.var()
    res1_std1 = grouped1.std()
    res1_median1 = grouped1.median()

    # rows2 columns: (cust_no, order_no, order_num, item_num)
    df2 = pd.DataFrame({
        'cust_no2': [row[0] for row in rows2],
        'item_num2': [row[3] for row in rows2],
    })
    grouped2 = df2.groupby('cust_no2')
    res1_mean2 = grouped2.mean()
    res1_var2 = grouped2.var()
    res1_std2 = grouped2.std()
    res1_median2 = grouped2.median()

    # ---- Distribution analysis: KS test per customer of df2 ----
    # NOTE(review): kt() regroups df2 on every call, so this loop is
    # quadratic in the number of customers.
    customer_count = len(grouped2.size())
    results = [kt(df1, df2, i) for i in range(customer_count)]

    df_pdata = pd.DataFrame({
        'cust_no': [cust for cust, _ in results],
        'p': [pvalue for _, pvalue in results],
    })
    insert_data(df_pdata)
# Script entry point: runs the whole pipeline as soon as this file is executed.
# There is no __main__ guard, so importing this module also runs it.
main()