正常发送消息
public MessageQueue selectOneMessageQueue() {
// 拿到 sendWhichQueue
int index = this.sendWhichQueue.incrementAndGet();
// 对 messageQueue 列表长度取余
int pos = Math.abs(index) % this.messageQueueList.size();
// 计算出来的 pos 如果小于 0, 就给个兜底 = 0
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
通过sendWhichQueue.incrementAndGet()获得一个随机值并保存
之后每次调用会使index值++
假设第一次随机到的是index=11,队列数为5,那么采用取余计算11%5为1,所以第一次会选择下标为1的队列,之后每次调用都会使index++,选择的队列顺序为:
1,2,3,4,0,1,2,3,4,0
实现了线性轮询的负载均衡算法
它可以将流量均匀地分发给不同的 MessageQueue,而 MessageQueue 分布在不同的 Broker 上,这样也达到了对最终 Message 存储的负载均衡,避免造成数据倾斜。
下面是获取index方法的内部
public int incrementAndGet() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = Math.abs(random.nextInt());
this.threadLocalIndex.set(index);
}
this.threadLocalIndex.set(++index);
return Math.abs(index & POSITIVE_MASK);
}
在首次进入 incrementAndGet() 逻辑时,index 肯定是 null,所以这里会随机生成一个数,而后续的调用都会在最初生成的随机值上自增。
在发送模式为SYNC时,重试发送消息
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
可以看到这里的逻辑也被 lastBrokerName 分成了两部分,这个 lastBrokerName 代表上次选择的 MessageQueue 所在的 Broker,并且它只会在第一次投递失败之后的后续重试流程中有值。
if (topicPublishInfo != null && topicPublishInfo.ok()) {
//......
MessageQueue mq = null;
for (; times < timesTotal; times++) {
// 第 2、3 次循环才会有值
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
//......
}
}
可以看到变量 mq 是定义在循环外的,所以在第一次正常投递 Message 时,它肯定为 null。只有在第 2、3 次循环时 mq 才有值,而进行到了 2、3 次就说明首次投递失败,需要重新进行选择。