概述
Flink通过流分区器StreamPartitioner来控制DataStream中的元素往下游的流向。Flink提供了8种StreamPartitioner:
- BroadcastPartitioner
- GlobalPartitioner
- RebalancePartitioner
- ShufflePartitioner
- RescalePartitioner
- ForwardPartitioner
- KeyGroupStreamPartitioner
- CustomPartitionerWrapper
StreamPartitioner继承自ChannelSelector接口。Channel是Flink对于数据写入目的地的简单抽象(下游并行算子的某subtask),我们可以直接认为它就是下游算子的并发实例。所有StreamPartitioner的子类都要实现selectChannel()方法,用来选择分区号。下面分别来看看他们的原理。
ChannelSelector
public interface ChannelSelector<T extends IOReadableWritable> {
//初始化下游channels数量,即下游subtask的数量
void setup(int var1);
/*
根据当前的record以及Channel总数,
决定应将record发送到下游哪个Channel。
不同的分区策略会实现不同的该方法。
*/
int selectChannel(T var1);
//是否以广播的形式发送到下游所有的算子实例
boolean isBroadcast();
}
StreamPartitioner
public abstract class StreamPartitioner<T> implements ChannelSelector<SerializationDelegate<StreamRecord<T>>>, Serializable
{
private static final long serialVersionUID = 1L;
protected int numberOfChannels;
public StreamPartitioner() {
}
//初始化下游channels数量,即下游subtask的数量
public void setup(int numberOfChannels) {
this.numberOfChannels = numberOfChannels;
}
//默认不以广播的方式发送到下游所有的算子实例
public boolean isBroadcast() {
return false;
}
public abstract org.apache.flink.streaming.runtime.partitioner.StreamPartitioner<T> copy();
}
BroadcastPartitioner
调用方法:dataStream.broadcast();
作用:发送到所有下游的算子实例。下游的实例都完整保存一份上游算子的数据,之后实例可以直接从本地获取数据。对于大的dataStream与小的dataStream关联处理,把小的dataStream广播给下游,能提高关联效率。
原理:不需要选择下游分区。
源码:
@Internal
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
public BroadcastPartitioner() {
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
}
public boolean isBroadcast() {
return true;
}
public StreamPartitioner<T> copy() {
return this;
}
public String toString() {
return "BROADCAST";
}
}
GlobalPartitioner
调用方法:dataStream.global();
作用:只会将数据输出到下游算子的第一个实例()
原理:在selectChannel方法中固定返回编号为0的分区。
源码:
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
public GlobalPartitioner() {
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}
public StreamPartitioner<T> copy() {
return this;
}
public String toString() {
return "GLOBAL";
}
}
RebalancePartitioner
调用方法:dataStream.rebalance();
作用:通过循环的方式依次将数据发送给下游的实例
原理:随机选择一个下游算子的实例,然后用轮询(round-robin)的方式从该实例开始循环输出。该方式能保证完全的下游负载均衡,常用来处理有倾斜的原数据流。源码:
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private int nextChannelToSendTo;
public RebalancePartitioner() {
}
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
//初始化channel的id,返回[0,numberOfChannels)的伪随机数
this.nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
this.nextChannelToSendTo = (this.nextChannelToSendTo + 1) % this.numberOfChannels;
return this.nextChannelToSendTo;
}
public StreamPartitioner<T> copy() {
return this;
}
public String toString() {
return "REBALANCE";
}
}
ShufflePartitioner
调用方法:dataStream.shuffle();
作用:将数据随机输出到下游算子的并发实例
原理:用java.util.Random随机地选择一个下游实例。
源码:
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private Random random = new Random();
public ShufflePartitioner() {
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return this.random.nextInt(this.numberOfChannels);
}
public StreamPartitioner<T> copy() {
return new ShufflePartitioner();
}
public String toString() {
return "SHUFFLE";
}
}
RescalePartitioner
调用方法:dataStream.rescale();;
作用:只会将数据输出到下游算子的第一个实例
原理:基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。
举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。
若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。
源码:
@Internal
public class RescalePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
private int nextChannelToSendTo = -1;
public RescalePartitioner() {
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
if (++this.nextChannelToSendTo >= this.numberOfChannels) {
this.nextChannelToSendTo = 0;
}
return this.nextChannelToSendTo;
}
public StreamPartitioner<T> copy() {
return this;
}
public String toString() {
return "RESCALE";
}
}
ForwardPartitioner
调用方法:dataStream.forward();
作用:与GlobalPartitioner的实现相同。但它会将数据输出到本地运行的下游算子的第一个实例,而非全局。
原理:在selectChannel方法中固定返回编号为0的分区。
源码:
@Internal
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L;
public ForwardPartitioner() {
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
return 0;
}
public StreamPartitioner<T> copy() {
return this;
}
public String toString() {
return "FORWARD";
}
}
KeyGroupStreamPartitioner
调用方法:dataStream.shuffle();
作用:就是keyBy()算子底层所采用的StreamPartitioner,
原理:先在key值的基础上经过了两重哈希得到key对应的哈希值,第一重是Java自带的hashCode(),第二重则是MurmurHash。然后将哈希值乘以算子并行度,并除以最大并行度,得到最终的分区ID
源码:
CustomPartitionerWrapper
调用方法:dataStream.partitionCustom(<Partitioner>);
作用:自定义的分区逻辑
原理:自定义的分区逻辑,通过继承Partitioner接口自己实现,并传入partitionCustom()方法。
如自定义分区:
dataStream.partitionCustom(new Partitioner<String>() {
@Override
public int partition(String key, int numPartitions) {
return key.length() % numPartitions;
}
},0);