文章目录
一、rocketmq作用
流量削峰:延时消费
数据采集:日志等数据的采集
异步解耦:系统之间解耦
二、rocketmq工作原理
消息的生产
路由表:实际是一个map,Key为topic名称,value是一个QueueData列表。简单说,路由表的key为topic名称,value为所有涉及该topic的BrokerName列表。
三、rocketmq的应用
发送消息
发送消息的三种方式
同步发送
异步发送
需要指定一个回调函数
单向发送:只发不收,效率最高,可靠性差
消息发送的四种状态
SEND_OK 发送成功
FLUSH_DISK_TIMEOUT 刷盘超时 刷盘策略为同步才会出现
FLUSH_SLAVE_TIMEOUT 主从同步超时 master-slave模式下才会出现
SLAVE_NOT_AVAILABLE 没有可用的从机 master-slave模式下才会出现
消费消息
消息消费者举例
消息消费的方式
广播模式
所有consumer都消费全部的消息
集群模式
每个consumer只消费自己需要的消息(默认模式)
顺序消费
问题描述
默认情况下,生产者会把消息以RoundRobin轮询方式发送到不同的queue分区队列;而消费消息时会从多个queue上拉取消息,这种情况下,生产和消费的顺序就得不到保证了。如果只将消息生产到一个队列,然后消费也只从一个队列取消息消费,则可以保证顺序。
有序分类
全局有序
当发送和消费参与的queue只有一个时,保证的有序是整个topic的有序,称为全局有序。
也就是一个topic只有一个queue。
分区有序
如果有多个queue参与,其仅可保证在该分区队列queue上的有序性,称为分区有序。
如何实现queue的选择
在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的。
在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key,也可以是其他数据,但必须是唯一的。
一般性的选择算法是,让选择key(或其hash值)与该topic所包含的queue数量取模,其结果即为选择的Queue的QueueId。 一般性的作法是,从消息中获取到选择key,对其进行判断,若是当前Consumer需要消费的消息,则消费,不是则不做处理。
不属于那个Consumer的消息被拉取走了,那么应该消费这条消息的Consumer还能否拿到这条消息吗?同一个Queue中的消息,不可能被同一个Group中的不同Consumer消费。所以消费同一个Queue的不同选择key的消息的Consumer一定属于不同的Group,而不同的Group中的Consumer之间的消费是相互隔离的,互不影响的。
示例
producer:
rocketmq如何保证有序总结
首先单个分区(queue)是有序的,但多个分区,就无法保证consumer能顺序去取消息消费。此时需要在生产者的生产消息上加一个选择器selector,(此处举例订单号)对订单号取模投放分区,就能保证同一个订单的相关消息都投送到同一个分区,同一个分区的消息是有序的,所以此时consumer去消费就是有序的了。
如果consumer是集群分布的,那么就会有多个一样的consumer去消费同一个分区,此时就需要对分区加锁,来保证同时只能有一个consumer进入分区消费。
延时消费
概念
当消息写入到broker后,在经过指定时长后才会被消费的消息,称为延时消息。
采用rocketMQ的延时消息实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭的场景,12306订单超时未支付取消订票的场景。
在电商平台中,订单创建会发送一条延时消息,在30分钟后去后台业务系统(consumer),如果该条订单已经关闭,则不做处理;如果未关闭(未完成支付),则商品返回库存,关闭订单。
延时等级
延时消息的延时时长并不支持任意时长的延迟,而是按照等级划分的。延时等级定义在rocketMQ服务端的MessageStoreConfig类中。
即延时等级为1代表1s,延时等级为3代表10s
延时消息producer举例
与普通发送消息的区别就是加了一个延时等级。
事务消息
一个场景的解决方案
在这样一个场景中,如果1,2,3步都成功,也就是工行系统已经成功扣款后,而5失败了,那A用户已经少了钱,而B用户没有加钱,是否就会出现数据不一致呢?
此时就需要事务消息了。
基础概念
分布式事务
事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务的解决方案,一种分布式事务处理模式。
半事务消息
暂不能投递的消息,发送方已经成功将消息发给了broker,但是broker没有收到最终确认指令,此时该消息被标记为“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息称为“半事务消息”。
消息回查
消息回查,即重新查询本地事务的执行状态,本例就是重新到DB中查看预扣款操作是否成功。
XA模式三剑客
XA是一种分布式事务解决方案,分布式事务处理模式,基于XA协议。
XA中有三个重要组件:TC,TM,RM
TC
Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。
RocketMQ中broker充当着TC
TM
Transaction Manager,事务管理器。定义全局事务的范围,开始全局事务,提交或回滚全局事务。它实际是全局事务的发起者。
RocketMQ中producer充当着TM
RM
Resource Manager,资源管理器。管理分支事务处理的资源,驱动分支事务的注册和回滚。
RocketMQ中broker和producer均是RM
XA模式架构
执行原理
- TM向TC发起指令,开启一个全局事务
- 根据业务要求,RM会逐个向TC注册分支事务,TC会逐个向RM发送预执行指令
- RM在接收到预执行指令后,会在进行本地事务预执行
- RM将预执行结果返回给TC,当然可能成功也可能失败
- TC在接收到各个RM的预执行结果后,汇总给TM,TM根据汇总结果给TC下达指令。如果各个RM的预执行结果都成功,则发送global commit指令,如果有一个失败,则发送global rollback
- TC在接受到指令后,再次向RM发送确认指令
*注意:
- 事务消息不支持延时
- 事务消息要做好幂等性,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)
代码示例
事务监听器:
结果:tagB由于回滚了,所以失败了,而tagC回查的结果是成功。所以最终消费者可以成功消费消费的消息是tagA和tagC
批量消息
发送限制
- 批量发送的消息必须具有相同的topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息或事务消息
代码示例
为了防止批量发送的消息大于4M(单次消息最大容量),定义一个消息列表分割器。
消费者:
消息过滤
消费者在进行消息订阅时,除了可以指定要订阅消息的topic外,还可以通过消息过滤来指定比topic更细粒度的过滤条件。主要有两种方式:sql过滤和tag过滤。
tag过滤
通过consumer的subscribe()方法指定要订阅的消息tag,如果要订阅多个,可以用||来连接
sql过滤
通过特定的表达式对事先埋入到消息中的用户属性进行筛选过滤,通过sql过滤,可以实现复杂的消息过滤,不过,只有push模式的消费者才可以使用。
消息发送重试机制
producer对发送失败的消息重新发送的机制称为消息发送重试机制,也称为消息重投机制。
对于消息重投,需要注意以下几点:
- 生产者在发送消息时,若采用同步或异步发送方式,发送失败会重试,但oneway方式不会重试。
- 只有普通消息具有发送重试机制,顺序消息没有
- 消息重投机制可以尽可能保证消息发送成功,不丢失。但有可能造成消息重复,消息重复是rocketmq中无法避免的问题。
- 消息重复一般不会发生,当出现消息量大,网络抖动时,消息重复就会成为大概率事件
- producer主动重发,consumer负载变化(发生rebanlance,不会导致消息重复,但可能会导致重复消费)也会导致消息重复
- 消息重复不可避免,但要尽可能避免重复消费
- 避免消息重复消费的解决方案是,给消息增加唯一标识(例如消息key),使消费者对消息进行消费判断来避免消息重复消费
- 消息发送重试有三种策略可以选择:同步发送失败策略,异步发送失败策略,消息刷盘失败策略
消息发送重试,会尽量选择别的broker来重发,也即会发送到别的topic的别的queue。比如上一次发送消息失败的broker是brokerA,那么rocketmq认为这个brokerA重复发送失败的可能性大,所以选择brokerB来重发这条消息。即broker具有失败隔离功能。
一种实现失败隔离功能的思路:
Producer中维护一个concurrentHashmap,其中key是发送失败的时间戳,value是broker实例,再维护一个set,里面存着的是未发生失败的broker实例。选择目标broker是从该set集合里选择的,再定义一个定时任务,定期从map集合中将长期未发生异常的broker清理出去,移到set集合里。
顺序消息的重试
注意:顺序消息没有发送失败重试策略,但有消费失败重试策略
无序消息的重试
无序消息(普通消息,延时消息,事务消息),无序消息的重试只对集群消费方式生效,广播方式不提供重试。
死信队列
死信队列概念
若一条消息在一直消费失败的前提下,会在正常消费4小时46分后进行第16次重试,若仍然消费失败,则该消息会被投递到死信队列。
死信队列特征
- 死信队列中的消息不会再被消费者正常消费,即DLQ对消费者是不可见的
- 死信队列的存储期与普通队列一样,均为3天,3天后会被自动删除
- 死信队列就是一个特殊的topic,名称为%DLQ%consumerGroup@consumerGroup,即每个消费者组都有一个死信队列
- 如果一个消费者组没有产生死信消息,那么也不会为其创建对应的死信队列
四、rocketMQ本地搭建
代码地址
https://gitee.com/gbss/when-iwasfree 中的rocketmq-demo
启动rocketmq
bin目录下启动nameserver
.\mqnamesrv.cmd
bin目录下启动broker
mqbroker -n localhost:9876 -c ../conf/broker.conf autoCreateTopicEnable=true
启动rocketmq-console
gitee(或者github)下载rocketmq-console
可参考:
https://gitee.com/ralph81/rocketmq-console?_from=gitee_search
修改一下配置文件的地址和端口即可
为方便执行可打成jar包,之后只需要java -jar 即可
无法创建topic的错误:
解决方案:
https://my.oschina.net/u/3476125/blog/897429
这样broker就不会连接到远端的ip了
成功!:
多个消费者消费消息示例
现状是每个组(消费者生产者)都只有一个实例,所以生产和消费都是这俩(因为会选定topic)。那么当一个组内有多个消费者时,是谁来消费呢?还是都消费?
当不同组的消费者去抢同一个消息,又会怎么样呢?
要验证这个问题,那么是肯定需要在同一个消费者组创建多个实例的,那么就需要一个消费者工厂。
/**
* 消费者生产工厂
*/
@Component
public class ConsumerFactory {
// 存放所有的consumer实例,key为唯一标识
public static Map<String, DefaultMQPushConsumer> allDefaultMQPushConsumer;
// 存放所有的consumer实例名
public static List<String> allDefaultMQPushConsumerNames;
private static String nameSvrAddr = "localhost:9876";
public ConsumerFactory() {
allDefaultMQPushConsumer = new HashMap<String, DefaultMQPushConsumer>();
allDefaultMQPushConsumerNames = new ArrayList<String>();
}
/**
* 获取所有consumer名
* @return
*/
public static List<String> getAllDefaultMQPushConsumerNames(){
return allDefaultMQPushConsumerNames;
}
/**
* 获取已存在的consumer
* @param consumerGroup
* @param instanceName
*/
public static DefaultMQPushConsumer getSingleDefaultMQPushConsumer(String consumerGroup, String instanceName){
String key = consumerGroup + instanceName;
if(!allDefaultMQPushConsumer.containsKey(key)){
System.out.println("不包含该key");
return null;
}
return allDefaultMQPushConsumer.get(key);
}
/**
* 创建一个consumer
* @param consumerGroup 消费者组名
* @param topic 消费的topic
* @param tag 消费的tag 都消费填 "*"
* @return
*/
public static String getSingleDefaultMQPushConsumer(String consumerGroup, String instanceName, String topic, String tag) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameSvrAddr);
consumer.setInstanceName(instanceName);
// 指定topic和tag
consumer.subscribe(topic, tag);
// consumerGroup + instanceName作为key
final String key = consumerGroup + instanceName;
try {
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(key + " Received Message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("consumer init 成功");
allDefaultMQPushConsumerNames.add(key);
allDefaultMQPushConsumer.put(key, consumer);
} catch (MQClientException e) {
System.out.println("创建consumer失败" + e.getErrorMessage());
}
return "创建consumer成功";
}
/**
* 修改nameSvrAddr
*/
public void changeNameSvrAddr(String newName) {
nameSvrAddr = newName;
}
@PreDestroy
private void destroyAllConsumer() {
if (allDefaultMQPushConsumer != null && allDefaultMQPushConsumer.size() != 0) {
for(String name : allDefaultMQPushConsumerNames){
DefaultMQPushConsumer defaultMQPushConsumer = allDefaultMQPushConsumer.get(name);
defaultMQPushConsumer.shutdown();
}
}
System.out.println("所有consumer实例已经全部关闭");
}
}
两个场景:
同一个consumergroup内的多个consumer去抢同一个消息
初始化多个consumergroup,topic,tag一样的consumer
consumer:
{
"consumerGroup":"defaultGroup",
"instanceName":"consumer01",
"topic":"topic_test",
"tag":"*"
}
{
"consumerGroup":"defaultGroup",
"instanceName":"consumer02",
"topic":"topic_test",
"tag":"*"
}
{
"consumerGroup":"defaultGroup",
"instanceName":"consumer03",
"topic":"topic_test",
"tag":"*"
}
此时去发消息,得到如下结果:
可以看到,同一个消费者组的消费者,只有一个能消费到消息
多个consumergroup的consumer去抢同一个消息
初始化多个consumergroup不同但topic,tag一样的consumer
{
"consumerGroup":"Group01",
"instanceName":"consumer",
"topic":"topic_B",
"tag":"*"
}
{
"consumerGroup":"Group02",
"instanceName":"consumer",
"topic":"topic_B",
"tag":"*"
}
{
"consumerGroup":"Group03",
"instanceName":"consumer",
"topic":"topic_B",
"tag":"*"
}
这次我们消费topic_B
可以看到,同一条消息,每个消费者组的消费者都有一个消费到了。
结论
如果你想要一条消息被多个服务消费,那么这些服务必须散落在不同的consumergroup
同一条消息最多只能被一个consumergroup中的一个消费者消费