说到metaq的消费者balance策略,不得不说一下分区的有关信息。一个topic可以划分为n个分区。每个分区是一个有序的、不可变的、顺序递增的队列。
分区一方面是为了增大消息的容量(可以分布在多个分区上存,而不会限制在单台机器存储大小里),二方面可以类似看成一种并行度。
消费者的负载均衡与topic的分区数据紧密相关,需要考虑几种情况:
1、单个分组内的消费者数目如果比总得分区数目多的话,则多出来的消费者不参与消费。每个分区针对每个消费者group只挂一个消费者,同一个group的多余消费者不参与消费。
2、如果分组内的消费者数目比分区数目小,则有部分消费者要额外承担消息的消费任务。当分区数目n大于单个group的消费者数目m时,则有n%m个消费者需要额外承担1/n的消费任务。n足够大的时候可以认为负载平均分配。
综上所述,单个分组内的消费者集群的负载均衡策略如下:
①每个分区针对一个group只挂载一个消费者
②如果同一个group的消费者数目大于分区数目,则多出来的消费者不参与消费
③如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。
meta客户端处理消费者的负载均衡方式:将消费者列表和分区列表分别排序,然后按照上述规则做合理的挂载。如果某个消费者故障,其他消费者会感知到这一变化,然后重新进行负载均衡,保证所有分区都有消费者进行消费。
Consumer的balance策略实现在metaq中提供了两种:ConsisHashStrategy和DefaultLoadBalanceStrategy。
首先,来看DefaultLoadBalanceStrategy的实现:
public List<String> getPartitions(final String topic, final String consumerId, final List<String> curConsumers, final List<String> curPartitions) { // 每个订阅者平均挂载的partition数目 final int nPartsPerConsumer = curPartitions.size() / curConsumers.size(); // 挂载到额外partition的consumer数目 final int nConsumersWithExtraPart = curPartitions.size() % curConsumers.size(); log.info("Consumer " + consumerId + " rebalancing the following partitions: " + curPartitions + " for topic " + topic + " with consumers: " + curConsumers); final int myConsumerPosition = curConsumers.indexOf(consumerId); if (myConsumerPosition < 0) { log.warn("No broker partions consumed by consumer " + consumerId + " for topic " + topic); return Collections.emptyList(); } assert myConsumerPosition >= 0; // 计算当前consumer挂载的分区起点 final int startPart = nPartsPerConsumer * myConsumerPosition + Math.min(myConsumerPosition, nConsumersWithExtraPart); //计算当前consumer共挂载的分区数=每个consumer的挂载数+额外承担的分区数 final int nParts = nPartsPerConsumer + (myConsumerPosition + 1 > nConsumersWithExtraPart ? 0 : 1); if (nParts <= 0) { log.warn("No broker partions consumed by consumer " + consumerId + " for topic " + topic); return Collections.emptyList(); } final List<String> rt = new ArrayList<String>(); for (int i = startPart; i < startPart + nParts; i++) { final String partition = curPartitions.get(i); rt.add(partition); } return rt; }