原文链接:
http://blog.csdn.net/babyfish13/article/details/79539697
主要更新要点:
1、业务规则的更改
原先是把整个时间段放在一起统一计算;业务规则修改之后,改为先按天分别计算,再将多天的结果汇总。这样无形中增加了系统的资源开销(每一天都要单独执行一次 Hive 查询)。
2、新增了python中分组汇总部分
/Users/nisj/PycharmProjects/BiDataProc/Demand/Cc0810/ccQuery_sum.py
主要更新要点:
1、业务规则的更改
原先是把整个时间段放在一起统一计算;业务规则修改之后,改为先按天分别计算,再将多天的结果汇总。这样无形中增加了系统的资源开销(每一天都要单独执行一次 Hive 查询)。
2、新增了python中分组汇总部分
from itertools import groupby
from operator import itemgetter

groupbyAllData = groupby(sorted(fans_byself_cnt_list, key=itemgetter(0)), itemgetter(0))
for key, item in groupbyAllData:
    i = 0
    fans_byself_cnt = 0
    day7_pay_amount = 0
    nextday_remain = 0
    for jtem in item:
        i += 1

3、字符串转数字及非数字型字符串的例外处理

        if jtem[2] == 'NULL':
            fans_byself_cnt_new = 0
        else:
            fans_byself_cnt_new = jtem[2]
        if jtem[3] == 'NULL':
            day7_pay_amount_new = 0
        else:
            day7_pay_amount_new = jtem[3]
        if jtem[4] == 'NULL':
            nextday_remain_new = 0
        else:
            nextday_remain_new = jtem[4]
        fans_byself_cnt = fans_byself_cnt + int(fans_byself_cnt_new)
        day7_pay_amount = day7_pay_amount + int(day7_pay_amount_new)
        nextday_remain = nextday_remain + int(nextday_remain_new)
    # print key, i, fans_byself_cnt, day7_pay_amount, nextday_remain

更新后的脚本如下:
/Users/nisj/PycharmProjects/BiDataProc/Demand/Cc0810/ccQuery_sum.py
# -*- coding=utf-8 -*-
"""Recompute per-room fan, payment and next-day-retention totals.

For every room returned by ``getRoomIdAndFirstRecDates`` the script runs one
Hive query per day of the room's first-recommendation window, sums the daily
figures in Python, and writes the totals back to
``jellyfish_hadoop_stat.invite_anchor_sum`` through the ``mysql`` client.

The job is started at import time by the call at the bottom of the file.
"""
import datetime
import time
import os
import warnings
import sys
import re

if sys.version_info[0] == 2:
    # Python 2 only: ``reload`` is a builtin there and ``setdefaultencoding``
    # is hidden until ``sys`` is reloaded. Neither exists on Python 3.
    reload(sys)  # noqa: F821
    sys.setdefaultencoding('utf8')

from itertools import groupby
from operator import itemgetter

warnings.filterwarnings("ignore")

yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')


def dateRange(beginDate, endDate):
    """Return every day from beginDate to endDate, both inclusive.

    Args:
        beginDate: First day, formatted ``YYYY-MM-DD``.
        endDate: Last day, formatted ``YYYY-MM-DD``.

    Returns:
        A list of ``YYYY-MM-DD`` strings in ascending order; empty when
        endDate is earlier than beginDate.

    Raises:
        ValueError: If either argument is not a ``YYYY-MM-DD`` date.
    """
    # Both ends are parsed so that a non-date end value (for example the text
    # 'NULL') fails immediately instead of keeping a string comparison true
    # for every generated date.
    current = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    last = datetime.datetime.strptime(endDate, "%Y-%m-%d")
    dates = []
    while current <= last:
        dates.append(current.strftime("%Y-%m-%d"))
        current += datetime.timedelta(days=1)
    return dates


def _readTsvRows(command):
    """Run a shell command and return its stdout lines split on tabs."""
    pipe = os.popen(command)
    try:
        lines = pipe.readlines()
    finally:
        # Always release the pipe, even when reading fails.
        pipe.close()
    return [re.split('\t', line.replace('\n', '')) for line in lines]


def _toInt(value):
    """Convert one text column to int, mapping the literal 'NULL' to 0."""
    return 0 if value == 'NULL' else int(value)


def getRoomIdAndFirstRecDates():
    """Fetch the rooms whose totals have to be computed.

    Returns:
        A list of rows, each a list of four strings: room_id,
        first_rec_start_date (first 10 characters), first_rec_end_date
        (first 10 characters) and created_time. Only rows whose
        updated_time equals '2099-12-30 23:59:59' are selected
        (presumably the "not processed yet" marker - confirm).
    """
    return _readTsvRows("""source /etc/profile; \
            /usr/bin/mysql -hMysqlHost -P6605 -uMysqlUser -pMysqlPass -N -e "select room_id,substr(first_rec_start_date,1,10) first_rec_start_date,substr(first_rec_end_date,1,10) first_rec_end_date,created_time \
            from jellyfish_hadoop_stat.invite_anchor_sum \
            where updated_time='2099-12-30 23:59:59'; \
            " """)


def _queryDailyStats(roomId, runDay):
    """Run the Hive query for one room and one day; return its tab-split rows.

    Each result row holds room_id, runDay, fans_byself_cnt, day7_pay_amount
    and nextday_remain, in that order.
    """
    runDate = datetime.datetime.strptime(runDay, '%Y-%m-%d')
    day7After = (runDate + datetime.timedelta(days=7)).strftime('%Y-%m-%d')
    day1After = (runDate + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    return _readTsvRows("""source /etc/profile; \
            /usr/lib/hive-current/bin/hive -e " \
            add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
            create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
            with tab_user_frist_subscriber as (select room_id,uid view_uid,state,substr(created_time,1,10) subscriber_date \
            from (select room_id,uid,state,created_time,row_number()over(partition by uid order by created_time asc) rk from xxxxx_room_subscriber_dislodgebat) x \
            where rk=1 and room_id={roomId} and substr(created_time,1,10)='{runDay}'), \
            tab_pay_info as (select uid,sum(amount) amount \
            from data_chushou_pay_info \
            where state=0 and pt_day between '{runDay}' and '{day7After}' \
            group by uid), \
            tab_access_log as( \
            select distinct RadixChange(lower(uid),16,10) uid \
            from oss_bi_all_access_log \
            where pt_day = '{day1After}' \
            ), \
            tab_newidentifier_newuser as( \
            select uid \
            from oss_bi_type_of_all_user \
            where pt_day='{runDay}' and type=1 \
            ) \
            select a1.room_id,'{runDay}' runDay,count(distinct a1.view_uid) fans_byself_cnt,sum(amount) day7_pay_amount,count(distinct a3.uid) nextday_remain \
            from tab_user_frist_subscriber a1 \
            left join tab_pay_info a2 on a1.view_uid=a2.uid \
            left join tab_access_log a3 on a1.view_uid=a3.uid \
            inner join tab_newidentifier_newuser a4 on a1.view_uid=a4.uid \
            group by a1.room_id; \
            " """.format(runDay=runDay, day7After=day7After, day1After=day1After, roomId=roomId))


def getFansCntUpdate2Mysql():
    """Compute the per-room totals and store them in MySQL.

    Steps:
      1. When the local hour is 06, rebuild the Hive helper table
         ``xxxxx_room_subscriber_dislodgebat`` from yesterday's partition.
      2. For each room from ``getRoomIdAndFirstRecDates``, run the daily Hive
         query for every day between its start and end date.
      3. Sum the daily fans / 7-day payment / next-day retention figures and
         update the room's row in ``invite_anchor_sum``.

    Returns:
        None. The results are written by the ``mysql`` command-line client.
    """
    # Temporary-table preparation, done only during the 06:00 hour.
    if time.strftime('%H') == '06':
        os.system("""source /etc/profile; \
            /usr/lib/hive-current/bin/hive -e " \
            drop table if exists xxxxx_room_subscriber_dislodgebat; \
            create table xxxxx_room_subscriber_dislodgebat as \
            select a2.room_id,a2.uid,a2.state,a2.created_time \
            from (select uid,created_time \
            from oss_room_subscriber_roomid \
            where pt_day='{yesterday}' \
            group by uid,created_time \
            having count(*)=1) a1 \
            inner join oss_room_subscriber_roomid a2 on a1.uid=a2.uid and a1.created_time=a2.created_time \
            where a2.pt_day='{yesterday}' \
            ; \
            " """.format(yesterday=(datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')))

    # NOTE(review): the pasted source lost its indentation; the room loop is
    # assumed to run on every invocation, not only at 06:00 - confirm.
    # The room list is fetched once so the MySQL query is not executed twice.
    rooms = getRoomIdAndFirstRecDates()

    # Self-brought fan figures, one room at a time.
    for roomId, first_rec_start_date, first_rec_end_date, created_time in rooms:
        curr_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

        try:
            runDays = dateRange(beginDate=first_rec_start_date, endDate=first_rec_end_date)
        except ValueError:
            # A missing or malformed date (mysql prints NULL as text) must not
            # stop the remaining rooms from being processed.
            sys.stderr.write("skip room %s: invalid date range %r - %r\n"
                             % (roomId, first_rec_start_date, first_rec_end_date))
            continue

        dailyRows = []
        for runDay in runDays:
            dailyRows.extend(_queryDailyStats(roomId, runDay))

        # Keep only complete result rows; blank or truncated output lines
        # would otherwise fail on the column lookups below.
        dailyRows = [row for row in dailyRows if len(row) >= 5]

        # groupby only merges adjacent items, hence the sort on the same key.
        for key, group in groupby(sorted(dailyRows, key=itemgetter(0)), itemgetter(0)):
            fans_byself_cnt = 0
            day7_pay_amount = 0
            nextday_remain = 0
            for row in group:
                fans_byself_cnt += _toInt(row[2])
                day7_pay_amount += _toInt(row[3])
                nextday_remain += _toInt(row[4])

            # Write the totals back to the source table.
            # NOTE(review): assumed to sit inside the group loop, so a room
            # without any Hive rows is left untouched - confirm.
            os.system("""source /etc/profile; \
                /usr/bin/mysql -hMysqlHost -P6605 -uMysqlUser -pMysqlPass -e "update jellyfish_hadoop_stat.invite_anchor_sum \
                set fans_count={fans_byself_cnt}, \
                amount={rang_pay_amount}, \
                preserve={last7day_remain}, \
                updated_time='{curr_time}' \
                where room_id={roomId} and created_time='{created_time}'; \
                " """.format(roomId=roomId, created_time=created_time, fans_byself_cnt=fans_byself_cnt, rang_pay_amount=day7_pay_amount, last7day_remain=nextday_remain, curr_time=curr_time))


# Batch Test
getFansCntUpdate2Mysql()