/**
 * Computes the top 10 hottest categories among the filtered sessions and persists them
 * through {@code ITop10CategoryDAO}.
 *
 * <p>A category counts as "visited" when a detail row references it as a clicked, ordered
 * or paid category. The per-category click, order and pay counts are computed separately,
 * joined onto the set of visited categories, sorted in descending order by
 * {@code CategorySortKey}, and the first 10 rows are inserted one by one.
 *
 * @param taskid id of the task; stored on every persisted {@code Top10Category}
 * @param filteredSessionid2AggrInfoRDD sessions that passed filtering, keyed by session id;
 *        only the keys are used here (the aggregate-info value is dropped after the join)
 * @param sessionid2actionRDD action detail rows keyed by session id
 */
private static void getTop10Category(
        long taskid,
        JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD,
        JavaPairRDD<String, Row> sessionid2actionRDD) {
    /*
     * Step 1: collect every category visited by the sessions that passed filtering.
     */
    // Inner join on session id keeps only the detail rows of filtered sessions.
    JavaPairRDD<String, Row> sessionid2detailRDD = filteredSessionid2AggrInfoRDD
            .join(sessionid2actionRDD)
            .mapToPair(new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Row> call(
                        Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
                    return new Tuple2<String, Row>(tuple._1, tuple._2._2);
                }
            });

    // Emit <categoryId, categoryId> for every clicked (column 6), ordered (column 8,
    // comma-separated) and paid (column 10, comma-separated) category of each detail row.
    //
    // distinct() is essential: the same category is emitted once per detail row that
    // references it, and each duplicate would survive the left outer joins below as an
    // identical output row, so a single popular category could fill the whole top 10.
    JavaPairRDD<Long, Long> categoryidRDD = sessionid2detailRDD.flatMapToPair(
            new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterator<Tuple2<Long, Long>> call(
                        Tuple2<String, Row> tuple) throws Exception {
                    Row row = tuple._2;
                    List<Tuple2<Long, Long>> list = new ArrayList<Tuple2<Long, Long>>();

                    // Check for null first: getLong cannot represent a missing value.
                    if (row.get(6) != null) {
                        long clickCategoryId = row.getLong(6);
                        list.add(new Tuple2<Long, Long>(clickCategoryId, clickCategoryId));
                    }

                    String orderCategoryIds = row.getString(8);
                    if (orderCategoryIds != null) {
                        for (String orderCategoryId : orderCategoryIds.split(",")) {
                            list.add(new Tuple2<Long, Long>(Long.valueOf(orderCategoryId),
                                    Long.valueOf(orderCategoryId)));
                        }
                    }

                    String payCategoryIds = row.getString(10);
                    if (payCategoryIds != null) {
                        for (String payCategoryId : payCategoryIds.split(",")) {
                            list.add(new Tuple2<Long, Long>(Long.valueOf(payCategoryId),
                                    Long.valueOf(payCategoryId)));
                        }
                    }

                    return list.iterator();
                }
            }).distinct();

    /*
     * Step 2: count clicks, orders and payments per category.
     * Each helper filters the detail rows down to one kind of action and sums per category.
     */
    JavaPairRDD<Long, Long> clickCategoryId2CountRDD =
            getClickCategoryId2CountRDD(sessionid2detailRDD);
    JavaPairRDD<Long, Long> orderCategoryId2CountRDD =
            getOrderCategoryId2CountRDD(sessionid2detailRDD);
    JavaPairRDD<Long, Long> payCategoryId2CountRDD =
            getPayCategoryId2CountRDD(sessionid2detailRDD);

    /*
     * Step 3: join each visited category with its click, order and pay counts.
     *
     * The three count RDDs do not necessarily contain every category (a category may have
     * been clicked but never ordered or paid), so joinCategoryAndData uses leftOuterJoin
     * and substitutes 0 for a missing count instead of dropping the category.
     */
    JavaPairRDD<Long, String> categoryid2countRDD = joinCategoryAndData(
            categoryidRDD, clickCategoryId2CountRDD, orderCategoryId2CountRDD,
            payCategoryId2CountRDD);

    /*
     * Step 4: the custom secondary-sort key is CategorySortKey (defined elsewhere).
     *
     * Step 5: map to <CategorySortKey, countInfo> and sort in descending key order.
     * The comparison itself is whatever CategorySortKey defines.
     */
    JavaPairRDD<CategorySortKey, String> sortKey2countRDD = categoryid2countRDD.mapToPair(
            new PairFunction<Tuple2<Long, String>, CategorySortKey, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<CategorySortKey, String> call(
                        Tuple2<Long, String> tuple) throws Exception {
                    String countInfo = tuple._2;
                    long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                            countInfo, "\\|", Constants.SESSION_PROJECT.FIELD_CLICK_COUNT));
                    long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                            countInfo, "\\|", Constants.SESSION_PROJECT.FIELD_ORDER_COUNT));
                    long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                            countInfo, "\\|", Constants.SESSION_PROJECT.FIELD_PAY_COUNT));
                    CategorySortKey sortKey =
                            new CategorySortKey(clickCount, orderCount, payCount);
                    return new Tuple2<CategorySortKey, String>(sortKey, countInfo);
                }
            });

    JavaPairRDD<CategorySortKey, String> sortedCategoryCountRDD =
            sortKey2countRDD.sortByKey(false);

    /*
     * Step 6: take the first 10 rows and insert each one through the DAO.
     */
    ITop10CategoryDAO top10CategoryDAO = DAOFactory.getTop10CategoryDAO();
    List<Tuple2<CategorySortKey, String>> top10CategoryList =
            sortedCategoryCountRDD.take(10);

    for (Tuple2<CategorySortKey, String> tuple : top10CategoryList) {
        String countInfo = tuple._2;
        long categoryid = Long.valueOf(StringUtils.getFieldFromConcatString(
                countInfo, "\\|", Constants.SESSION_PROJECT.FIELD_CATEGORY_ID));
        long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                countInfo, "\\|", Constants.SESSION_PROJECT.FIELD_CLICK_COUNT));
        long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                countInfo, "\\|", Constants.SESSION_PROJECT.FIELD_ORDER_COUNT));
        long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
                countInfo, "\\|", Constants.SESSION_PROJECT.FIELD_PAY_COUNT));

        Top10Category category = new Top10Category();
        category.setTaskid(taskid);
        category.setCategoryid(categoryid);
        category.setClickCount(clickCount);
        category.setOrderCount(orderCount);
        category.setPayCount(payCount);
        // Each persisted row gets its own random UUID.
        category.setUUID(UUID.randomUUID().toString());
        top10CategoryDAO.insert(category);
    }
}
/**
 * Builds an RDD of click counts per category.
 *
 * <p>Only detail rows whose column 6 (the clicked category id) is non-null are counted;
 * each such row contributes 1 to its category.
 *
 * @param sessionid2detailRDD action detail rows keyed by session id
 * @return pairs of {@code <categoryId, clickCount>}; categories that were never clicked
 *         do not appear
 */
private static JavaPairRDD<Long, Long> getClickCategoryId2CountRDD(
        JavaPairRDD<String, Row> sessionid2detailRDD) {
    // Keep click actions only. The null check is the whole predicate: the previous extra
    // test "Long.valueOf(row.getLong(6)) != null" boxed a primitive and was always true.
    JavaPairRDD<String, Row> clickActionRDD = sessionid2detailRDD.filter(
            new Function<Tuple2<String, Row>, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Tuple2<String, Row> tuple) throws Exception {
                    return tuple._2.get(6) != null;
                }
            });

    // One <categoryId, 1> pair per click row.
    JavaPairRDD<Long, Long> clickCategoryIdRDD = clickActionRDD.mapToPair(
            new PairFunction<Tuple2<String, Row>, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Long, Long> call(Tuple2<String, Row> tuple)
                        throws Exception {
                    long clickCategoryId = tuple._2.getLong(6);
                    return new Tuple2<Long, Long>(clickCategoryId, 1L);
                }
            });

    // Sum the ones per category.
    return clickCategoryIdRDD.reduceByKey(
            new Function2<Long, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1 + v2;
                }
            });
}
/**
 * Builds an RDD of order counts per category.
 *
 * <p>Column 8 of a detail row holds the ordered category ids as a comma-separated string,
 * or null when the row is not an order action. Every id listed in a row contributes 1 to
 * that category.
 *
 * @param sessionid2detailRDD action detail rows keyed by session id
 * @return pairs of {@code <categoryId, orderCount>}; categories that were never ordered
 *         do not appear
 */
private static JavaPairRDD<Long, Long> getOrderCategoryId2CountRDD(
        JavaPairRDD<String, Row> sessionid2detailRDD) {
    // Predicate: the row carries ordered category ids.
    Function<Tuple2<String, Row>, Boolean> isOrderAction =
            new Function<Tuple2<String, Row>, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Tuple2<String, Row> detail) throws Exception {
                    return detail._2.getString(8) != null;
                }
            };

    // Expands one order row into a <categoryId, 1> pair per listed id.
    PairFlatMapFunction<Tuple2<String, Row>, Long, Long> toCategoryOnes =
            new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterator<Tuple2<Long, Long>> call(
                        Tuple2<String, Row> detail) throws Exception {
                    String[] ids = detail._2.getString(8).split(",");
                    List<Tuple2<Long, Long>> pairs =
                            new ArrayList<Tuple2<Long, Long>>(ids.length);
                    for (int i = 0; i < ids.length; i++) {
                        pairs.add(new Tuple2<Long, Long>(Long.valueOf(ids[i]), 1L));
                    }
                    return pairs.iterator();
                }
            };

    // Adds up the ones belonging to the same category.
    Function2<Long, Long, Long> sum = new Function2<Long, Long, Long>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Long call(Long left, Long right) throws Exception {
            return left + right;
        }
    };

    return sessionid2detailRDD
            .filter(isOrderAction)
            .flatMapToPair(toCategoryOnes)
            .reduceByKey(sum);
}
/**
 * Builds an RDD of payment counts per category.
 *
 * <p>Column 10 of a detail row holds the paid category ids as a comma-separated string,
 * or null when the row is not a pay action. Every id listed in a row contributes 1 to
 * that category.
 *
 * @param sessionid2detailRDD action detail rows keyed by session id
 * @return pairs of {@code <categoryId, payCount>}; categories that were never paid for
 *         do not appear
 */
private static JavaPairRDD<Long, Long> getPayCategoryId2CountRDD(
        JavaPairRDD<String, Row> sessionid2detailRDD) {
    // Predicate: the row carries paid category ids.
    Function<Tuple2<String, Row>, Boolean> isPayAction =
            new Function<Tuple2<String, Row>, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Tuple2<String, Row> detail) throws Exception {
                    return detail._2.getString(10) != null;
                }
            };

    // Expands one pay row into a <categoryId, 1> pair per listed id.
    PairFlatMapFunction<Tuple2<String, Row>, Long, Long> toCategoryOnes =
            new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterator<Tuple2<Long, Long>> call(
                        Tuple2<String, Row> detail) throws Exception {
                    String[] ids = detail._2.getString(10).split(",");
                    List<Tuple2<Long, Long>> pairs =
                            new ArrayList<Tuple2<Long, Long>>(ids.length);
                    for (int i = 0; i < ids.length; i++) {
                        pairs.add(new Tuple2<Long, Long>(Long.valueOf(ids[i]), 1L));
                    }
                    return pairs.iterator();
                }
            };

    // Adds up the ones belonging to the same category.
    Function2<Long, Long, Long> sum = new Function2<Long, Long, Long>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Long call(Long left, Long right) throws Exception {
            return left + right;
        }
    };

    return sessionid2detailRDD
            .filter(isPayAction)
            .flatMapToPair(toCategoryOnes)
            .reduceByKey(sum);
}
/**
 * Joins the visited-category RDD with the click, order and pay count RDDs.
 *
 * <p>Three successive left outer joins are used so that a category missing from one of
 * the count RDDs is kept, with that count reported as 0. The value of each result pair is
 * a {@code |}-separated string of {@code name=value} fields, in the order category id,
 * click count, order count, pay count; the field names come from
 * {@code Constants.SESSION_PROJECT}.
 *
 * <p>Because a left outer join emits one row per left-side element, duplicate keys in
 * {@code categoryidRDD} yield duplicate rows in the result.
 *
 * @param categoryidRDD visited categories as {@code <categoryId, categoryId>}
 * @param clickCategoryId2CountRDD {@code <categoryId, clickCount>}
 * @param orderCategoryId2CountRDD {@code <categoryId, orderCount>}
 * @param payCategoryId2CountRDD {@code <categoryId, payCount>}
 * @return {@code <categoryId, countInfo>} pairs
 */
private static JavaPairRDD<Long, String> joinCategoryAndData(
        JavaPairRDD<Long, Long> categoryidRDD,
        JavaPairRDD<Long, Long> clickCategoryId2CountRDD,
        JavaPairRDD<Long, Long> orderCategoryId2CountRDD,
        JavaPairRDD<Long, Long> payCategoryId2CountRDD) {
    // With leftOuterJoin the right-hand value may be absent, hence the Optional<Long>.
    // First join: start the info string with the category id and its click count.
    JavaPairRDD<Long, String> withClickCount = categoryidRDD
            .leftOuterJoin(clickCategoryId2CountRDD)
            .mapToPair(
                    new PairFunction<Tuple2<Long, Tuple2<Long, Optional<Long>>>, Long, String>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<Long, String> call(
                                Tuple2<Long, Tuple2<Long, Optional<Long>>> joined)
                                throws Exception {
                            long id = joined._1;
                            Optional<Long> clicks = joined._2._2;
                            long clickCount = clicks.isPresent() ? clicks.get() : 0L;
                            String info = Constants.SESSION_PROJECT.FIELD_CATEGORY_ID + "=" + id + "|" +
                                    Constants.SESSION_PROJECT.FIELD_CLICK_COUNT + "=" + clickCount;
                            return new Tuple2<Long, String>(id, info);
                        }
                    });

    // Second join: append the order count.
    JavaPairRDD<Long, String> withOrderCount = withClickCount
            .leftOuterJoin(orderCategoryId2CountRDD)
            .mapToPair(
                    new PairFunction<Tuple2<Long, Tuple2<String, Optional<Long>>>, Long, String>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<Long, String> call(
                                Tuple2<Long, Tuple2<String, Optional<Long>>> joined)
                                throws Exception {
                            long id = joined._1;
                            Optional<Long> orders = joined._2._2;
                            long orderCount = orders.isPresent() ? orders.get() : 0L;
                            String info = joined._2._1 + "|"
                                    + Constants.SESSION_PROJECT.FIELD_ORDER_COUNT + "=" + orderCount;
                            return new Tuple2<Long, String>(id, info);
                        }
                    });

    // Third join: append the pay count and return the completed rows.
    return withOrderCount
            .leftOuterJoin(payCategoryId2CountRDD)
            .mapToPair(
                    new PairFunction<Tuple2<Long, Tuple2<String, Optional<Long>>>, Long, String>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<Long, String> call(
                                Tuple2<Long, Tuple2<String, Optional<Long>>> joined)
                                throws Exception {
                            long id = joined._1;
                            Optional<Long> pays = joined._2._2;
                            long payCount = pays.isPresent() ? pays.get() : 0L;
                            String info = joined._2._1 + "|"
                                    + Constants.SESSION_PROJECT.FIELD_PAY_COUNT + "=" + payCount;
                            return new Tuple2<Long, String>(id, info);
                        }
                    });
}
spark 大型项目实战(二十五):top10热门品类(二) --获取Top10 实现
猜你喜欢
转载自blog.csdn.net/u012957549/article/details/80723088
今日推荐
周排行