# 1. 读取数据 (read data)
# 2. 创建外表 (create external table)
from pyhive import hive
import pandas as pd
import datetime
from datetime import timedelta
import time
def getHiveConn(host='20.58.32.9', port=10000, username='hive', database='xdja'):
    """Open a Hive connection via ``pyhive.hive.Connection``.

    The keyword defaults are the previously hard-coded settings, so existing
    ``getHiveConn()`` calls behave exactly as before.

    Args:
        host: HiveServer2 host.
        port: HiveServer2 port.
        username: user name passed to the connection.
        database: database selected for the session.

    Returns:
        The connection object, or ``None`` if connecting raised. The exception
        is printed, not re-raised, so callers must check for ``None``.
    """
    try:
        return hive.Connection(host=host, port=port, username=username, database=database)
    except Exception as e:
        # Deliberate best-effort: report the error and signal failure with None.
        print(e)
        return None
def readDataFromHive(conn, sql):
    """Run ``sql`` on ``conn`` and return all fetched rows as a DataFrame.

    Args:
        conn: an open DB-API style connection (``cursor()``/``execute``/
            ``fetchall``/``close``), e.g. the result of ``getHiveConn()``.
        sql: query text. It is printed before execution.

    Returns:
        ``pd.DataFrame(rows)`` built from ``cursor.fetchall()``. Column names
        are not fetched, so columns are labelled by position (0, 1, ...).
    """
    cursor = conn.cursor()
    try:
        print(sql)
        cursor.execute(sql)
        rows = cursor.fetchall()
    finally:
        # Always release the cursor, even when execute()/fetchall() raises.
        cursor.close()
    return pd.DataFrame(rows)
def closeHiveConn(conn):
    """Close a Hive connection.

    ``None`` (what ``getHiveConn()`` returns when connecting failed) is
    accepted and ignored.
    """
    # Identity check: `!= None` would go through a possibly custom __ne__.
    if conn is not None:
        conn.close()
def timeToDF(start_day, end_day):
    """Return every day in ``[start_day, end_day)`` as a one-row DataFrame.

    Each day becomes one column holding its ``'%Y%m%d'`` string
    (e.g. ``'20180101'``), in chronological order. Every column is labelled
    ``0`` (duplicate labels), which matches the layout the original
    ``pd.concat`` construction produced.

    Args:
        start_day: first day, inclusive (``datetime.datetime`` or ``datetime.date``).
        end_day: stop day, exclusive, of a type comparable with ``start_day``.

    Returns:
        A 1 x N DataFrame, or an empty ``pd.DataFrame()`` when
        ``start_day >= end_day``.
    """
    days = []
    current = start_day
    while current < end_day:
        days.append(current.strftime('%Y%m%d'))
        current = current + timedelta(days=1)
    if not days:
        return pd.DataFrame()
    # Build the frame in one step; concatenating one column per day was O(n^2).
    return pd.DataFrame([days], columns=[0] * len(days))
def dataPointToStr(begin, pointType, dataType, pointsPerDay=96):
    """Build a comma-separated Hive column list for a day's sampling points.

    Produces ``'<dataType><i> string'`` for ``i`` in
    ``range(begin, pointsPerDay + 1, int(pointsPerDay / pointType))``, joined with ``','``.
    For example, ``dataPointToStr(1, 4, 'R')`` gives
    ``'R1 string,R25 string,R49 string,R73 string'``.

    Args:
        begin: first point number, usually 1.
        pointType: number of points to emit per day, e.g. 96 or 24.
        dataType: column-name prefix, e.g. ``'P'``/``'R'`` or ``'20180101_P'``.
        pointsPerDay: total points per day. The default of 96 matches the
            previously hard-coded value.

    Returns:
        The joined column definitions, or ``''`` when the range is empty.

    Raises:
        ValueError: if ``pointType`` exceeds ``pointsPerDay`` (zero step).
    """
    step = int(pointsPerDay / pointType)
    if step == 0:
        raise ValueError('pointType (%r) must not exceed pointsPerDay (%r)' % (pointType, pointsPerDay))
    return ','.join(dataType + str(i) + ' string' for i in range(begin, pointsPerDay + 1, step))
def hiveFieldToStr(start_day_str='2018-01-01', end_day_str='2019-07-01', pointType=96):
    """Build the per-day/per-point column definitions for the Hive table.

    For each day in ``[start_day_str, end_day_str)``, emits
    ``dataPointToStr(1, pointType, '<YYYYMMDD>_P')`` and joins the results
    with ``','``. The defaults reproduce the previously hard-coded range
    (2018-01-01 up to, not including, 2019-07-01) and 96 points per day.

    Args:
        start_day_str: first day, inclusive, as ``'%Y-%m-%d'``.
        end_day_str: stop day, exclusive, as ``'%Y-%m-%d'``.
        pointType: points per day passed through to ``dataPointToStr``.

    Returns:
        The column-definition string, or ``''`` if the range is empty.
    """
    start_day = datetime.datetime.strptime(start_day_str, '%Y-%m-%d')
    end_day = datetime.datetime.strptime(end_day_str, '%Y-%m-%d')
    # timeToDF yields a single row with one 'YYYYMMDD' cell per day (or an
    # empty frame); flattening it gives the days in order.
    days = [str(day) for day in timeToDF(start_day, end_day).to_numpy().ravel()]
    return ','.join(dataPointToStr(1, pointType, day + '_P') for day in days)
def run1():
    """Create the (managed) Hive table ``hn_power_all_1_no_lable``.

    Columns are ``meter_id string`` followed by the definitions from
    ``hiveFieldToStr()``. The connection and cursor are closed even if
    building the SQL or executing it fails.

    Raises:
        RuntimeError: if ``getHiveConn()`` returned ``None`` (connect failed).
    """
    from contextlib import closing

    conn = getHiveConn()
    if conn is None:
        # getHiveConn() prints the error and returns None instead of raising.
        raise RuntimeError('Could not connect to Hive')
    with closing(conn):
        sql = '\ncreate table hn_power_all_1_no_lable(meter_id string,' + hiveFieldToStr() + ')\n'
        with closing(conn.cursor()) as cursor:
            cursor.execute(sql)
def run2():
    """Create the external Hive table ``hn_power_all_1_no_lable``.

    Columns are ``meter_id string`` followed by ``hiveFieldToStr()``. Rows
    are comma-delimited text. LOCATION only needs to point at the HDFS folder
    that holds the data files. The SQL is printed before execution, and the
    connection and cursor are closed even on failure.

    Raises:
        RuntimeError: if ``getHiveConn()`` returned ``None`` (connect failed).
    """
    from contextlib import closing

    conn = getHiveConn()
    if conn is None:
        # getHiveConn() prints the error and returns None instead of raising.
        raise RuntimeError('Could not connect to Hive')
    with closing(conn):
        sql = ("\ncreate external table hn_power_all_1_no_lable(meter_id string,"
               + hiveFieldToStr()
               + ") row format delimited fields terminated by ',' "
               "location '/user/model/hrh/power/power_1'\n")
        print(sql)
        with closing(conn.cursor()) as cursor:
            cursor.execute(sql)
def run3():
    """Drop the Hive table ``stu2`` if it exists.

    NOTE(review): ``stu2`` is not the table created by ``run1``/``run2``
    (``hn_power_all_1_no_lable``). Confirm which table is meant to be dropped.

    The SQL is printed before execution. The connection and cursor are
    closed even on failure.

    Raises:
        RuntimeError: if ``getHiveConn()`` returned ``None`` (connect failed).
    """
    from contextlib import closing

    conn = getHiveConn()
    if conn is None:
        # getHiveConn() prints the error and returns None instead of raising.
        raise RuntimeError('Could not connect to Hive')
    with closing(conn):
        sql = '\ndrop table if exists stu2\n'
        print(sql)
        with closing(conn.cursor()) as cursor:
            cursor.execute(sql)
def run4():
    """Query and print the first two rows of ``hn_power_all_1_no_lable``.

    Uses ``readDataFromHive``, which prints the SQL and returns the rows as a
    DataFrame. The connection is closed even if the query fails.

    Raises:
        RuntimeError: if ``getHiveConn()`` returned ``None`` (connect failed).
    """
    from contextlib import closing

    conn = getHiveConn()
    if conn is None:
        # getHiveConn() prints the error and returns None instead of raising.
        raise RuntimeError('Could not connect to Hive')
    with closing(conn):
        sql = '\nselect *\nfrom hn_power_all_1_no_lable\nlimit 2\n'
        result = readDataFromHive(conn, sql)
        print(result)
def run5():
    """Refresh the Hive metastore for table ``stu2`` via ``MSCK REPAIR TABLE``.

    NOTE(review): ``stu2`` is not the table created by ``run1``/``run2``
    (``hn_power_all_1_no_lable``). Confirm the intended target.

    The SQL is printed before execution. The connection and cursor are
    closed even on failure.

    Raises:
        RuntimeError: if ``getHiveConn()`` returned ``None`` (connect failed).
    """
    from contextlib import closing

    conn = getHiveConn()
    if conn is None:
        # getHiveConn() prints the error and returns None instead of raising.
        raise RuntimeError('Could not connect to Hive')
    with closing(conn):
        sql = '\nmsck repair table stu2\n'
        print(sql)
        with closing(conn.cursor()) as cursor:
            cursor.execute(sql)
# Entry points: uncomment the step(s) you want to run.
# run2()
# run3()
# run4()
# run5()