Flink通过以下方法对转换后的确切流分区进行了低级控制。
Rebalancing(Round-robin partitioning)
分区元素循轮询,为每个分区创建相等的负载。有助于在数据不对称的情况下优化性能。在存在数据偏斜的情况下对性能优化有用。
val env =StreamExecytionEnvironment.getExecutionEnvionment
env.socketTextStraem("Flink",9999)
.rebalance
.print()
.setParallelism(2)
println(env.getExecytionPlan)
env.execute("FlinkPartitionRebalancePolicy")
一般不写分区策略,默认使用的rebalance
Random partitioning
根据均匀分布对元素进行随机划分。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("Flink",9999)
.shuffle
.pritln()
.setParallelism(2)
println(env.getExecutionPlan)
env.eecute("FlinkPartitionRandomPolicy")
Rescaling
该策略和RoundRobin分区策略一样,Rescaling分区策略通过一种循环方式将数据发送给下游的任务节点。但是不同点是,RoundRobin分区会将上游的数据全局性的以轮询的方式分发给下游节点。而Rescaling分区仅仅会对下游继承的算子进行负载均衡。例如上游并行度是2下游并行度是4,上有就会按照下游分区,等比例分发给下游的任务。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("Flink",9999)
.rebalance
.map(t => "[" + t + "]")
.setParallelism(2)
.rescale
.print()
.setParallelism(4)
env.execute("FlinkPartitionRescalingPolicy")
Broadcasting
将上游的数据广播给下游所有任务。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("Flink", 9999)
.broadcast
.print()
.setParallelism
println(env.getExecutionPlan)
env.execute("FlinkPartitionBroadcastPolicy")
Custom partitioning
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("Flink", 9999)
.map(t=>(t,1))
.partitionCustom(new Partitioner[String]{
override def partition(key: String, numPartitions: Int): Int = {
var partition=( key.hashCode() & Integer.MAX_VALUE ) % numPartitions
println(s"${key}\t${partition}")
partition
}
},(t:(String,Int)) =>t._1)
.print()
.setParallelism(4)
env.execute("FlinkPartitionCustomPolicy")