累加器
累加器实现
自定义Int类型的累加器
结果为6
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
object Acc {
  def main(args: Array[String]): Unit = {
    // Local Spark context for demonstrating a custom Int accumulator.
    val conf = new SparkConf().setMaster("local[*]").setAppName("add1")
    val sc = new SparkContext(conf)

    val numbers = List(30, 50, 70, 60, 10, 20)
    val rdd1 = sc.parallelize(numbers, 2)

    // Register the custom accumulator before using it.
    // (If MyIntAcc were an `object`, no `new` would be needed.)
    val acc = new MyIntAcc
    sc.register(acc, "first")

    // Each element bumps the counter by 1 and passes through unchanged.
    val rdd2 = rdd1.map { x =>
      acc.add(1)
      x
    }

    // An action is required before the accumulator value is meaningful
    // on the driver.
    rdd2.collect()
    println(acc.value)

    sc.stop()
  }
}
//1.对什么值进行累加 2.累加器最终的值
// Custom accumulator over Int values whose final value is also Int.
// AccumulatorV2 type parameters: [IN, OUT] = [what add() receives, what value returns].
class MyIntAcc extends AccumulatorV2[Int, Int] {
  private var sum = 0

  // "Zero" check: true when the buffer holds the neutral element.
  override def isZero: Boolean = sum == 0

  // Create an independent copy carrying the current buffer value.
  override def copy(): AccumulatorV2[Int, Int] = {
    val acc = new MyIntAcc
    acc.sum = sum
    acc
  }

  // Reset the buffer back to "zero".
  override def reset(): Unit = sum = 0

  // Within-partition accumulation.
  override def add(v: Int): Unit = sum += v

  // Cross-partition merge.
  // BUG FIX: the original wrote `sum + acc.sum`, which computes the total and
  // discards it, so every partition's contribution was lost and `value`
  // reported 0 instead of 6. It must assign with `+=`.
  override def merge(other: AccumulatorV2[Int, Int]): Unit =
    other match {
      case acc: MyIntAcc => sum += acc.sum
      case _ =>
        // Reject incompatible accumulator types explicitly (consistent with
        // the Map accumulators in this file) instead of silently ignoring them.
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }

  // Final merged value.
  override def value: Int = sum
}
自定义map类型的累加器
结果:Map(sum -> 240.0, count -> 6, avg -> 40.0)
不可变Map
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
object MapAcc {
  def main(args: Array[String]): Unit = {
    // Local Spark context for demonstrating a Map-valued accumulator.
    val conf = new SparkConf().setMaster("local[*]").setAppName("add1")
    val sc = new SparkContext(conf)

    val numbers = List(30, 50, 70, 60, 10, 20)
    val rdd1 = sc.parallelize(numbers, 2)

    // Register, feed every element into the accumulator, then read the
    // merged result on the driver.
    val acc = new MapAcc
    sc.register(acc)
    rdd1.foreach(acc.add)
    println(acc.value)

    sc.stop()
  }
}
//将来累加器的值同时包含 sum,count,avg
//(sum,count,avg)
//Map("sum"->1000,"count"->10 "avg"->100)
// Accumulator whose final value is a Map containing sum, count and avg,
// e.g. Map("sum" -> 1000, "count" -> 10, "avg" -> 100).
class MapAcc extends AccumulatorV2[Double, Map[String, Any]] {
  // Immutable map used as the buffer; every update rebinds the var.
  private var map = Map[String, Any]()

  // Read helpers: a missing key behaves like zero.
  private def sumOf(m: Map[String, Any]): Double =
    m.getOrElse("sum", 0D).asInstanceOf[Double]
  private def countOf(m: Map[String, Any]): Long =
    m.getOrElse("count", 0L).asInstanceOf[Long]

  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[Double, Map[String, Any]] = {
    // Sharing the immutable map is safe — it can never change in place.
    val acc = new MapAcc
    acc.map = map
    acc
  }

  // Immutable buffer: "reset" is just rebinding to an empty map.
  override def reset(): Unit = map = Map[String, Any]()

  // Accumulate sum and count here; avg is derived later in value.
  override def add(v: Double): Unit = {
    val newSum = sumOf(map) + v
    val newCount = countOf(map) + 1L
    map = map.updated("sum", newSum).updated("count", newCount)
  }

  // Merge another MapAcc's sum/count into this buffer.
  override def merge(other: AccumulatorV2[Double, Map[String, Any]]): Unit =
    other match {
      case o: MapAcc =>
        map = map
          .updated("sum", sumOf(map) + sumOf(o.map))
          .updated("count", countOf(map) + countOf(o.map))
      case _ => throw new UnsupportedOperationException
    }

  // Compute avg on demand from the merged sum and count.
  override def value: Map[String, Any] = {
    map = map.updated("avg", sumOf(map) / countOf(map))
    map
  }
}
可变map
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object MapAcc {
  def main(args: Array[String]): Unit = {
    // Local Spark context for demonstrating a mutable-Map accumulator.
    val conf = new SparkConf().setMaster("local[*]").setAppName("add1")
    val sc = new SparkContext(conf)

    val list1 = List(30, 50, 70, 60, 10, 20)
    val rdd1 = sc.parallelize(list1, 2)

    val acc = new MapAcc
    sc.register(acc)
    rdd1.foreach(x => acc.add(x))
    println(acc.value)

    // FIX: the original never stopped the SparkContext (the other two
    // examples do); stop it so local resources are released cleanly.
    sc.stop()
  }
}
// Mutable-Map variant: the buffer is updated in place instead of being
// rebound, so reset/copy must treat it as shared mutable state.
class MapAcc extends AccumulatorV2[Double, mutable.Map[String, Any]] {
  private val map = mutable.Map[String, Any]()

  // Current buffer readings; a missing key behaves like zero.
  private def curSum: Double = map.getOrElse("sum", 0D).asInstanceOf[Double]
  private def curCount: Long = map.getOrElse("count", 0L).asInstanceOf[Long]

  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[Double, mutable.Map[String, Any]] = {
    val acc = new MapAcc
    // Lock while copying so another thread mutating the map via add/merge
    // cannot interleave with the element-by-element copy.
    map.synchronized {
      acc.map ++= map
    }
    acc
  }

  override def reset(): Unit = map.clear()

  // Accumulate sum and count; avg is derived later in value.
  override def add(v: Double): Unit = {
    map("sum") = curSum + v
    map("count") = curCount + 1L
  }

  // Merge another MapAcc's sum/count into this buffer in place.
  override def merge(other: AccumulatorV2[Double, mutable.Map[String, Any]]): Unit =
    other match {
      case o: MapAcc =>
        map("sum") = curSum + o.map.getOrElse("sum", 0D).asInstanceOf[Double]
        map("count") = curCount + o.map.getOrElse("count", 0L).asInstanceOf[Long]
      case _ => throw new UnsupportedOperationException
    }

  // Compute avg on demand from the merged sum and count.
  override def value: mutable.Map[String, Any] = {
    map("avg") = curSum / curCount
    map
  }
}
为什么copy要用synchronized?因为mutable.Map不是线程安全的:在copy逐个元素复制缓冲区的同时,可能有其他线程(例如任务线程调用add,或序列化/上报累加器的线程)并发修改这个Map,加锁可以保证复制出的快照是一致的。(不可变Map版本不需要加锁,因为不可变集合不会被原地修改。)
广播变量
为什么要使用广播变量,作用是什么,看下面两张图就明白了
不使用广播变量
使用广播变量
执行代码