按session粒度进行数据聚合

6.2.1、 获取请求的taskid, 从数据库中查询到请求的参数

        // 创建需要使用的DAO组件
        ITaskDAO taskDAO = DAOFactory.getTaskDAO();

        // 首先得查询出来指定的任务,并获取任务的查询参数
        long taskid = 1L;
        Task task = taskDAO.findById(taskid);
        JSONObject taskParam = new  JSONObject(task.getTaskParam());

6.2.2、获取指定日期范围内的用户访问行为数据

    /**
     * 获取指定日期范围内的用户访问行为数据
     * @param sqlContext SQLContext
     * @param taskParam 任务参数
     * @return 行为数据RDD
     */
    private static JavaRDD<Row> getActionRDDByDateRange(
            SQLContext sqlContext, JSONObject taskParam) {
        //解析请求参数
        String startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE);
        String endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE);

        String sql = 
                "select * from user_visit_action "
                        + "where start>='" + startDate + "' "
                        + "and start<='" + endDate + "'";  

        DataFrame actionDF = sqlContext.sql(sql);
        actionDF.show();
        return actionDF.javaRDD();
    }

6.2.3、对行为数据按session粒度进行聚合

6.2.3.1、将每条数据进行映射为<sessionid,Row>的格式

DataFrame转化为RDD, RDD中元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索,
我们现在需要将这个Row映射成

 JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(
 /**
  * PairFunction
  * 第一个参数,相当于是函数的输入
  * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值
  */
 new PairFunction<Row, String, Row>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<String, Row> call(Row row) throws Exception {
        return new Tuple2<String, Row>(row.getString(2), row);
    }

 });

6.2.3.2、按照session_id分组,是为了更小粒度的进行聚合, 一个会话中的记录,是同一个人的行为信息, 所以在下一步骤中,可以直接映射user_id

    // 对行为数据按session_id粒度进行分组
    JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD = 
                sessionid2ActionRDD.groupByKey();

6.2.3.3、为了能够和user info进行聚合(join), 必须将可以映射为user_id


 // 对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
 // 到此为止,获取的数据格式,如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
 JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(

        new PairFunction<Tuple2<String,Iterable<Row>>, Long, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
                    throws Exception {
                String sessionid = tuple._1;
                Iterator<Row> iterator = tuple._2.iterator();

                StringBuffer searchKeywordsBuffer = new StringBuffer("");
                StringBuffer clickCategoryIdsBuffer = new StringBuffer("");

                Long userid = null;

                // 遍历session所有的访问行为
                while(iterator.hasNext()) {
                    // 提取每个访问行为的搜索词字段和点击品类字段
                    Row row = iterator.next();
                    if(userid == null) {
                        userid = row.getLong(1);
                    }
                    String searchKeyword  = null;
                    Long clickCategoryId = -1L;
                    searchKeyword = row.getString(5);
                    try {
                        clickCategoryId = row.getLong(6);
                    } catch (Exception e) {
                        //e.printStackTrace();
                        //getLong的时候 if (isNullAt(i)) throw new NullPointerException(s"Value at index $i in null")
                        clickCategoryId = null;
                    }

                    // 实际上这里要对数据说明一下
                    // 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的
                    // 其实,只有搜索行为,是有searchKeyword字段的
                    // 只有点击品类的行为,是有clickCategoryId字段的
                    // 所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的

                    // 我们决定是否将搜索词或点击品类id拼接到字符串中去
                    // 首先要满足:不能是null值
                    // 其次,之前的字符串中还没有搜索词或者点击品类id

                    if(StringUtils.isNotEmpty(searchKeyword)) {
                        if(!searchKeywordsBuffer.toString().contains(searchKeyword)) {
                            searchKeywordsBuffer.append(searchKeyword + ",");  
                        }
                    }
                    if(clickCategoryId != null) {
                        if(!clickCategoryIdsBuffer.toString().contains(
                                String.valueOf(clickCategoryId))) {   
                            clickCategoryIdsBuffer.append(clickCategoryId + ",");  
                        }
                    }
                }

                String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
                String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());

                // 大家思考一下
                // 我们返回的数据格式,即使<sessionid,partAggrInfo>
                // 但是,这一步聚合完了以后,其实,我们是还需要将每一行数据,跟对应的用户信息进行聚合
                // 问题就来了,如果是跟用户信息进行聚合的话,那么key,就不应该是sessionid
                // 就应该是userid,才能够跟<userid,Row>格式的用户信息进行聚合
                // 如果我们这里直接返回<sessionid,partAggrInfo>,还得再做一次mapToPair算子
                // 将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举

                // 所以,我们这里其实可以直接,返回的数据格式,就是<userid,partAggrInfo>
                // 然后跟用户信息join的时候,将partAggrInfo关联上userInfo
                // 然后再直接将返回的Tuple的key设置成sessionid
                // 最后的数据格式,还是<sessionid,fullAggrInfo>

                // 聚合数据,用什么样的格式进行拼接?
                // 我们这里统一定义,使用key=value|key=value
                String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
                        + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
                        + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds;

                return new Tuple2<Long, String>(userid, partAggrInfo);
            }

        });

6.2.4、获取user info信息

    /**
     * 获取user info, 并转化为以user_id为key的RDD, 为了方便下一步进行join
     * @return
     */
    private static JavaPairRDD<Long, Row> getUserInfo() {
        // 查询所有用户数据,并映射成<userid,Row>的格式
        String sql = "select * from user_info";  
        JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();

        JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(

                new PairFunction<Row, Long, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, Row> call(Row row) throws Exception {
                        return new Tuple2<Long, Row>(row.getLong(0), row);
                    }

                });
        return userid2InfoRDD;
    }

6.2.5、进行聚合操作

        // 将session粒度聚合数据,与用户信息进行join
        JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD = 
                userid2PartAggrInfoRDD.join(userid2InfoRDD);

6.2.5、对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据

        // 对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(

                new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<Long, Tuple2<String, Row>> tuple)
                                    throws Exception {
                        String partAggrInfo = tuple._2._1;
                        Row userInfoRow = tuple._2._2;

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });

猜你喜欢

转载自blog.csdn.net/wuxintdrh/article/details/80961257