一.引言
除了操作的常规输入之外,广播变量 Broadcast Value 允许使一个数据集对操作的所有并行实例可用,即适合 task 都需要公用的变量,就像是 spark 中各个 executor 都需要访问的公共变量一样。这对于辅助数据集或依赖于数据的参数化非常有用。然后,该数据集将作为一个集合在操作员处进行访问。在 Flink 中,广播变量通过下述方法生成和获取 :
生成 : withBroadcastSet(DataSet, String) 前者为广播的数据集,后者为该数据集对应的名字标识
获取 : getRuntimeContext().getbroadcastvariable(String) 方法获取,参数为生成对应的名字标识
二.Boardcast 使用 demo
本例将在 DataSet 的 map 操作中调用 broadcast value 并打印其结果。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
// 1. The DataSet to be broadcast
// A.创建
val toBroadcastV1 = env.fromElements(1, 2, 3)
val toBroadcastV2 = env.fromElements(3, 4, 5)
case class Student(name: String)
val toBroadcastV3 = env.fromElements(Student("A"), Student("B"))
val data = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
var broadcastSetV1: mutable.Buffer[String] = null
var broadcastSetV2: mutable.Buffer[String] = null
var broadcastSetV3: mutable.Buffer[Student] = null
// D.Common Collection
val notBroadcastSetV4: Array[Int] = Array(7, 8, 9)
override def open(config: Configuration): Unit = {
// 3. Access the broadcast DataSet as a Collection
// C.获取
broadcastSetV1 = getRuntimeContext.getBroadcastVariable[String]("broadcastSetNameV1").asScala
broadcastSetV2 = getRuntimeContext.getBroadcastVariable[String]("broadcastSetNameV2").asScala
broadcastSetV3 = getRuntimeContext.getBroadcastVariable[Student]("broadcastSetNameV3").asScala
}
override def map(in: String): String = {
broadcastSetV1.foreach(print)
broadcastSetV2.foreach(print)
broadcastSetV3.foreach(print)
notBroadcastSetV4.foreach(print)
"output: " + in
} // B.广播
}).withBroadcastSet(toBroadcastV1, name = "broadcastSetNameV1")
.withBroadcastSet(toBroadcastV2, name = "broadcastSetNameV2")
.withBroadcastSet(toBroadcastV3, name = "broadcastSetNameV3")
.print()
demo 里共有 A、B、C、D 四个标记点,对应四个情形:
A.创建
这里对应 DataSet 的创建,相关 DataSet 的创建可以参考 Flink DataSet 创建,这里除了可以广播基础数据类型 String,Double 等,也可以广播自定义的数据类型,但必须是包在 DataSet 中即广播变量必须是 DataSet[T] 的形式。
B.广播
在 DataSet 的 transformation 操作中如果需要加入广播变量则需要调用 .withBroadCast 方法,规定 DataSet 与之对应的 String 标识,常用的 transformation 操作可以参考 Flink DataSet Transformation。
C.获取
在 function 中先初始化对应广播变量,然后在 open 初始化函数中通过下述方法获取,T 代表 DataSet 中的数据类型 :
getRuntimeContext.getBroadcastVariable[T]("key")
D.Common Collection
只是一个常规的 DataSet[T] 在 Function 中直接初始化也可以直接使用,为什么还要这么复杂的广播呢,这其实涉及到广播的初衷,广播变量适用于所有公用 Task ,所以在外部初始化直接广播到各个节点,占用的内存也相对较小,不广播则会在每个 Task 上都初始化一个相同的数据集,都指向不同的内存指针,占用内存相对较多,如果数据集特别小直接初始化也没毛病,毕竟省事。除了广播变量外,open 初始化函数内适合初始化 client , socket ,connection,server 等等。
三.总结
由于广播变量的内容保存在每个节点的内存中,所以它不应该变得太大。对于像标量值这样简单的东西,你可以简单地将参数作为函数闭包的一部分,或者使用withParameters(…)方法来传递配置。Flink 更专注于流式数据的处理,后续会介绍流式数据的 BroadcastStream。