一、思路分析 ---------------------------------------------------------- 按每天的每个小时的session数量,占当天session总数的比例,乘以每天要抽取的session数量,计算出每个小时要抽取的session数量; 然后呢,在每天每小时的session中,随机抽取出之前计算出来的数量的session。 二、算法分析 ------------------------------------------------------------ 1.已有的数据: 聚合后的<sessionid,aggrinfo> rdd1 2.具体算法: a.先求出以starttime[yyyy-MM-dd_HH]为key的数据: <sessionid,aggrinfo> --- maptipair---> <starttime,sessionid> b.按key[starttime]聚合: <starttime,sessionid> --- countbykey----> <starttime,serssion_count> c.求出每小时的sessionids的占比以及需要抽取的数量 d.随机抽取相应数量的session: 获取小时段内0~session小时数量的的随机sessionid <starttime,sessionid> 应用groupbykey,形成 <starttime,[sessionids]> 遍历<starttime,[sessionids]>,遇到要抽取的sessionids,就抽取出来,写入mysql数据库 三、具体步骤 ------------------------------------------------------------- 1.生成10000个访问session,按照小时粒度随机抽取100个session。 2.计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,aggrInfo>格式的RDD JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair( new PairFunction<Tuple2<String,String>, String, String>() { private static final long serialVersionUID = 1L; public Tuple2<String, String> call( Tuple2<String, String> tuple) throws Exception { String aggrInfo = tuple._2; String startTime = StringUtils.getFieldFromConcatString( aggrInfo, "\\|", Constants.FIELD_START_TIME); String dateHour = DateUtils.getDateHour(startTime); return new Tuple2<String, String>(dateHour, aggrInfo); } }); 3.得到每天每小时的session数量 Map<String, Object> countMap = time2sessionidRDD.countByKey(); 4.将<yyyy-MM-dd_HH,count>格式的map,转换成<yyyy-MM-dd,<HH,count>>的格式,以便于后期使用 Map<String, Map<String, Long>> dateHourCountMap = new HashMap<String, Map<String, Long>>(); 5.使用按时间比例随机抽取算法,计算出每天每小时要抽取session的索引 首先明确要得到的结果:<String日期,<String小时,List<Integer>(随机数)>> 例如<date,<hour,(3,6,9,12...)>> a.取得每天抽取数,比如20个/天 String date = dateHourCountMap.key //2019-1-11 Map<hour,count> hourCountMap = dateHourCountMap.value //<[1,15],[4,20]...> int dayCount = dateHourCountMap.size //总共有多少天的sesssion int perDayCount = 100 / dayCount //总共抽取100个,均分到每天,抽取的数量 b.将每天抽取的数量,再次按照小时再分配 //计算每日总session数 foreach hourCountMap { int totleDayCount += hourCountMap.value } //计算每小时抽取数量 foreach hourCountMap { String hour = hourCountMap.key //1 long count = hourCountMap.value //15 int hourCount = count / totleDayCount * perDayCount //每小时抽取的数量 for(i:hourCount) { List<Integer> add 随机数,例如<3,6,9,12...> } 生成Map<String小时,List<Integer>(随机数)>> 例如<hour,(3,6,9,12...)> } //生成day的随机抽取数 <String日期,<String小时,List<Integer>(随机数)>>,例如 <date,<hour,(3,6,9,12...)>> c.遍历每天每小时的session,然后根据随机索引进行抽取的随机sessionid //[<yyyy-MM-dd_HH,[aggrInfo1,aggrInfo2...]>] = [<yyyy-MM-dd_HH,aggrInfo>].groupByKey(); JavaPairRDD<String, Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey(); //遍历time2sessionsRDD,每一条都是yyyy-MM-dd_HH,[aggrInfo1,aggrInfo2...]的信息 //结合之前按照小时抽取的随机数,将抽取的随机数对应成sessionid time2sessionsRDD.flatMapToPair( //获取随机数List String dateHour = tuple._1; String date = dateHour.split("_")[0]; String hour = dateHour.split("_")[1]; List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour); //遍历[aggrInfo1,aggrInfo2...],将extractIndexList的索引与[aggrInfo1,aggrInfo2...]数组索引一一对应 //得到sessionid以及sessioninfo //最后将sessioninfo通过mysql写入到表session_random_extract中 //返回rdd<sessionid,sessionid> ) d.最后一步,是用抽取出来的sessionid rdd,去join它们的访问行为明细数据,写入session表 //结果rdd的每一行的数据格式是: 抽取的sessionid,tuple<抽取的sessionid, sessioninfo_row> JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD = extractSessionidsRDD.join(sessionid2actionRDD);
电商用户行为分析大数据平台(三)-- 按照时间粒度随机抽取用户session
猜你喜欢
转载自blog.csdn.net/xcvbxv01/article/details/86510966
今日推荐
周排行