上篇文章给大家讲解了Flink中常用的到算子 ☛(Flink中的17种TransFormAction算子)那您写的代码如何进行优化那,提高效率?那接下来我们使用分布式缓存、广播变量来提高代码的效率。
一、Flink 的广播变量(重点 )
介绍Flink广播变量及试用场景
Flink 支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中, 这样可以减缓大量的 shuffle 操作; 比如在数据 join 阶段,不可避免的就是大量的 shuffle 操作,我们可以把其中一个 dataSet 广播出去,一直加载到 taskManager 的内存 中,可以直接在内存中拿数据,避免了大量的 shuffle, 导致集群性能下降; 广播变量创 建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点。另外需要 记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。
一句话解释,可以理解为是一个公共的共享变量,我们可以把一个 dataset 数据集广 播出去, 然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。 如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集, 比较浪费内存(也 就是一个节点中可能会存在多份 dataset 数据)。
注意事项: 因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会 出 现 OOM 这样的问题。
- 可以理解广播变量就是一个公共的变量
- 将一个数据集广播后,不同的Task都可以在节点上获取到
- 每个节点 只保留一份
- 如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪法。
用法:
在需要使用广播的操作后,使用 withBroadcastSet 创建广播
在操作中,使用 getRuntimeContext.getBroadcastVariable [广播数据类型] ( 广播名 )获取广播变量
示例:
创建一个 学生 数据集,包含以下数据
|学生 ID | 姓名 |
List((1, “张三”), (2, “李四”), (3, “王五”))
将该数据,发布到广播。
再创建一个 成绩 数据集
|学生 ID | 学科 | 成绩 |
List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))
通过获取广播变量中的信息将数据转为
List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))
实现步骤
- 获取批处理运行环境
- 分别创建两个数据集(学生信息、成绩信息)
- 使用RichMapFuncation 对成绩数据进行map转换
- 在数据集调用map方法后,调用 withBroadcastSet将学生信息创建广播
- 实现RichMapFunction
a. 将成绩数据(学生 ID,学科,成绩)-> (学生姓名,学科,成绩)
b.重写 open 方法中的,获取广播数据
c.导入 import scala.collection.JavaConverters._ 隐式转换
d.将广播变量使用asScala 转换为Scala集合,在只用toList转为scala toMap集合
e.在map方式用使用广播变量进行转换 - 打印输出
代码参考
import java.util
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
/**
* 需求: 创建一个 学生数据集,包含以下数据
* List((1, "张三"), (2, "李四"), (3, "王五"))
* 再创建一个 成绩数据集,
* |学生ID | 学科| 成绩|
* List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))
* 请通过广播获取到学生姓名,将数据转换为
* List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))
*
* @author
* @date 2020/9/18 23:15
* @version 1.0
*/
object BatchBroadcast {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集
val student = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))
val score = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))
//3.使用RichMapFunction 对成绩数据集进行map转换
val result = score.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {
// 定义一个map用来存放广播变量中的信息
var studentMap: Map[Int, String] = null
override def open(parameters: Configuration): Unit = {
// 导入工具类将java代码转为scala
import scala.collection.JavaConversions._
// 获取广播变量中的信息
val studentList: util.List[(Int, String)] = getRuntimeContext.getBroadcastVariable[(Int, String)]("student")
studentMap = studentList.toMap
}
// 重写map方法返回指定数据
override def map(value: (Int, String, Int)): (String, String, Int) = {
val stuName = studentMap.getOrElse(value._1, "")
(stuName, value._2, value._3)
}
}).withBroadcastSet(student, "student")
// 结果输出
result.print()
/*(张三,语文,50)
(李四,数学,70)
(王五,英文,86)
*/
}
}
二、Flink 的分布式缓存(重点 )
介绍分布式缓存:
Flink 提供了一个类似于 Hadoop 的分布式缓存,让并行运行实例的函数可以在本地访 问。这 个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等! 缓存的使用流程: 使用 ExecutionEnvironment 实例对本地的或者远程的文件(例如:HDFS 上的文件),为缓 存 文件指定一个名字注册该缓存文件!当程序执行时候,Flink 会自动将复制文件或者目 录到所有 worker 节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检 索该文件!
注意: 广播是将变量分发到各个 worker 节点的内存上,分布式缓存是将文件缓存到各个 worker 节点上;
用法:
使用 Flink 运行时环境的 registerCachedFile 注册一个分布式缓存 在操作中
使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布 式缓存
示例:
List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))
使用分布式缓存获取数据将数数据转为
List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))
注意:student.txt测试文件保存了学生 ID 以及学生姓名
实现步骤:
- 将创建student.txt 文件
- 获取批处理运行环境
- 创建成绩数据集
- 对 成绩 数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
a. RichMapFunction 的 open 方法中,获取分布式缓存数据
b. 在 map 方法中进行转换 - 实现 open 方法
a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
b. 使用 Scala.fromFile 读取文件,并获取行 c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List - 实现 map 方法
a. 从分布式缓存中根据学生 ID 过滤出来学生
b. 获取学生姓名
c. 构建最终结果元组 - 打印测试
代码参考
import java.io.File
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.io.Source
/** * 需求:
* 创建一个 成绩 数据集
* List( (1, "语文", 50),(2, "数学", 70), (3, "英文", 86))
* 请通过分布式缓存获取到学生姓名,将数据转换为
* List( ("张三", "语文", 50),("李四", "数学", 70), ("王五", "英文", 86))
* 注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名
*
* @author
* @date 2020/9/18 23:51
* @version 1.0
*/
object BatchDisCachedFile {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据集
val scoreDataSet = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))
//3.注册分布式缓存
env.registerCachedFile("./data/student.txt", "student")
val result = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {
//定义一个map用来存储分布式缓存中的数据
var studentMap: Map[Int, String] = null
// 初始化操作
override def open(parameters: Configuration): Unit = {
// 获取缓存中的信息
val student: File = getRuntimeContext.getDistributedCache.getFile("student")
// 读取据按照每行数据返回
val liens = Source.fromFile(student).getLines()
//遍历数据进行返回
studentMap = liens.map(s => {
val arr = s.split(",")
(arr(0).toInt, arr(1))
}).toMap
}
override def map(value: (Int, String, Int)): (String, String, Int) = {
val studentName = studentMap.getOrElse(value._1, "")
(studentName, value._2, value._3)
}
})
result.print()
}
}
三、Flink累加器(Accumulators 了解)
介绍:
Accumulator 即累加器,与 Mapreduce counter 的应用场景差不多,都能很好地观察 task 在运行期间的数据变化 可以在 Flink job 任务中的算子函数中操作累加器,但是只 能在任务执行结束之后才能获得累加器的最终结果。 Counter 是 一 个 具 体 的 累 加 器 (Accumulator) 实 现 IntCounter, LongCounter 和 DoubleCounter
示例
需求: 给定一个数据源 “a”,“b”,“c”,“d” 通过累加器打印出多少个元素
实现步骤:
- 创建累加器
- 注册累加器
- 使用累加器
- 获取累加器的结果
代码参考
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
/** 需求:
* 给定一个数据源
* "a","b","c","d"
* 通过累加器打印出多少个元素
*
* @author
* @date 2020/9/19 0:17
* @version 1.0
*/
object BatchCounter {
def main(args: Array[String]): Unit = {
//1.构建运行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.构建数据源
val dataSet = env.fromElements("a", "b", "c", "d")
val resultDataSet = dataSet.map(new RichMapFunction[String, String] {
//定义一个累加器
val counter: IntCounter = new IntCounter()
override def open(parameters: Configuration): Unit = {
getRuntimeContext.addAccumulator("MyAccumulator", counter)
}
override def map(value: String): String = {
counter.add(1)
value
}
})
resultDataSet.writeAsText("./data/BatchCounter")
val result = env.execute("BatchCounter")
// 获取累加数据
val value = result.getAccumulatorResult[Int]("MyAccumulator")
println("累加器的最终结果是:" + value)
}
}