版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/someby/article/details/88727392
目录
UserVisitSessionAnalyzeSpark.java
UserVisitSessionAnalyzeSpark.java完整代码
本篇文章将记录用户访问session分析-session随机抽取之获取抽取session的明细数据。
SessionDetail表实例化
domain
SessionDetail.java
package graduation.java.domain; /** * FileName: SessionDetail * Author: hadoop * Email: [email protected] * Date: 19-3-21 下午9:31 * Description: * session_detail表实体类 */ public class SessionDetail { private long taskid; private long userid; private String sessionid; private long pageid; private String actionTime; private String seachKeyWord; private long clickCategoryId; private long clickProductId; private String orderCategoryIds; private String orderProductIds; private String payCategoryIds; private String payProductIds; public long getTaskid() { return taskid; } public void setTaskid(long taskid) { this.taskid = taskid; } public long getUserid() { return userid; } public void setUserid(long userid) { this.userid = userid; } public String getSessionid() { return sessionid; } public void setSessionid(String sessionid) { this.sessionid = sessionid; } public long getPageid() { return pageid; } public void setPageid(long pageid) { this.pageid = pageid; } public String getActionTime() { return actionTime; } public void setActionTime(String actionTime) { this.actionTime = actionTime; } public String getSeachKeyWord() { return seachKeyWord; } public void setSeachKeyWord(String seachKeyWord) { this.seachKeyWord = seachKeyWord; } public long getClickCategoryId() { return clickCategoryId; } public void setClickCategoryId(long clickCategoryId) { this.clickCategoryId = clickCategoryId; } public long getClickProductId() { return clickProductId; } public void setClickProductId(long clickProductId) { this.clickProductId = clickProductId; } public String getOrderCategoryIds() { return orderCategoryIds; } public void setOrderCategoryIds(String orderCategoryIds) { this.orderCategoryIds = orderCategoryIds; } public String getOrderProductIds() { return orderProductIds; } public void setOrderProductIds(String orderProductIds) { this.orderProductIds = orderProductIds; } public String getPayCategoryIds() { return payCategoryIds; } public void setPayCategoryIds(String payCategoryIds) { this.payCategoryIds = payCategoryIds; } public String getPayProductIds() { return payProductIds; } public void setPayProductIds(String payProductIds) { this.payProductIds = payProductIds; } @Override public String toString() { return "SessionDetail{" + "taskid=" + taskid + ", userid=" + userid + ", sessionid='" + sessionid + '\'' + ", pageid=" + pageid + ", actionTime='" + actionTime + '\'' + ", seachKeyWord='" + seachKeyWord + '\'' + ", clickCategoryId=" + clickCategoryId + ", clickProductId=" + clickProductId + ", orderCategoryIds='" + orderCategoryIds + '\'' + ", orderProductIds='" + orderProductIds + '\'' + ", payCategoryIds='" + payCategoryIds + '\'' + ", payProductIds='" + payProductIds + '\'' + '}'; } }
dao
ISessionDetailDAO.java
package graduation.java.dao; import graduation.java.domain.SessionDetail; /** * FileName: ISessionDetailDAO * Author: hadoop * Email: [email protected] * Date: 19-3-21 下午9:38 * Description: * Session明细DAO接口 */ public interface ISessionDetailDAO { /** * 出入一条session明细 * @param sessionDetail */ void insert(SessionDetail sessionDetail); }
impl
SessionDetailImpl.java
package graduation.java.impl; import graduation.java.dao.ISessionDetailDAO; import graduation.java.domain.SessionDetail; import graduation.java.jdbc.JDBCHelper; /** * FileName: SessionDetailImpl * Author: hadoop * Email: [email protected] * Date: 19-3-21 下午9:39 * Description: * session明细DAO实现类 */ public class SessionDetailImpl implements ISessionDetailDAO { /** * 插入一条session明细 * @param sessionDetail */ public void insert(SessionDetail sessionDetail){ String sql = "insert into session_detail values(?,?,?,?,?,?,?,?,?,?,?,?)"; Object[] param = new Object[]{ sessionDetail.getTaskid(), sessionDetail.getSessionid(), sessionDetail.getPageid(), sessionDetail.getActionTime(), sessionDetail.getSeachKeyWord(), sessionDetail.getClickCategoryId(), sessionDetail.getClickProductId(), sessionDetail.getOrderCategoryIds(), sessionDetail.getOrderProductIds(), sessionDetail.getPayCategoryIds(), sessionDetail.getPayProductIds() }; JDBCHelper jdbcHelper = JDBCHelper.getInstance(); jdbcHelper.executeUpdate(sql,param); } }
spark
UserVisitSessionAnalyzeSpark.java
(加入的内容)
// 如果要进行session粒度的数据聚合 // 首先要从user_visit_action表中,查询出来指定日期范围内的行为数据 JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext,taskParam); JavaPairRDD<String,Row> session2actionRDD = getSessionid2ActionRDD(actionRDD);
randomExtractSession(task.getTaskId(),filteredSessionid2AggrInfoRDD,session2actionRDD);
/** * 获取sessionid到访问行为数据的映射的RDD * @param actionRDD * @return */ public static JavaPairRDD<String,Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD){ return actionRDD.mapToPair(new PairFunction<Row, String, Row>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Row> call(Row row) throws Exception { return new Tuple2<String,Row>(row.getString(2),row); } }); }
/** * 随机抽取session * @param sessionid2AggrInfoRDD */ private static void randomExtractSession( final long taskid, JavaPairRDD<String, String> sessionid2AggrInfoRDD, JavaPairRDD<String,Row> sessionid2actionRDD) { // 第一步,计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,sessionid>格式的RDD JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair( new PairFunction<Tuple2<String,String>, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call( Tuple2<String, String> tuple) throws Exception { String aggrInfo = tuple._2; String startTime = StringUtils.getFieldFromConcatString( aggrInfo, "\\|", Constants.FIELD_START_TIME); String dateHour = DateUtils.getDateHour(startTime); return new Tuple2<String, String>(dateHour, aggrInfo); } }); /** * 思考一下:这里我们不要着急写大量的代码,做项目的时候,一定要用脑子多思考 * * 每天每小时的session数量,然后计算出每天每小时的session抽取索引,遍历每天每小时session * 首先抽取出的session的聚合数据,写入session_random_extract表 * 所以第一个RDD的value,应该是session聚合数据 * */ // 得到每天每小时的session数量 Map<String, Long> countMap = time2sessionidRDD.countByKey(); //第二步,使用按时间比例随机抽取算法,计算出每天每小时需要抽取session的索引 //将<yyyy-MM-dd_HH,count>格式的map,转换为<yyyy-MM-dd,<HH,count>> Map<String,Map<String,Long>> dateHourCountMap = new HashMap<String,Map<String,Long>>(); for (Map.Entry<String, Long> countEntry : countMap.entrySet()){ String dateHour = countEntry.getKey(); String date = dateHour.split("_")[0]; String hour = dateHour.split("_")[1]; long count = countEntry.getValue(); Map<String,Long> hourCountMap = dateHourCountMap.get(date); if (hourCountMap ==null){ hourCountMap = new HashMap<String,Long>(); dateHourCountMap.put(date,hourCountMap); } dateHourCountMap.put(date,hourCountMap); } //开始实现按时间比例随机抽取算法 //总共要抽取100个session,按照天数,进行平分 int extractNumberPerDay = 100 /dateHourCountMap.size(); //<date,<hour,(1,3,4,2103)>> Map<String,Map<String, List<Integer>>> dateHourExtractMap = new HashMap<String,Map<String,List<Integer>>>(); Random random = new Random(); for (Map.Entry<String,Map<String,Long>> dateHourCountEntry : dateHourCountMap.entrySet()){ String date = dateHourCountEntry.getKey(); Map<String,Long> hourCountMap = dateHourCountEntry.getValue(); //计算出每天的session总数 long sessionCount = 0L; for (long hourCount : hourCountMap.values()){ sessionCount += hourCount; } Map<String,List<Integer>> hourExtractMap = dateHourExtractMap.get(date); if (hourExtractMap == null){ hourExtractMap = new HashMap<String,List<Integer>>(); dateHourExtractMap.put(date,hourExtractMap); } //遍历每一个小时 for (Map.Entry<String,Long> hourCountEntry : hourCountMap.entrySet()){ String hour = hourCountEntry.getKey(); long count = hourCountEntry.getValue(); // 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量 // 就可以计算出,当前小时需要抽取的session数量 int hourExtractNumber = (int)((double)count/(double) sessionCount)*extractNumberPerDay; if (hourExtractNumber > count){ hourExtractNumber = (int)count; } //先获取当前小时的存放随机数的list List<Integer> extractIndexList = hourExtractMap.get(hour); if (extractIndexList == null){ extractIndexList = new ArrayList<Integer>(); hourExtractMap.put(hour,extractIndexList); } //生成上面计算出来的数量的随机数 for (int i = 0; i < hourExtractNumber;i++){ int extractIndex = random.nextInt((int)count); while (extractIndexList.contains(extractIndex)){ extractIndex = random.nextInt((int)count); } extractIndexList.add(extractIndex); } } } /** * 第三步:遍历每天每小时的session,然后根据随机索引进行抽取 */ // 执行groupByKey算子,得到<dateHour,(session aggrInfo)> JavaPairRDD<String,Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey(); // 我们用flatMap算子,遍历所有的<dateHour,(session aggrInfo)>格式的数据 // 然后呢,会遍历每天每小时的session // 如果发现某个session恰巧在我们指定的这天这小时的随机抽取索引上 // 那么抽取该session,直接写入MySQL的random_extract_session表 // 将抽取出来的session id返回回来,形成一个新的JavaRDD<String> // 然后最后一步,是用抽取出来的sessionid,去join它们的访问行为明细数据,写入session表 JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair( new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<Tuple2<String, String>> call( Tuple2<String, Iterable<String>> tuple) throws Exception { List<Tuple2<String, String>> extractSessionids = new ArrayList<Tuple2<String, String>>(); String dateHour = tuple._1; String date = dateHour.split("_")[0]; String hour = dateHour.split("_")[1]; Iterator<String> iterator = tuple._2.iterator(); List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour); ISessionRandomExtractDAO sessionRandomExtractDAO = DAOFactory.getSessionRandomExtractDAO(); int index = 0; while(iterator.hasNext()) { String sessionAggrInfo = iterator.next(); if(extractIndexList.contains(index)) { String sessionid = StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID); // 将数据写入MySQL SessionRandomExtract sessionRandomExtract = new SessionRandomExtract(); sessionRandomExtract.setTaskid(taskid); sessionRandomExtract.setSessionid(sessionid); sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_START_TIME)); sessionRandomExtract.setSerachKeyWords(StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)); sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)); sessionRandomExtractDAO.insert(sessionRandomExtract); // 将sessionid加入list extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid)); } index++; } return (Iterator<Tuple2<String, String>>) extractSessionids; } }); /** * 第四步:获取抽取出来的session的明细数据 */ JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD = extractSessionidsRDD.join(sessionid2actionRDD); extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception { Row row = tuple._2._2; SessionDetail sessionDetail = new SessionDetail(); sessionDetail.setTaskid(taskid); sessionDetail.setUserid(row.getLong(0)); sessionDetail.setSessionid(row.getString(1)); sessionDetail.setPageid(row.getLong(2)); sessionDetail.setActionTime(row.getString(3)); sessionDetail.setSeachKeyWord(row.getString(4)); sessionDetail.setClickCategoryId(row.getLong(5)); sessionDetail.setClickProductId(row.getLong(6)); sessionDetail.setOrderCategoryIds(row.getString(7)); sessionDetail.setOrderProductIds(row.getString(8)); sessionDetail.setPayCategoryIds(row.getString(9)); sessionDetail.setPayProductIds(row.getString(11)); ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO(); sessionDetailDAO.insert(sessionDetail); } }); }
UserVisitSessionAnalyzeSpark.java完整代码
package graduation.java.spark; /** * FileName: UserVisitSessionAnlyizSpark * Author: hadoop * Email: [email protected] * Date: 19-3-1 上午10:41 * Description: * 用户访问session分析Spark作业 * * 接收用户创建的分析任务,用户可能指定的条件如下: * * 1、时间范围:起始日期~结束日期 * 2、性别:男或女 * 3、年龄范围 * 4、职业:多选 * 5、城市:多选 * 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件 * 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件 * * 我们的spark作业如何接受用户创建的任务? * * J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param * 字段中 * * 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本 * spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数 * 参数就封装在main函数的args数组中 * * 这是spark本身提供的特性 */ import com.alibaba.fastjson.JSONObject; import graduation.java.conf.ConfigurationManager; import graduation.java.constant.Constants; import graduation.java.dao.ISessionAggrStatDAO; import graduation.java.dao.ISessionDetailDAO; import graduation.java.dao.ISessionRandomExtractDAO; import graduation.java.dao.ITaskDAO; import graduation.java.domain.SessionAggrStat; import graduation.java.domain.SessionDetail; import graduation.java.domain.SessionRandomExtract; import graduation.java.domain.Task; import graduation.java.impl.DAOFactory; import graduation.java.test.MockData; import graduation.java.util.*; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.HiveContext; import scala.Tuple2; import org.apache.spark.sql.SQLContext; import java.util.*; /** * 用户访问session分析Spark作业 * * */ public class UserVisitSessionAnalyzeSpark { public static void main(String[] args) { Logger.getLogger("org").setLevel(Level.ERROR); args = new String[]{"1"}; // 构建Spark上下文 SparkConf conf = new SparkConf() .setAppName(Constants.SPARK_APP_NAME_SESSION) .setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = getSQLContext(sc.sc()); // 生成模拟测试数据 mockData(sc, sqlContext); //创建需要使用的DAO组件 ITaskDAO taskDAO = DAOFactory.getTaskDAO(); //首先查询书来指定的任务,并获取任务查询参数 long taskid = ParamUtils.getTaskIdFromArgs(args); Task task = taskDAO.findById(taskid); //测试 //System.out.println("taskId: "+taskid); JSONObject taskParam = JSONObject.parseObject(task.getTaskParam()); //测试 /*if (taskParam !=null){ System.out.println("taskParam is "+ taskParam.values()); }*/ // 如果要进行session粒度的数据聚合 // 首先要从user_visit_action表中,查询出来指定日期范围内的行为数据 JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext,taskParam); JavaPairRDD<String,Row> session2actionRDD = getSessionid2ActionRDD(actionRDD); /*System.out.println("*****************************************"); actionRDD.rdd().count(); System.out.println("*****************************************"); for (Row row : actionRDD.take(10)){ System.out.println(row.toString()); }*/ // 首先,可以将行为数据,按照session_id进行groupByKey分组 // 此时的数据的粒度就是session粒度了,然后呢,可以将session粒度的数据 // 与用户信息数据,进行join // 然后就可以获取到session粒度的数据,同时呢,数据里面还包含了session对应的user的信息 //测试 //System.out.println("sessionid2AggrInfoRDD"); JavaPairRDD<String,String> sessionid2AggrInfoRDD = aggregateBySession(sqlContext,actionRDD); //测试 //sessionid2AggrInfoRDD.count(); /* System.out.println("**************sessionid2AggrInfoRDD**************"); for (Tuple2<String,String> tuple2 : sessionid2AggrInfoRDD.take(10)){ System.out.println(tuple2.toString()); } System.out.println("**************sessionid2AggrInfoRDD**************");*/ // 接着,就要针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤 // 相当于我们自己编写的算子,是要访问外面的任务参数对象的 // 所以,大家记得我们之前说的,匿名内部类(算子函数),访问外部对象,是要给外部对象使用final修饰的 // 重构,同时进行过滤和统计 Accumulator<String> sessionAggrStatAccumulator = sc.accumulator( "", new SessionAggrStatAccumulator()); JavaPairRDD<String,String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(sessionid2AggrInfoRDD,taskParam,sessionAggrStatAccumulator); //测试 /* System.out.println("\n\n\n\n\filteredSessionid2InfoRDD"); filteredSessionid2AggrInfoRDD.count(); for(Tuple2<String,String> tuple : filteredSessionid2AggrInfoRDD.take(10)){ System.out.println(tuple); }*/ /** * 对于Accumulator这种分布式累加计算的变量的使用,有一个重要说明 * * 从Accumulator中,获取数据,插入数据库的时候,一定要,一定要,是在有某一个action操作以后 * 再进行。。。 * * 如果没有action的话,那么整个程序根本不会运行。。。 * * 是不是在calculateAndPersisitAggrStat方法之后,运行一个action操作,比如count、take * 不对!!! * * 必须把能够触发job执行的操作,放在最终写入MySQL方法之前 * * 计算出来的结果,在J2EE中,是怎么显示的,是用两张柱状图显示 */ /*System.out.println(filteredSessionid2AggrInfoRDD.count());*/ /** * 特别说明 * 我们知道,要将上一个功能的session聚合统计数据获取到,就必须是在一个action操作触发job之后 * 才能从Accumulator中获取数据,否则是获取不到数据的,因为没有job执行,Accumulator的值为空 * 所以,我们在这里,将随机抽取的功能的实现代码,放在session聚合统计功能的最终计算和写库之前 * 因为随机抽取功能中,有一个countByKey算子,是action操作,会触发job */ randomExtractSession(task.getTaskId(),filteredSessionid2AggrInfoRDD,session2actionRDD); //计算出各个范围的session占比,并写入Mysql // calculateAndPersistAggrStat(sessionAggrStatAccumulator.value(),task.getTaskId()); /** * session聚合统计(统计出访问时长和访问步长,各个区间的session数量占总session数量的比例) * * 如果不进行重构,直接来实现,思路: * 1、actionRDD,映射成<sessionid,Row>的格式 * 2、按sessionid聚合,计算出每个session的访问时长和访问步长,生成一个新的RDD * 3、遍历新生成的RDD,将每个session的访问时长和访问步长,去更新自定义Accumulator中的对应的值 * 4、使用自定义Accumulator中的统计值,去计算各个区间的比例 * 5、将最后计算出来的结果,写入MySQL对应的表中 * * 普通实现思路的问题: * 1、为什么还要用actionRDD,去映射?其实我们之前在session聚合的时候,映射已经做过了。多此一举 * 2、是不是一定要,为了session的聚合这个功能,单独去遍历一遍session?其实没有必要,已经有session数据 * 之前过滤session的时候,其实,就相当于,是在遍历session,那么这里就没有必要再过滤一遍了 * * 重构实现思路: * 1、不要去生成任何新的RDD(处理上亿的数据) * 2、不要去单独遍历一遍session的数据(处理上千万的数据) * 3、可以在进行session聚合的时候,就直接计算出来每个session的访问时长和访问步长 * 4、在进行过滤的时候,本来就要遍历所有的聚合session信息,此时,就可以在某个session通过筛选条件后 * 将其访问时长和访问步长,累加到自定义的Accumulator上面去 * 5、就是两种截然不同的思考方式,和实现方式,在面对上亿,上千万数据的时候,甚至可以节省时间长达 * 半个小时,或者数个小时 * * 开发Spark大型复杂项目的一些经验准则: * 1、尽量少生成RDD * 2、尽量少对RDD进行算子操作,如果有可能,尽量在一个算子里面,实现多个需要做的功能 * 3、尽量少对RDD进行shuffle算子操作,比如groupByKey、reduceByKey、sortByKey(map、mapToPair) * shuffle操作,会导致大量的磁盘读写,严重降低性能 * 有shuffle的算子,和没有shuffle的算子,甚至性能,会达到几十分钟,甚至数个小时的差别 * 有shfufle的算子,很容易导致数据倾斜,一旦数据倾斜,简直就是性能杀手(完整的解决方案) * 4、无论做什么功能,性能第一 * 在传统的J2EE或者.NET后者PHP,软件/系统/网站开发中,我认为是架构和可维护性,可扩展性的重要 * 程度,远远高于了性能,大量的分布式的架构,设计模式,代码的划分,类的划分(高并发网站除外) * * 在大数据项目中,比如MapReduce、Hive、Spark、Storm,我认为性能的重要程度,远远大于一些代码 * 的规范,和设计模式,代码的划分,类的划分;大数据,大数据,最重要的,就是性能 * 主要就是因为大数据以及大数据项目的特点,决定了,大数据的程序和项目的速度,都比较慢 * 如果不优先考虑性能的话,会导致一个大数据处理程序运行时间长度数个小时,甚至数十个小时 * 此时,对于用户体验,简直就是一场灾难 * * 所以,推荐大数据项目,在开发和代码的架构中,优先考虑性能;其次考虑功能代码的划分、解耦合 * * 我们如果采用第一种实现方案,那么其实就是代码划分(解耦合、可维护)优先,设计优先 * 如果采用第二种方案,那么其实就是性能优先 * * 讲了这么多,其实大家不要以为我是在岔开话题,大家不要觉得项目的课程,就是单纯的项目本身以及 * 代码coding最重要,其实项目,我觉得,最重要的,除了技术本身和项目经验以外;非常重要的一点,就是 * 积累了,处理各种问题的经验 * */ // 关闭Spark上下文 sc.close(); } /** * 获取SQLContext * 如果是在本地测试环境的话,那么就生成SQLContext对象 * 如果是在生产环境运行的话,那么就生成HiveContext对象 * @param sc SparkContext * @return SQLContext */ private static SQLContext getSQLContext(SparkContext sc) { boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); if(local) { return new SQLContext(sc); } else { return new HiveContext(sc); } } /** * 生成模拟数据(只有本地模式,才会去生成模拟数据) * @param sc * @param sqlContext */ private static void mockData(JavaSparkContext sc, SQLContext sqlContext) { boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); if(local) { MockData.mock(sc, sqlContext); } } /** * 获取指定日期范围内的用户访问行为数据 * @param sqlContext SQLContext * @param taskParam 任务参数 * @return 行为数据RDD */ private static JavaRDD<Row> getActionRDDByDateRange( SQLContext sqlContext, JSONObject taskParam) { String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE); String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE); String sql = "select * " + "from user_visit_action " + "where date>='" + startDate + "' " + "and date<='" + endDate + "'"; Dataset actionDF = sqlContext.sql(sql); System.out.println("actionDF"); actionDF.show(10); return actionDF.javaRDD(); } /** * 获取sessionid到访问行为数据的映射的RDD * @param actionRDD * @return */ public static JavaPairRDD<String,Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD){ return actionRDD.mapToPair(new PairFunction<Row, String, Row>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Row> call(Row row) throws Exception { return new Tuple2<String,Row>(row.getString(2),row); } }); } /** * 对行为数据按session粒度进行聚合 * @param actionRDD 行为数据RDD * @return session粒度聚合数据 */ private static JavaPairRDD<String, String> aggregateBySession( SQLContext sqlContext, JavaRDD<Row> actionRDD) { // 现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索 // 我们现在需要将这个Row映射成<sessionid,Row>的格式 JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair( /** * PairFunction * 第一个参数,相当于是函数的输入 * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值 */ new PairFunction<Row, String, Row>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Row> call(Row row) throws Exception { return new Tuple2<String, Row>(row.getString(2), row); } }); // 对行为数据按session粒度进行分组 JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD = sessionid2ActionRDD.groupByKey(); // 对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来 // 到此为止,获取的数据格式,如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)> JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair( new PairFunction<Tuple2<String,Iterable<Row>>, Long, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple) throws Exception { String sessionid = tuple._1; Iterator<Row> iterator = tuple._2.iterator(); StringBuffer searchKeywordsBuffer = new StringBuffer(""); StringBuffer clickCategoryIdsBuffer = new StringBuffer(""); Long userid = null; // session的起始和结束时间 Date startTime = null; Date endTime = null; // session的访问步长 int stepLength = 0; // 遍历session所有的访问行为 while(iterator.hasNext()) { // 提取每个访问行为的搜索词字段和点击品类字段 Row row = iterator.next(); if(userid == null) { userid = row.getLong(1); } System.out.println("row.toString"+row.toString()); String searchKeyword = row.getString(5); Long clickCategoryId = row.getLong(6); //String clickCategoryId = String.valueOf(row.getLong(6)); // 实际上这里要对数据说明一下 // 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的 // 其实,只有搜索行为,是有searchKeyword字段的 // 只有点击品类的行为,是有clickCategoryId字段的 // 所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的 // 我们决定是否将搜索词或点击品类id拼接到字符串中去 // 首先要满足:不能是null值 // 其次,之前的字符串中还没有搜索词或者点击品类id if(StringUtils.isNotEmpty(searchKeyword)) { if(!searchKeywordsBuffer.toString().contains(searchKeyword)) { searchKeywordsBuffer.append(searchKeyword + ","); } } if(clickCategoryId != Long.MAX_VALUE) { if(!clickCategoryIdsBuffer.toString().contains( String.valueOf(clickCategoryId))) { clickCategoryIdsBuffer.append(clickCategoryId + ","); } } // 计算session开始和结束时间 Date actionTime = DateUtils.parseTime(row.getString(4)); if(startTime == null) { startTime = actionTime; } if(endTime == null) { endTime = actionTime; } if(actionTime.before(startTime)) { startTime = actionTime; } if(actionTime.after(endTime)) { endTime = actionTime; } // 计算session访问步长 stepLength++; } String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString()); String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString()); // 计算session访问时长(秒) long visitLength = (endTime.getTime() - startTime.getTime()) / 1000; // 大家思考一下 // 我们返回的数据格式,即使<sessionid,partAggrInfo> // 但是,这一步聚合完了以后,其实,我们是还需要将每一行数据,跟对应的用户信息进行聚合 // 问题就来了,如果是跟用户信息进行聚合的话,那么key,就不应该是sessionid // 就应该是userid,才能够跟<userid,Row>格式的用户信息进行聚合 // 如果我们这里直接返回<sessionid,partAggrInfo>,还得再做一次mapToPair算子 // 将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举 // 所以,我们这里其实可以直接,返回的数据格式,就是<userid,partAggrInfo> // 然后跟用户信息join的时候,将partAggrInfo关联上userInfo // 然后再直接将返回的Tuple的key设置成sessionid // 最后的数据格式,还是<sessionid,fullAggrInfo> // 聚合数据,用什么样的格式进行拼接? // 我们这里统一定义,使用key=value|key=value String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|" + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|" + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|" + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" + Constants.FIELD_STEP_LENGTH + "=" + stepLength +Constants.FIELD_START_TIME + "=" +DateUtils.formateTime(startTime); //测试 //System.out.println("partAggrInfo: "+ partAggrInfo); return new Tuple2<Long, String>(userid, partAggrInfo); } }); // 查询所有用户数据,并映射成<userid,Row>的格式 String sql = "select * from user_info"; JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD(); JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair( new PairFunction<Row, Long, Row>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Long, Row> call(Row row) throws Exception { return new Tuple2<Long, Row>(row.getLong(0), row); } }); // 将session粒度聚合数据,与用户信息进行join JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD = userid2PartAggrInfoRDD.join(userid2InfoRDD); // 对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据 JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair( new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call( Tuple2<Long, Tuple2<String, Row>> tuple) throws Exception { String partAggrInfo = tuple._2._1; Row userInfoRow = tuple._2._2; String sessionid = StringUtils.getFieldFromConcatString( partAggrInfo, "\\|", Constants.FIELD_SESSION_ID); int age = userInfoRow.getInt(3); String professional = userInfoRow.getString(4); String city = userInfoRow.getString(5); String sex = userInfoRow.getString(6); String fullAggrInfo = partAggrInfo + "|" + Constants.FIELD_AGE + "=" + age + "|" + Constants.FIELD_PROFESSIONAL + "=" + professional + "|" + Constants.FIELD_CITY + "=" + city + "|" + Constants.FIELD_SEX + "=" + sex; return new Tuple2<String, String>(sessionid, fullAggrInfo); } }); return sessionid2FullAggrInfoRDD; } /** * 过滤session数据 * @param sessionid2AggrInfoRDD * @return */ private static JavaPairRDD<String, String> filterSessionAndAggrStat( JavaPairRDD<String, String> sessionid2AggrInfoRDD, final JSONObject taskParam, final Accumulator sessionAggrStatAccumulator) { // 为了使用我们后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串 // 此外,这里其实大家不要觉得是多此一举 // 其实我们是给后面的性能优化埋下了一个伏笔 String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE); String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE); String professionals = ParamUtils.getParam(taskParam, Constants.PARAM_PROFESSIONALS); String cities = ParamUtils.getParam(taskParam, Constants.PARAM_CITIES); String sex = ParamUtils.getParam(taskParam, Constants.PARAM_SEX); String keywords = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS); String categoryIds = ParamUtils.getParam(taskParam, Constants.PARAM_CATEGORY_IDS); String _parameter = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "") + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "") + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "") + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "") + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "") + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "") + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds: ""); if(_parameter.endsWith("\\|")) { _parameter = _parameter.substring(0, _parameter.length() - 1); } final String parameter = _parameter; // 根据筛选参数进行过滤 JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter( new Function<Tuple2<String,String>, Boolean>() { private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<String, String> tuple) throws Exception { // 首先,从tuple中,获取聚合数据 String aggrInfo = tuple._2; // 接着,依次按照筛选条件进行过滤 // 按照年龄范围进行过滤(startAge、endAge) if(!ValidUtils.between(aggrInfo, Constants.FIELD_AGE, parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) { return false; } // 按照职业范围进行过滤(professionals) // 互联网,IT,软件 // 互联网 if(!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL, parameter, Constants.PARAM_PROFESSIONALS)) { return false; } // 按照城市范围进行过滤(cities) // 北京,上海,广州,深圳 // 成都 if(!ValidUtils.in(aggrInfo, Constants.FIELD_CITY, parameter, Constants.PARAM_CITIES)) { return false; } // 按照性别进行过滤 // 男/女 // 男,女 if(!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX, parameter, Constants.PARAM_SEX)) { return false; } // 按照搜索词进行过滤 // 我们的session可能搜索了 火锅,蛋糕,烧烤 // 我们的筛选条件可能是 火锅,串串香,iphone手机 // 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中 // 任何一个搜索词相当,即通过 if(!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS, parameter, Constants.PARAM_KEYWORDS)) { return false; } // 按照点击品类id进行过滤 if(!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS, parameter, Constants.PARAM_CATEGORY_IDS)) { return false; } // 如果经过了之前的多个过滤条件之后,程序能够走到这里 // 那么就说明,该session是通过了用户指定的筛选条件的,也就是需要保留的session // 那么就要对session的访问时长和访问步长,进行统计,根据session对应的范围 // 进行相应的累加计数 // 主要走到这一步,那么就是需要计数的session sessionAggrStatAccumulator.add(Constants.SESSION_COUNT); //计算出session的访问时间和步长的范围,并进行想要的累加 long visitLength = Long.valueOf(StringUtils.getFieldFromConcatString(aggrInfo,"\\|",Constants.FIELD_VISIT_LENGTH)); long stepLength = Long.valueOf(StringUtils.getFieldFromConcatString(aggrInfo,"\\|",Constants.FIELD_STEP_LENGTH)); calculateVisitLength(visitLength); calculateStepLength(stepLength); return true; } /** * 计算访问时间范围 * @param visitLength */ private void calculateVisitLength(long visitLength){ if (visitLength >=1 && visitLength <= 3){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s); } else if (visitLength >=4 && visitLength <= 6){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s); } else if (visitLength >=7 && visitLength <= 9){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s); } else if (visitLength >=10 && visitLength <= 30){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s); } else if (visitLength > 30 && visitLength <= 60){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s); } else if (visitLength > 60 && visitLength <= 180){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m); } else if (visitLength > 180 && visitLength <= 600){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m); } else if (visitLength > 600 && visitLength <= 1800){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m); } else if (visitLength > 1800){ sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m); } } /** * 计算访问步长范围 * @param stepLength */ private void calculateStepLength(long stepLength){ if (stepLength >= 1 && stepLength <= 3){ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3); } else if(stepLength >= 4 && stepLength <= 6){ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6); } else if (stepLength >= 7 && stepLength <= 9){ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9); } else if(stepLength >= 10 && stepLength <= 30){ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30); } else if(stepLength > 30 && stepLength <= 60){ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60); } else if (stepLength > 60){ sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60); } } }); return filteredSessionid2AggrInfoRDD; } /** * 随机抽取session * @param sessionid2AggrInfoRDD */ private static void randomExtractSession( final long taskid, JavaPairRDD<String, String> sessionid2AggrInfoRDD, JavaPairRDD<String,Row> sessionid2actionRDD) { // 第一步,计算出每天每小时的session数量,获取<yyyy-MM-dd_HH,sessionid>格式的RDD JavaPairRDD<String, String> time2sessionidRDD = sessionid2AggrInfoRDD.mapToPair( new PairFunction<Tuple2<String,String>, String, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, String> call( Tuple2<String, String> tuple) throws Exception { String aggrInfo = tuple._2; String startTime = StringUtils.getFieldFromConcatString( aggrInfo, "\\|", Constants.FIELD_START_TIME); String dateHour = DateUtils.getDateHour(startTime); return new Tuple2<String, String>(dateHour, aggrInfo); } }); /** * 思考一下:这里我们不要着急写大量的代码,做项目的时候,一定要用脑子多思考 * * 每天每小时的session数量,然后计算出每天每小时的session抽取索引,遍历每天每小时session * 首先抽取出的session的聚合数据,写入session_random_extract表 * 所以第一个RDD的value,应该是session聚合数据 * */ // 得到每天每小时的session数量 Map<String, Long> countMap = time2sessionidRDD.countByKey(); //第二步,使用按时间比例随机抽取算法,计算出每天每小时需要抽取session的索引 //将<yyyy-MM-dd_HH,count>格式的map,转换为<yyyy-MM-dd,<HH,count>> Map<String,Map<String,Long>> dateHourCountMap = new HashMap<String,Map<String,Long>>(); for (Map.Entry<String, Long> countEntry : countMap.entrySet()){ String dateHour = countEntry.getKey(); String date = dateHour.split("_")[0]; String hour = dateHour.split("_")[1]; long count = countEntry.getValue(); Map<String,Long> hourCountMap = dateHourCountMap.get(date); if (hourCountMap ==null){ hourCountMap = new HashMap<String,Long>(); dateHourCountMap.put(date,hourCountMap); } dateHourCountMap.put(date,hourCountMap); } //开始实现按时间比例随机抽取算法 //总共要抽取100个session,按照天数,进行平分 int extractNumberPerDay = 100 /dateHourCountMap.size(); //<date,<hour,(1,3,4,2103)>> Map<String,Map<String, List<Integer>>> dateHourExtractMap = new HashMap<String,Map<String,List<Integer>>>(); Random random = new Random(); for (Map.Entry<String,Map<String,Long>> dateHourCountEntry : dateHourCountMap.entrySet()){ String date = dateHourCountEntry.getKey(); Map<String,Long> hourCountMap = dateHourCountEntry.getValue(); //计算出每天的session总数 long sessionCount = 0L; for (long hourCount : hourCountMap.values()){ sessionCount += hourCount; } Map<String,List<Integer>> hourExtractMap = dateHourExtractMap.get(date); if (hourExtractMap == null){ hourExtractMap = new HashMap<String,List<Integer>>(); dateHourExtractMap.put(date,hourExtractMap); } //遍历每一个小时 for (Map.Entry<String,Long> hourCountEntry : hourCountMap.entrySet()){ String hour = hourCountEntry.getKey(); long count = hourCountEntry.getValue(); // 计算每个小时的session数量,占据当天总session数量的比例,直接乘以每天要抽取的数量 // 就可以计算出,当前小时需要抽取的session数量 int hourExtractNumber = (int)((double)count/(double) sessionCount)*extractNumberPerDay; if (hourExtractNumber > count){ hourExtractNumber = (int)count; } //先获取当前小时的存放随机数的list List<Integer> extractIndexList = hourExtractMap.get(hour); if (extractIndexList == null){ extractIndexList = new ArrayList<Integer>(); hourExtractMap.put(hour,extractIndexList); } //生成上面计算出来的数量的随机数 for (int i = 0; i < hourExtractNumber;i++){ int extractIndex = random.nextInt((int)count); while (extractIndexList.contains(extractIndex)){ extractIndex = random.nextInt((int)count); } extractIndexList.add(extractIndex); } } } /** * 第三步:遍历每天每小时的session,然后根据随机索引进行抽取 */ // 执行groupByKey算子,得到<dateHour,(session aggrInfo)> JavaPairRDD<String,Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey(); // 我们用flatMap算子,遍历所有的<dateHour,(session aggrInfo)>格式的数据 // 然后呢,会遍历每天每小时的session // 如果发现某个session恰巧在我们指定的这天这小时的随机抽取索引上 // 那么抽取该session,直接写入MySQL的random_extract_session表 // 将抽取出来的session id返回回来,形成一个新的JavaRDD<String> // 然后最后一步,是用抽取出来的sessionid,去join它们的访问行为明细数据,写入session表 JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair( new PairFlatMapFunction<Tuple2<String,Iterable<String>>, String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<Tuple2<String, String>> call( Tuple2<String, Iterable<String>> tuple) throws Exception { List<Tuple2<String, String>> extractSessionids = new ArrayList<Tuple2<String, String>>(); String dateHour = tuple._1; String date = dateHour.split("_")[0]; String hour = dateHour.split("_")[1]; Iterator<String> iterator = tuple._2.iterator(); List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour); ISessionRandomExtractDAO sessionRandomExtractDAO = DAOFactory.getSessionRandomExtractDAO(); int index = 0; while(iterator.hasNext()) { String sessionAggrInfo = iterator.next(); if(extractIndexList.contains(index)) { String sessionid = StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID); // 将数据写入MySQL SessionRandomExtract sessionRandomExtract = new SessionRandomExtract(); sessionRandomExtract.setTaskid(taskid); sessionRandomExtract.setSessionid(sessionid); sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_START_TIME)); sessionRandomExtract.setSerachKeyWords(StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS)); sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString( sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS)); sessionRandomExtractDAO.insert(sessionRandomExtract); // 将sessionid加入list extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid)); } index++; } return (Iterator<Tuple2<String, String>>) extractSessionids; } }); /** * 第四步:获取抽取出来的session的明细数据 */ JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD = extractSessionidsRDD.join(sessionid2actionRDD); extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception { Row row = tuple._2._2; SessionDetail sessionDetail = new SessionDetail(); sessionDetail.setTaskid(taskid); sessionDetail.setUserid(row.getLong(0)); sessionDetail.setSessionid(row.getString(1)); sessionDetail.setPageid(row.getLong(2)); sessionDetail.setActionTime(row.getString(3)); sessionDetail.setSeachKeyWord(row.getString(4)); sessionDetail.setClickCategoryId(row.getLong(5)); sessionDetail.setClickProductId(row.getLong(6)); sessionDetail.setOrderCategoryIds(row.getString(7)); sessionDetail.setOrderProductIds(row.getString(8)); sessionDetail.setPayCategoryIds(row.getString(9)); sessionDetail.setPayProductIds(row.getString(11)); ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO(); sessionDetailDAO.insert(sessionDetail); } }); } /** * 计算各session范围占比,并写入MySQL * @param value * @param taskId */ private static void calculateAndPersistAggrStat(String value, long taskId) { //从Accumulate统计串中获取值 long session_count = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.SESSION_COUNT)); long visit_length_1s_3s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1s_3s)); long visit_length_4s_6s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_4s_6s)); long visit_length_7s_9s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_7s_9s)); long visit_length_10s_30s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10s_30s)); long visit_length_30s_60s = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30s_60s)); long visit_length_1m_3m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_1m_3m)); long visit_length_3m_10m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_3m_10m)); long visit_length_10m_30m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_10m_30m)); long visit_length_30m = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.TIME_PERIOD_30m)); long step_length_1_3 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_1_3)); long step_length_4_6 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_4_6)); long step_length_7_9 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_7_9)); long step_length_10_30 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_10_30)); long step_length_30_60 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_30_60)); long step_length_60 = Long.valueOf(StringUtils.getFieldFromConcatString(value,"\\|",Constants.STEP_PERIOD_60)); //计算各个访问时长和步长的占比 double visit_length_1s_3s_ratio = NumberUtils.formatDouble((double) visit_length_1s_3s/session_count,2); double visit_length_4s_6s_ratio = NumberUtils.formatDouble((double)visit_length_4s_6s/session_count,2); double visit_length_7s_9s_ratio = NumberUtils.formatDouble((double)visit_length_7s_9s/session_count,2); double visit_length_10s_30s_ratio = NumberUtils.formatDouble((double)visit_length_10s_30s/session_count,2); double visit_length_30s_60s_ratio = NumberUtils.formatDouble((double)visit_length_30s_60s/session_count,2); double visit_length_1m_3m_ratio = NumberUtils.formatDouble((double)visit_length_1m_3m/session_count,2); double visit_length_3m_10m_ratio = NumberUtils.formatDouble((double)visit_length_3m_10m/session_count,2); double visit_length_10m_30m_ratio = NumberUtils.formatDouble((double)visit_length_10m_30m/session_count,2); double visit_length_30m_ratio = NumberUtils.formatDouble((double) visit_length_30m/session_count,2); double step_length_1_3_ratio = NumberUtils.formatDouble((double) step_length_1_3/session_count,2); double step_length_4_6_ratio = NumberUtils.formatDouble((double) step_length_4_6/session_count,2); double step_length_7_9_ratio = NumberUtils.formatDouble((double) step_length_7_9/session_count,2); double step_length_10_30_ratio = NumberUtils.formatDouble((double)step_length_10_30/session_count,2); double step_length_30_60_ratio = NumberUtils.formatDouble((double)step_length_30_60/session_count,2); double step_length_60_ratio = NumberUtils.formatDouble((double) step_length_60/session_count,2); // 将统计封装为Domain对象 SessionAggrStat sessionAggrStat = new SessionAggrStat(); sessionAggrStat.setSession_count(session_count); sessionAggrStat.setTaskid(taskId); sessionAggrStat.setVisit_length_1s_3s_ratio(visit_length_1s_3s_ratio); sessionAggrStat.setVisit_length_4s_6s_ratio(visit_length_4s_6s_ratio); sessionAggrStat.setVisit_length_7s_9s_ratio(visit_length_7s_9s_ratio); sessionAggrStat.setVisit_length_10s_30s_ratio(visit_length_10s_30s_ratio); sessionAggrStat.setVisit_length_30s_60s_ratio(visit_length_30s_60s_ratio); sessionAggrStat.setVisit_length_1m_3m_ratio(visit_length_1m_3m_ratio); sessionAggrStat.setVisit_length_3m_10m_ratio(visit_length_3m_10m_ratio); sessionAggrStat.setVisit_length_10m_30m_ratio(visit_length_10m_30m_ratio); sessionAggrStat.setVisit_length_30m_ratio(visit_length_30m_ratio); sessionAggrStat.setStep_length_1_3_ratio(step_length_1_3_ratio); sessionAggrStat.setStep_length_4_6_ratio(step_length_4_6_ratio); sessionAggrStat.setStep_length_7_9_ratio(step_length_7_9_ratio); sessionAggrStat.setStep_length_10_30_ratio(step_length_10_30_ratio); sessionAggrStat.setStep_length_30_60_ratio(step_length_30_60_ratio); sessionAggrStat.setStep_length_60_ratio(step_length_60_ratio); ISessionAggrStatDAO sessionAggrStatDAO = DAOFactory.getSessionAggrStatDAO(); sessionAggrStatDAO.insert(sessionAggrStat); } }