Spark core算子aggregateByKey实例

groupbykey、reducebykey以及aggregateByKey

groupbykey是全局聚合算子,将所有map task中的数据都拉取到shuffle中将key相同的数据进行聚合,它存在很多弊端,例如:将大量的数据进行网络传输,浪费大量的资源,最重要的是如果数据量太大还会出现GC和OutOfMemoryError的错误,如果数据某个key的数据量远大于其他key的数据,在进行全局聚合的时候还会出现数据倾斜的问题。

所以在实际的项目中要谨慎使用groupbykey,随着数据量的增加,groupbykey暴露出来的问题会越来越多。

reducebykey是在map阶段进行本地聚合以后才会到shuffle中进行全局聚合,相当于是进入shuffle之前已经做了一部分聚合,那么它的网络传输速度会比groupbykey快很多而且占用资源也会减少很多,但是算子本身就如它的名字一样,主要是进行计算的将相同key的数据进行计算,返回计算结果。

但是实际的项目中在进行聚合之后我们不一定只是要计算,还会找聚合后某个字段的最大值,最小值等等操作,groupbykey聚合后返回的是(K,Iterable[V]),我们可以把iterable[V]这个集合的数据进行二次处理来实现我们实际的项目需求,但是上面已经提到了groupbykey的诸多问题,reducebykey也是只有在单纯的对数据进行计算的时候才能和groupbykey有等价效果。既想像reducebykey那样进行本地聚合,又想像groupbykey那样返回一个集合便于我们操作。

说了这么多也就引出来了我们今天的主题aggregatebykey。

aggregatebykey和reducebykey一样首先在本地聚合,然后再在全局聚合。它的返回值也是由我们自己设定的。

aggregatebykey使用需要提供三个参数:

zeroValue: U 这个参数就会决定最后的返回类型
seqOp: (U, V) => U 将V(数据)放入U中进行本地聚合
combOp: (U, U) => U 将不同的U进行全局聚合

可能说的大家有点晕,那么我介绍一个工作中遇到的需求以及使用aggregatebykey来解决问题的实例:

首先看数据的结构:

List(
      ("84.174.205.5",("2018-11-10 23:23:23",2)),
      ("221.226.113.146",("2018-09-11 23:23:23",3)),
      ("84.174.205.5",("2018-12-15 23:23:23",5)),
      ("108.198.168.20",("2018-01-03 23:23:23",2)),
      ("108.198.168.20",("2018-11-21 23:23:23",4)),
      ("221.226.113.146",("2018-11-01 23:23:23",6)),
        ("221.226.113.146",("2018-12-06 23:23:23",6))
      )

key为IP V为日期以及访问的次数。

现在的需求:

1、将ip的访问次数进行累加操作

2、聚合后只保留最早的时间

废话不多说上代码:

val spark = SparkSession.builder().appName(test.getClass.getName).master("local").getOrCreate()
//准备数据
    val pairRdd: RDD[(String, (String, Int))] = spark.sparkContext.parallelize(
      List(
      ("84.174.205.5",("2018-11-10 23:23:23",2)),
      ("221.226.113.146",("2018-09-11 23:23:23",3)),
      ("84.174.205.5",("2018-12-15 23:23:23",5)),
      ("108.198.168.20",("2018-01-03 23:23:23",2)),
      ("108.198.168.20",("2018-11-21 23:23:23",4)),
      ("221.226.113.146",("2018-11-01 23:23:23",6)),
        ("221.226.113.146",("2018-12-06 23:23:23",6))
      ),2)
//运用aggregatebykey
//1、U定义为ArrayBuffer
    val juhe = pairRdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[(String, Int)]())((arr,value)=>{
//2、将value放入集合U中
      arr += value
//3、将所有的集合进行合并
    },_.union(_))
    val juhesum = juhe.mapPartitions(partition=>{
      partition.map(m=>{
        val key = m._1
        val date = m._2.map(m=>m._1).toList.sortWith(_<_)(0)
        val sum = m._2.map(m=>m._2).sum
        Row(key,date,sum)
      })
    })
    juhesum.foreach(println(_))

最后的输出结果:

[108.198.168.20,2018-01-03 23:23:23,6]

[84.174.205.5,2018-11-10 23:23:23,7]

[221.226.113.146,2018-09-11 23:23:23,15]

到此实现需求。

希望本文会对大家有帮助,欢迎大家评论交流,谢谢!

猜你喜欢

转载自blog.csdn.net/qq_24674131/article/details/85114760