1. Flink的并发执行
本章描述如何在Flink中配置程序的并发执行,一个Flink程序可以由不同的task(如:transformations/opterators,data sources及data sinks等)组成,一个task会分发到多个并发实例中运行,并且每个并发实例处理task的部分输入数据集。一个task的并发实例数叫做parallelism。在这之前有必要先了解下slot和parallelism的关系。
slot和parallelism
1.slot是指taskmanager的并发执行能力
taskmanager.numberOfTaskSlots:3
每一个taskmanager中的分配3个TaskSlot,3个taskmanager一共有9个TaskSlot
2.parallelism是指taskmanager实际使用的并发能力
parallelism.default:1
运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲。设置合适的并行度才能提高效率。
slot和parallelism总结
1.slot是静态的概念,是指taskmanager具有的并发执行能力
2.parallelism是动态的概念,是指程序运行时实际使用的并发能力
3.设置合适的parallelism能提高运算效率,太多了和太少了都不行
2.parallelism的设置
parallelism是可配置、可指定的。看下图:
parallelism的设定方式:
- 算子(operator)级别:可以通过设置flink的编程API修改过并行度;
- 运行环境级别:可以通过设置executionEnvironmentk的方法修改并行度;
- 客户端级别:可以通过设置$FLINK_HOME/bin/flink 的-p参数修改并行度;
- 系统级别:可以通过修改$FLINK_HOME/conf/flink-conf.yaml文件的方式更改并行度。
下面就分别举个例子吧:
算子(operator)级别
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
运行环境级别
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
客户端级别
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
系统级别
在./conf/flink-conf.yaml的parallelism.default项中指定。
优先级别
我们尽可能地不从系统上设置,而是针对不同的任务,自己内部设置。所以设置parallelism的防范优先级是:
算子(operator)级别 > 运行环境级别 > 客户端级别 > 系统级别