// Data fields: timestamp, province, city, userId, adId
// Goal: top-3 ad IDs by view count for every (province, hour)
package com.jxlg.RDD
import org.apache.spark.{
SparkConf, SparkContext}
import org.joda.time.DateTime
/**
 * Data fields: timestamp, province, city, userId, adId
 * Computes the top-3 ad IDs by view count for each province and hour.
 */
object AdventTop3_2 {

  /**
   * Reads tab-separated ad-click logs (timestamp, province, city, userId, adId)
   * from HDFS and prints, for every (province, hour) pair, the top-3 ad IDs
   * ranked by view count (descending).
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AdventTop3_2").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Ensure the SparkContext is released even if an action throws
    // (original code leaked it on failure because stop() was not in finally).
    try {
      val logsArr = sc.textFile("hdfs://192.168.157.132:9000/user/hdfs/add.txt").map(_.split("\t"))

      // Key each record by (province, hour, adId) as a real tuple.
      // The original concatenated the fields with "_" and split them back
      // later — fragile (breaks if any field contains '_') and needless work,
      // since reduceByKey works on tuple keys directly.
      val proHourAd = logsArr.map { arr =>
        ((arr(1), getHour(arr(0)), arr(4)), 1)
      }

      // Total views per (province, hour, adId).
      val aggred = proHourAd.reduceByKey(_ + _)

      // Re-shape for grouping: key = (province, hour), value carries adId + count.
      val proAndHourTup = aggred.map { case ((pro, hour, adId), cnt) =>
        ((pro, hour), adId, cnt)
      }

      // Group by (province, hour), then sort each group by count descending
      // and keep the top 3 ads.
      val grouped = proAndHourTup.groupBy(_._1)
      val res = grouped.mapValues(_.toList.sortBy(-_._3).take(3))

      println(res.collect.toBuffer)
    } finally {
      sc.stop()
    }
  }

  /**
   * Extracts the hour-of-day from an epoch-millisecond timestamp string.
   *
   * @param time_long epoch milliseconds as a decimal string (must parse as Long)
   * @return the hour of day ("0".."23")
   */
  def getHour(time_long: String): String = {
    val dateTime = new DateTime(time_long.toLong)
    dateTime.getHourOfDay.toString
  }
}