在配置Consumer的时候,通常会有这么一行代码:
/**
* 订阅指定topic下tagA消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("test_url", "tag_A");
这行代码表示该consumer订阅了test_url这个topic下面的tag_A类型消息。这一行代码至少提供了两点信息,哪个consumer订阅了哪个topic信息,这个topic信息需要如何筛选。
Consumer订阅之后就可以顺利接收到test_url这个topic下的消息。那么这个subscribe到底做了什么呢?我们可以先猜测,在broker端应该存放了consumer、topic、tag的一个映射关系,这样要传递消息到consumer的时候,就可以根据这个映射进行过滤,确定要不要发送给consumer。接下来,我们就来一探究竟。
我们以最常用的推模式为例,subscribe方法内部由DefaultMQPushConsumerImpl实现,如下所示:
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression); //1、 构造订阅信息
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); // 2、本地存一份
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 3、发送心跳到所有的broker,心跳里面包含订阅信息
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
从上面可以看到,一共做了3件事情:
1-构造订阅信息
3-本地的负载均衡服务存一份订阅信息
4-通过心跳服务,将订阅信息发送给所有的broker。
下面我们一个个解析。
构造订阅信息
订阅信息都存在了SubscrptionData这个Bean里面,如下所示:
public class SubscriptionData implements Comparable<SubscriptionData> {
public final static String SUB_ALL = "*"; // 常量,表示订阅该topic下所有类型消息
private boolean classFilterMode = false; // 开启类过滤模式,默认不开启
private String topic;
private String subString; // 订阅表达式
private Set<String> tagsSet = new HashSet<String>(); // 如果是tag模式就是tag列表
private Set<Integer> codeSet = new HashSet<Integer>();// 如果是tag模式就是tag的hashcode列表
private long subVersion = System.currentTimeMillis();
private String expressionType = ExpressionType.TAG; // 表达式类型,TAG和SQL两种
@JSONField(serialize = false)
private String filterClassSource; // 如果开启了类过滤模式,这里存放过滤类java代码
}
基本所有的字段都有注释,很好理解。这里的classFilterMode单独拿出来说一下,RockeMQ提供了用户自定义过滤的能力。开启classFilter需要在broker配置文件里面通过设置如下参数:
Broker 所在的机器会启动多个 FilterServer 过滤服务,Consumer 启动后,通过subscribe方法会向 FilterServer 上传一个实现了MessageFilter接口的过滤 类;此时消费消息同传统的就不一样了:Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。由于5.0.0版本后,filterServer模块将会被移除,因此classFilter也就没有用武之地了,这里我们就不对其进行展开。
构造订阅信息的方法如下:
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
构造方法十分简单,稍微复杂点的是Tag和hashcode的保存。如果没有给出subString或者为*,那就是订阅所有Tag类型的消息,然后按照||进行分割Tag,把名称和hascode都保存下来即可。
以上方法是Tag模式下的订阅信息构造,如果需要使用SQL模式构造,那么必须使用更为通用的订阅方式:
// SQL过滤
consumer.subscribe("test_url", MessageSelector.bySql("name = 'gameloft9'"));
过滤的语法就是我们平时使用的SQL语法,有没有觉得很简单?同样的,最开始的那行代码也可以写成下面这个样子:
consumer.subscribe("test_url", MessageSelector.byTag("*"));
需要注意的是,表达式是Tag类型的话,那么消息是根据tag进行过滤的。表达式是SQL类型的话,那么消息是根据propeties进行过滤的。此时为了支持SQL过滤类型,broker配置文件还需要加上这句话:
enablePropertyFilter=true
SQL和Tag模式的订阅信息构造大同小异,就不再贴代码了。
负载均衡服务保存订阅信息
构造完订阅信息后,consumer会在自己本地存一份,具体存放的地方在负载均衡服务里面,如下所示:
为什么要存一份呢?因为做消息的负载均衡的时候会用到它,我们后面分析Consumer消费消息的时候会再提到,这里暂且略过。
发送订阅信息到所有broker
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}
发送订阅信息是通过客户端发送心跳带上去的,由于我们不关注classFilter过滤模式,这里的uploadFilterClassSource就不讲了,只关注sendHeartbeatToAllBroker方法。
这里额外插一句话,细心的同学可能发现了,这里对发送心跳进行了上锁操作,而且使用的是tryLock()。这里需要多一句嘴解释下。RocketMQ底层的用于通信的MQClientInstance是进行了复用的,只要groupName,instanceName和unitName一致,那么在同一个jvm里面的不同的Consumer下面使用的都是同一个ClientInstance。既然是复用的,那么就可能存在并发,因此这里进行了上锁操作。但是为什么使用的是tryLock呢?上锁失败就不管了?
其实发送心跳在ClientInstance中有一个单独的任务线程执行,如下所示:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
因此,即使在consumer.subscribe()中发送心跳失败了,后面也会每隔30秒发送心跳。具体的可以参考之前的文章:RocketMQ如何维持心跳?
心跳的内容和后续broker端对心跳的处理,在这篇文章中也讲过了:
Broker处理心跳是在ClientManageProcessor中处理的,对于ProducerData的内容处理很简单,直接注册producer,把producer的ClientChannelInfo保存下来,后面与producer通讯的时候会用到。对于Consumer的处理就稍微复杂一点,除了注册consumer之外,如果消费分组配置不为空的话,还会创建一个用于重试的topic,这个在消息重新消费时有用。
对于心跳中订阅信息的处理,我们这里再稍微展开一点点。
在处理心跳时,对订阅信息的处理,放在了注册consumer里面,如下所示:
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(), // 订阅信息
isNotifyConsumerIdsChangedEnable
);
然后我们再看registerConsumer方法:
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// 找到对应的group信息,如果没有就新建
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新netty channel信息,后续通讯会用到
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新订阅信息!!!
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
// 派发订阅信息并处理
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
首先会试图找到该group的consumerGroupInfo,如果是第一次订阅,就是新创建一个。然后就是更新其中的channel信息和订阅信息!!由此,我们可以知道订阅信息最终存放在BrokerController–》ConsumerManager–》consumerTable–》GroupInfo里面。如下所示:
到这里,有的小伙伴会有疑问,这里只是保存的订阅信息,consumer的信息去哪里了?怎么知道是哪个consumer订阅的呢?到时候发消息,都不知道往哪里发。
其实,在处理心跳的时候,我们还保存了clinetId和channel的映射信息,如下所示:
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);
这个信息同样也存下来了,正好也放在ConsumerGroupInfo里面,如下所示:
这样在broker根据订阅信息筛选完消息后,就可以通过consumerGroup里的channel传送消息回consumer,后面的负载均衡也是基于这个实现。限于篇幅,broker端的负载均衡后续再讲。
说到筛选信息,这里订阅信息保存完毕后,会触发一个Reisgter事件并由ConsumerIdsChangeListner处理:
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
这里就会注册一个ConsumerFilter,后续筛选信息就会使用这个filter进行过滤。
限于篇幅,消息过滤后续再讲。
小结
Consumer通过subscribe订阅消息之后就可以顺利接收到topic下的消息。在broker端ConsumerManager中存放着订阅信息,这样要传递消息到consumer的时候,就可以根据这个这个订阅信息进行过滤,并找到consumer的channel发送过去。