针对session粒度的聚合数据,按照使用者指定的筛选参数进行数据过滤
即对6.2中聚合后的数据,按照客户的筛选条件进行过滤。聚合数据的格式为<sessionid,(sessionid,searchKeywords,clickCategoryIds,age,professional,city,sex)>,例如:
(ce6983270b154f31a7ca67b56f6abcb3,sessionid=ce6983270b154f31a7ca67b56f6abcb3|searchKeywords=温泉|clickCategoryIds=51,79,80|age=2|professional=professional23|city=city25|sex=male)
(d489a32c190e4dd1a7e0fd29fb92c495,sessionid=d489a32c190e4dd1a7e0fd29fb92c495|searchKeywords=新辣道鱼火锅,温泉,蛋糕|clickCategoryIds=17,51,39,91,46,8|age=2|professional=professional23|city=city25|sex=male)
(3783c8f44c3b465884f64a4923368261,sessionid=3783c8f44c3b465884f64a4923368261|searchKeywords=火锅,温泉|clickCategoryIds=90|age=2|professional=professional23|city=city25|sex=male)
通过使用filter算子进行过滤
/**
 * Filters the session-level aggregated data by the user-supplied task parameters.
 *
 * <p>Each value of {@code sessionid2AggrInfoRDD} is read as a {@code |}-separated
 * string of fields (e.g. {@code sessionid=...|searchKeywords=...|age=2|...}) via
 * {@code StringUtils.getFieldFromConcatString}. A session is kept only when it
 * satisfies every filter that is actually configured:
 * <ul>
 *   <li><b>Age range</b> - the session's age must be within
 *       {@code [startAge, endAge]} (both inclusive). Either bound may be omitted,
 *       in which case that side is unbounded.</li>
 *   <li><b>Search keywords</b> - the session's search-keyword field must contain
 *       the keyword parameter as a substring.</li>
 * </ul>
 * A parameter that is {@code null} or empty imposes no restriction. A session
 * that lacks the field required by a configured filter is dropped.
 *
 * @param sessionid2AggrInfoRDD aggregated data, keyed by session id
 * @param taskParam             task parameters holding the filter conditions
 * @return the sessions that pass every configured filter
 * @throws NumberFormatException if an age bound in {@code taskParam} is not an integer
 */
private static JavaPairRDD<String, String> filterSession(
        JavaPairRDD<String, String> sessionid2AggrInfoRDD,
        JSONObject taskParam) {
    // Read the filter parameters from the task configuration.
    final String startAge = ParamUtils.getParam(taskParam, Constants.PARAM_START_AGE);
    final String endAge = ParamUtils.getParam(taskParam, Constants.PARAM_END_AGE);
    final String searchKeys = ParamUtils.getParam(taskParam, Constants.PARAM_KEYWORDS);
    System.out.println(startAge+"------"+endAge+"----"+searchKeys);

    // Parse the bounds once, up front, instead of once per record inside the
    // closure; a malformed bound then fails fast when the filter is built.
    // null means "no bound on this side".
    final Integer minAge =
            (startAge == null || startAge.isEmpty()) ? null : Integer.valueOf(startAge);
    final Integer maxAge =
            (endAge == null || endAge.isEmpty()) ? null : Integer.valueOf(endAge);
    final boolean filterByAge = minAge != null || maxAge != null;
    final boolean filterByKeywords = searchKeys != null && !searchKeys.isEmpty();

    return sessionid2AggrInfoRDD.filter(new Function<Tuple2<String, String>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, String> tuple) throws Exception {
            // The tuple value carries the concatenated aggregate fields.
            String aggrInfo = tuple._2;

            // 1. Age range filter (inclusive on both ends).
            if (filterByAge) {
                String ageField = StringUtils.getFieldFromConcatString(
                        aggrInfo, "\\|", Constants.FIELD_AGE);
                // A session without an age cannot satisfy an age restriction.
                if (ageField == null || ageField.isEmpty()) {
                    return false;
                }
                int age = Integer.parseInt(ageField);
                if (minAge != null && age < minAge) {
                    return false;
                }
                if (maxAge != null && age > maxAge) {
                    return false;
                }
            }

            // 2. Search keyword filter: substring match against the keyword field.
            if (filterByKeywords) {
                String searchKeywords = StringUtils.getFieldFromConcatString(
                        aggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS);
                // Guard against sessions that recorded no search keywords.
                if (searchKeywords == null || !searchKeywords.contains(searchKeys)) {
                    return false;
                }
            }

            return true;
        }
    });
}