Flink 中的Physical partitioning(物理分区)及示例代码

Flink通过以下方法对转换后的确切流分区进行了低级控制。

Rebalancing(Round-robin partitioning)

分区元素循轮询,为每个分区创建相等的负载。有助于在数据不对称的情况下优化性能。在存在数据偏斜的情况下对性能优化有用。

val env =StreamExecytionEnvironment.getExecutionEnvionment

	env.socketTextStraem("Flink",9999)
	.rebalance
	.print() 
	.setParallelism(2)

	println(env.getExecytionPlan)
	env.execute("FlinkPartitionRebalancePolicy")

一般不写分区策略,默认使用的rebalance

Random partitioning

根据均匀分布对元素进行随机划分。

val env = StreamExecutionEnvironment.getExecutionEnvironment
	env.socketTextStream("Flink",9999)
	.shuffle
	.pritln()
	.setParallelism(2)

	println(env.getExecutionPlan)
	env.eecute("FlinkPartitionRandomPolicy")

Rescaling

该策略和RoundRobin分区策略一样,Rescaling分区策略通过一种循环方式将数据发送给下游的任务节点。但是不同点是,RoundRobin分区会将上游的数据全局性的以轮询的方式分发给下游节点。而Rescaling分区仅仅会对下游继承的算子进行负载均衡。例如上游并行度是2下游并行度是4,上有就会按照下游分区,等比例分发给下游的任务。

val env = StreamExecutionEnvironment.getExecutionEnvironment

	env.socketTextStream("Flink",9999)
	.rebalance
	.map(t => "[" + t + "]")
	.setParallelism(2)
	.rescale
	.print()
	.setParallelism(4)

env.execute("FlinkPartitionRescalingPolicy")

Broadcasting

将上游的数据广播给下游所有任务。

val env = StreamExecutionEnvironment.getExecutionEnvironment

	 env.socketTextStream("Flink", 9999)
	 .broadcast
	 .print()
	 .setParallelism
	 
	 println(env.getExecutionPlan)
	 env.execute("FlinkPartitionBroadcastPolicy")

Custom partitioning

val env = StreamExecutionEnvironment.getExecutionEnvironment

	  env.socketTextStream("Flink", 9999)
	  .map(t=>(t,1))
	  .partitionCustom(new Partitioner[String]{
    
    
	    override def partition(key: String, numPartitions: Int): Int = {
    
    
	
	      var partition=( key.hashCode() & Integer.MAX_VALUE ) % numPartitions
	
	        println(s"${key}\t${partition}")
	
	        partition
	    }
	  },(t:(String,Int)) =>t._1)
	  .print() 
	  .setParallelism(4)

env.execute("FlinkPartitionCustomPolicy")

猜你喜欢

转载自blog.csdn.net/gym02/article/details/105923091