Hive整表数据分成256分表样式导出

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/BabyFish13/article/details/81510958

若不是由于MySQL数据装载的需要,这样做实在太费时费力:整张导出只需10来分钟的表,按256个分表导出时,即使用上32个并行,也要耗时1个半小时。
/Users/nisj/PycharmProjects/BiDataProc/love/userLevel/HiveRunData-yicheng.py

# -*- coding=utf-8 -*-
import os
import time
import datetime
import warnings
import threadpool

# Suppress every Python warning for the rest of the run.
warnings.filterwarnings("ignore")

def dateRange(beginDate, endDate):
    """Return every calendar day from beginDate through endDate, inclusive.

    Args:
        beginDate: First day of the range, as a "%Y-%m-%d" string.
        endDate: Last day of the range, as a "%Y-%m-%d" string.

    Returns:
        A list of zero-padded "%Y-%m-%d" strings in ascending order. The list
        is empty when beginDate is later than endDate.

    Raises:
        ValueError: If either argument does not match "%Y-%m-%d".
    """
    day_format = "%Y-%m-%d"
    first_day = datetime.datetime.strptime(beginDate, day_format)
    last_day = datetime.datetime.strptime(endDate, day_format)

    # Work on parsed dates rather than comparing the raw strings: a
    # lexicographic comparison only agrees with calendar order when both
    # strings are zero-padded, and otherwise runs far past endDate.
    day_count = (last_day - first_day).days + 1
    return [
        (first_day + datetime.timedelta(days=offset)).strftime(day_format)
        for offset in range(day_count)
    ]

def hiveRunData(submeterPlus):
    """Export one uid shard of bitmp_all_empirical_value_store_sum to text files.

    Runs two Hive queries through the shell. Both select the rows where
    pmod(uid,256) equals submeterPlus and total_curr_empval>0, and each has
    its standard output redirected to a file under
    /home/hadoop/nisj/xx/yic/dataDir/ (user_exp_<shard>.txt and
    user_exp_record_<shard>.txt).

    Args:
        submeterPlus: Shard number; substituted into both the query predicate
            and the output file names.

    Returns:
        None. A non-zero shell status is reported on stderr instead of being
        raised, so the second export still runs when the first one fails.
    """
    import sys  # local import: only needed for the failure report below

    exports = (
        ("user_exp", """/usr/lib/hive-current/bin/hive -e " \
            select uid,total_curr_empval exp \
            from bitmp_all_empirical_value_store_sum \
            where pmod(uid,256)={submeterPlus} and total_curr_empval>0; \
            ">/home/hadoop/nisj/xx/yic/dataDir/user_exp_{submeterPlus}.txt """),
        ("user_exp_record", """/usr/lib/hive-current/bin/hive -e " \
            select uid,0 action,total_curr_empval exp,'初始新增' memo \
            from bitmp_all_empirical_value_store_sum \
            where pmod(uid,256)={submeterPlus} and total_curr_empval>0; \
            ">/home/hadoop/nisj/xx/yic/dataDir/user_exp_record_{submeterPlus}.txt """),
    )

    for label, template in exports:
        status = os.system(template.format(submeterPlus=submeterPlus))
        # The shell redirect creates the output file even when hive fails, so
        # without this check a broken shard is indistinguishable from an
        # empty one.
        if status != 0:
            sys.stderr.write(
                "hive export %s_%s failed with status %s\n"
                % (label, submeterPlus, status)
            )

# data export
# hiveRunData()
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:", now_time

parList = []
for submeterPlus in range(0, 256, 1):
    parList.append(submeterPlus)

# print parList
requests = []
request_hiveRunData = threadpool.makeRequests(hiveRunData, parList)
requests.extend(request_hiveRunData)
main_pool = threadpool.ThreadPool(32)
[main_pool.putRequest(req) for req in requests]

if __name__ == '__main__':
    while True:
        try:
            time.sleep(30)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break

    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()

now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print "当前时间是:", now_time

猜你喜欢

转载自blog.csdn.net/BabyFish13/article/details/81510958