Python 操作 Hive

1. 读取数据

2. 创建外表

from pyhive import hive
import pandas as pd
import datetime
from datetime import timedelta
import time



def getHiveConn(host='20.58.32.9', port=10000, username='hive', database='xdja'):
    """Open a Hive connection through pyhive.

    The defaults reproduce the endpoint that used to be hard-coded here, so a
    bare ``getHiveConn()`` call behaves as before; pass arguments to target a
    different server or database.

    Args:
        host: Hive server host name or IP address.
        port: Hive server port.
        username: User name to connect as.
        database: Database selected for the connection.

    Returns:
        The ``hive.Connection`` object, or ``None`` when connecting fails. The
        error is printed instead of raised, so callers must check for ``None``.
    """
    try:
        return hive.Connection(host=host, port=port, username=username, database=database)
    except Exception as e:
        # Best-effort by design: report the failure and hand back None.
        print(e)
        return None



def readDataFromHive(conn, sql):
    """Run a query and return all of its rows as a DataFrame.

    Args:
        conn: Open connection exposing a DB-API style ``cursor()`` method.
        sql: Query text; it is echoed to stdout before being executed.

    Returns:
        ``pandas.DataFrame`` built from ``fetchall()``. Columns carry pandas'
        default integer labels, not the Hive column names.
    """
    cursor = conn.cursor()
    try:
        print(sql)
        cursor.execute(sql)
        rows = cursor.fetchall()
    finally:
        # Release the cursor even when execution or fetching fails.
        cursor.close()
    return pd.DataFrame(rows)


def closeHiveConn(conn):
    """Close a Hive connection, doing nothing when ``conn`` is ``None``.

    Args:
        conn: Connection object with a ``close()`` method, or ``None``.
    """
    # Identity check: ``!=`` would defer to whatever ``__ne__`` the object defines.
    if conn is not None:
        conn.close()

        
def timeToDF(start_day, end_day):
    """Return the days in ``[start_day, end_day)`` as a one-row DataFrame.

    Each day is formatted as ``'%Y%m%d'`` and occupies its own column. Every
    column is labelled ``0``, which is what column-wise concatenation of
    single-cell frames produces. An empty range yields an empty DataFrame.

    Args:
        start_day: First day, inclusive (a ``datetime.datetime``).
        end_day: Stop day, exclusive.

    Returns:
        ``pandas.DataFrame`` with one row and one column per day.
    """
    days = []
    current = start_day
    while current < end_day:
        days.append(current.strftime('%Y%m%d'))
        current = current + timedelta(days=1)
    if not days:
        return pd.DataFrame()
    # Build the frame once; concatenating inside the loop copied the whole
    # frame on every iteration.
    return pd.DataFrame([days], columns=[0] * len(days))


def dataPointToStr(begin, pointType, dataType):
    """Build the comma-separated ``<name>    string`` column list for the points.

    Point numbers run from ``begin`` up to 96 in steps of ``int(96/pointType)``.

    Args:
        begin: First point number, normally 1.
        pointType: Collection point type, e.g. 96 or 24.
        dataType: Prefix placed before each point number, e.g. 'P' or 'R'.

    Returns:
        Definitions such as ``'P1    string,P2    string'`` with no trailing
        comma; an empty string when no point falls in range.
    """
    stride = int(96 / pointType)
    columns = [dataType + str(point) + '    string' for point in range(begin, 97, stride)]
    return ','.join(columns)


def hiveFieldToStr(start_day_str='2018-01-01', end_day_str='2019-07-01'):
    """Build the column-definition list for the Hive table.

    For every day in ``[start_day_str, end_day_str)`` this emits 96 columns
    named ``<yyyymmdd>_P<n>`` of type ``string``, joined by commas. The
    defaults reproduce the date range that used to be hard-coded here.

    Args:
        start_day_str: First day, inclusive, formatted ``'%Y-%m-%d'``.
        end_day_str: Stop day, exclusive, formatted ``'%Y-%m-%d'``.

    Returns:
        The comma-separated column definitions with no trailing comma; an
        empty string when the range contains no day.
    """
    start_day = datetime.datetime.strptime(start_day_str, '%Y-%m-%d')
    end_day = datetime.datetime.strptime(end_day_str, '%Y-%m-%d')
    days = timeToDF(start_day, end_day)
    # timeToDF returns a single row with one column per day; flatten it so the
    # day strings can be iterated directly.
    fields = [dataPointToStr(1, 96, str(day) + '_P') for day in days.values.ravel()]
    # join avoids re-copying an ever-growing string once per day.
    return ','.join(fields)


def run1():
    """Create the Hive table ``hn_power_all_1_no_lable``.

    The table has a ``meter_id`` column followed by the columns produced by
    ``hiveFieldToStr()``.

    Raises:
        RuntimeError: If ``getHiveConn()`` could not open a connection.
    """
    conn = getHiveConn()
    if conn is None:
        # getHiveConn already printed the underlying error.
        raise RuntimeError('Hive connection could not be established')
    try:
        sql = '''
             create table hn_power_all_1_no_lable(meter_id  string,'''+ hiveFieldToStr() +''')
          '''
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()
    finally:
        # Close the connection even when the statement fails.
        conn.close()
    
    
def run2():
    """Create the external Hive table ``hn_power_all_1_no_lable``.

    The table reads comma-delimited rows from the folder given in the
    ``location`` clause; the statement is printed before it is executed.

    Raises:
        RuntimeError: If ``getHiveConn()`` could not open a connection.
    """
    conn = getHiveConn()
    if conn is None:
        # getHiveConn already printed the underlying error.
        raise RuntimeError('Hive connection could not be established')
    try:
        # The location only needs to point at the folder that holds the data.
        sql = '''
             create external table hn_power_all_1_no_lable(meter_id  string,'''+ hiveFieldToStr() +''') row format delimited fields terminated by ',' location '/user/model/hrh/power/power_1'
          '''
        print(sql)
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()
    finally:
        # Close the connection even when the statement fails.
        conn.close()

    
def run3():
    """Drop the Hive table ``stu2`` if it exists.

    The statement is printed before it is executed.

    Raises:
        RuntimeError: If ``getHiveConn()`` could not open a connection.
    """
    conn = getHiveConn()
    if conn is None:
        # getHiveConn already printed the underlying error.
        raise RuntimeError('Hive connection could not be established')
    try:
        sql = '''
              drop table if exists stu2
          '''
        print(sql)
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()
    finally:
        # Close the connection even when the statement fails.
        conn.close()

    
def run4():
    """Query two rows from ``hn_power_all_1_no_lable`` and print them.

    Raises:
        RuntimeError: If ``getHiveConn()`` could not open a connection.
    """
    conn = getHiveConn()
    if conn is None:
        # getHiveConn already printed the underlying error.
        raise RuntimeError('Hive connection could not be established')
    try:
        sql = '''
              select * 
              from hn_power_all_1_no_lable
              limit 2
          '''
        result = readDataFromHive(conn, sql)
        print(result)
    finally:
        # Close the connection even when the query fails.
        conn.close()
    
    
def run5():
    """Refresh the Hive metadata of table ``stu2`` with ``msck repair table``.

    The statement is printed before it is executed.

    Raises:
        RuntimeError: If ``getHiveConn()`` could not open a connection.
    """
    conn = getHiveConn()
    if conn is None:
        # getHiveConn already printed the underlying error.
        raise RuntimeError('Hive connection could not be established')
    try:
        sql = '''
          msck repair table stu2
      '''
        print(sql)
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
        finally:
            cursor.close()
    finally:
        # Close the connection even when the statement fails.
        conn.close()
    
    
# run2()    
# run3()   
# run4()  
# run5()
    
    
发布了348 篇原创文章 · 获赞 210 · 访问量 87万+

猜你喜欢

转载自blog.csdn.net/u010916338/article/details/105225620