基本介绍
1、广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks
2、广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
3、与DataStreaming 中的Broadcast区别开来,DataStreaming 中的Broadcast是把元素广播给所有的分区,数据会被重复处理,类似于storm中的allGrouping(调用方式 dataStream.broadcast())
用法
1:初始化数据 DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3)
2:广播数据 withBroadcastSet(toBroadcast, "broadcastSetName");
3:获取数据 Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
注意
1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束
2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。
使用Demo
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBuffer
object BatchDemoBroadcast {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
// 1、初始化数据
val broadData = new ListBuffer[Tuple2[String, Int]]()
broadData.append(("zs", 18))
broadData.append(("ls", 20))
broadData.append(("ww", 17))
val tupleData = env.fromCollection(broadData)
val toBroadcastData = tupleData.map(data => Map(data._1 -> data._2))
val text = env.fromElements("zs", "ls", "ww")
text.map(new RichMapFunction[String, String] {
var allMap = Map[String,Int]()
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 3、获取数据
val listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastData")
val it = listData.iterator()
println("-------------------")
while (it.hasNext) {
allMap = allMap.++(it.next())
}
}
override def map(in: String): String = {
in + "_" + allMap(in)
}
}).withBroadcastSet(toBroadcastData, "broadcastData")// 2、广播数据
.print()
}
}