一、RabbitMQ介绍
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开
发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com/
开发中消息队列通常有如下应用场景:
1、任务异步处理。
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
市场上还有哪些消息队列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
为什么使用RabbitMQ呢?
1、使得简单,功能强大。
2、基于AMQP协议。
3、社区活跃,文档完善。
4、高并发性能好,这主要得益于Erlang语言。
5、Spring Boot默认已集成RabbitMQ
二、使用MQ的好处
2.1实现异步处理
同步的通信:发出一个调用请求之后,在没有得到结果之前,就不返回。由调用者主动等待这个调用的结果。
异步通信:调用在发出之后,这个调用就直接返回了,所以没有返回结果。也就是说,当一个异步过程调用发出后,调用者不会马上得到结果。而是在调用发出后,
被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
2.2实现解耦
耦合是系统内部或者系统之间存在相互作用,相互影响和相互依赖。在我们的分布式系统中,一个业务流程涉及多个系统的时候,他们之间就会形成一个依赖关系。
在传统的通信方式中,订单系统发生了退货的动作,那么要依次调用所有下游系统的 API,比如调用库存系统的 API 恢复库存,因为这张火车票还要释放出去给其他乘客购买;调用支付系统的 API,不论是支付宝微信还是银行卡,要把手续费扣掉以后,原路退回给消费者;调用通知系统 API 通知用户退货成功。
// 伪代码 public void returnGoods(){ stockService.updateInventory (); payService.refund(); noticeService.notice();
这个过程是串行执行的,如果在恢复库存的时候发生了异常,那么后面的代码都不会执行。由于这一系列的动作,恢复库存,资金退还,发送通知,本质上没有一个严格的先后顺序,也没有直接的依赖关系,也就是说,只要用户提交了退货的请求,后面的这些动作都是要完成的。库存有没有恢复成功,不影响资金的退还和发送通知。
使用多线程
多线程或者线程池是可以实现的,但是每一个需要并行执行的地方都引入线程,又会带来线程或者线程池的管理问题。所以使用MQ
订单系统只需要把退货的消息发送到消息队列上,由各个下游的业务系统自己创建队列,然后监听队列消费消息。
在这种情况下订单系统里面就不需要配置其他系统的 IP、端口、接口地址了,因为它不需要关心消费者在网络上的什么位置,所以下游系统改 IP 没有任何影响。
甚至不需要关心消费者有没有消费成功,它只需要把消费发到消息队列的服务器上就可以了。这样,我们就实现了系统之间依赖关系的解耦。
2.3实现流量削锋
在很多的电商系统里面,有一个瞬间流量达到峰值的情况,比如京东的 618,淘宝的双 11,还有小米抢购。普通的硬件服务器肯定支撑不了这种百万或者千万级别的并发量,就像 2012 年的小米一样,动不动服务器就崩溃。如果通过堆硬件的方式去解决,那么在流量峰值过去以后就会出现巨大的资源浪费。那要怎么办呢?如果说要保护我们的应用服务器和数据库,
限流也是可以的,但是这样又会导致订单的丢失,没有达到我们的目的。
引入MQ,MQ是队列,一定有队列的特性,(先进先出)就可以先把所有的流量承接下来,转换成 MQ 消息发送到消息队列服务器上,业务层就可以根据自己的消费速率去处理这些消息,
处理之后再返回结果。就像我们在火车站排队一样,大家只能一个一个买票,不会因为人多就导致售票员忙不过来。如果要处理快一点,大不了多开几个窗口(增加几个消费者)。
总结起来:
1) 对于数据量大或者处理耗时长的操作,我们可以引入 MQ 实现异步通信,减少客户端的等待,提升响应速度。
2) 对于改动影响大的系统之间,可以引入 MQ 实现解耦,减少系统之间的直接依赖。
3) 对于会出现瞬间的流量峰值的系统,我们可以引入 MQ 实现流量削峰,达到保护应用和数据库的目的。
三、 RabbitMQ 中的概念模型
MQ的本质:消息队列,又叫做消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息队列模型, 可以在分布式环境下扩展进程的通信
MQ的特点:
1、 是一个独立运行的服务。生产者发送消息,消费者接收消费,需要先跟服务器建立连接。
2、 采用队列作为数据结构,有先进先出的特点。
3、 具有发布订阅的模型,消费者可以获取自己需要的消息。
消息模型:
所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,
最后将消息发送到监听的消费者。
RabbitMQ的基本概念
下图是RabbitMQ的基本结构:
组成部分说明如下:
- Broker :消息队列服务,此进程包括两个部分:Exchange和Queue。
- Exchange :消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。队列使用绑定键(Binding Key)跟交换机建立绑定关系。
- Queue :消息队列,存储消息的队列,消息到达队列并转发给指定的消费方,它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Producer :消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
- Consumer :消息消费者,即消费方客户端,接收MQ转发的消息。
- Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Connection:无论是生产者发送消息,还是消费者接收消息,都必须跟Broker之间建立一个连接,这个是TCP长连接
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
Virtual Host理解如下图:
相关名词:
包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。
ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:
消息发布接收流程:
-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
四、 下载安装
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open
Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需
要安装Erlang/OTP,并保持版本匹配,如下图:
RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
1)下载erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe,以管理员方式运行此文件,安装。
erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添
加%ERLANG_HOME%\bin;
2)安装RabbitMQhttps://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3,以管理员方式运行此文件,安装
3)启动
- 安装成功后会自动创建RabbitMQ服务并且启动。
从开始菜单启动RabbitMQ,完成在开始菜单找到RabbitMQ的菜单:
RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动
2.如果没有开始菜单则进入安装目录下sbin目录手动启动:
1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
3) 注意事项:
1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang
搜索RabbitMQ、ErlSrv,将对应的项全部删除。
五、java操作队列
1、消息队列RabbitMQ的五种形式队列
1).点对点(简单)的队列
2).工作(公平性)队列模式
3.发布订阅模式
4.路由模式Routing
5.通配符模式Topics
2、简单队列
1)功能:一个生产者P发送消息到队列Q,一个消费者C接收
P表示为生产者 、C表示为消费者 红色表示队列。
点对点模式分析:
Maven依赖:
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> </dependencies>
封装Connection:
/** * 封装Connection */ public class MQConnectionUtils { public static Connection getConnection(){ //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务器地址 factory.setHost("localhost"); //设置端口号 factory.setPort(5672); //设置用户名 factory.setUsername("guest"); //设置密码 factory.setPassword("guest"); //设置vhost factory.setVirtualHost("/admin_yehui"); try { //创建连接 return factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }
参数详解:
1)声明交换机的参数
String type:交换机的类型,direct, topic, fanout 中的一种。
boolean durable:是否持久化,代表交换机在服务器重启后是否还存在。
2)声明队列的参数
boolean durable:是否持久化,代表队列在服务器重启后是否还存在。
boolean exclusive:是否排他性队列。排他性队列只能在声明它的 Connection中使用(可以在同一个 Connection 的不同的 channel 中使用),连接断开时自动删除。
boolean autoDelete:是否自动删除。如果为 true,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,队列会自动删除。
Map<String, Object> arguments:队列的其他属性
3)消息属性 BasicProperties
以下列举了一些主要的参数:
生产者:
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 /** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接,是否排他性队列。排他性队列只能在声明它的 Connection中使用(可以在同一个 Connection 的不同的 channel 中使用), 连接断开时自动删除。
* 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); String msg = "test_yehui_rabbitmq"; /** * 发送消息 * 参数1: Exchange的名称,如果没有指定,则使用Default Exchange * 参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * 参数3:消息包含的属性 * 参数4:消息体 * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 * 示绑定或解除绑定认的交换机,routingKey等于队列名称 */ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("消息发送体:"+msg); channel.close(); connection.close(); } }
消费者:
public class Consumer01 { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel(); //定义消费方法 DefaultConsumer consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); } }; //监听队列 /** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,true,consumer); } }
3、消息队列RabbitMQ应答模式
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。 如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。 没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。 消息应答是默认打开的。
我们通过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。
如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。
4、工作队列
work queues与简单队列相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
P表示为生产者 、C表示为消费者 红色表示队列。
工作队列分析
均摊消费
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
测试:
1、使用简单队列,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
RabbitMQ的公平转发
目前消息转发机制是平均分配,这样就会出现俩个消费者,奇数的任务很耗时,偶数的任何工作量很小,造成的原因就是近当消息到达队列进行转发消息。 并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。 为了解决这样的问题,我们可以使用basicQos方法,传递参数为prefetchCount= 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。 换句话说,只有在消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息, 也就是等待消费者处理完毕并自己对刚刚处理的消息进行确认之后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去清闲。 通过 设置channel.basicQos(1);
生产者
public class Producer { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 /** * 参数1:队列的名称 * 参数2:是否持久化 * 参数3:是否独占此链接 * 参数4:队列不在使用时是否自动删除 * 参数5:队列参数 * */ channel.queueDeclare(QUEUE_NAME, false,false, false, null); /** * 发送消息 * 参数1: Exchange的名称,如果没有指定,则使用Default Exchange * 参数2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列 * 参数3:消息包含的属性 * 参数4:消息体 * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 * 示绑定或解除绑定认的交换机,routingKey等于队列名称 */ channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 for(int i=0;i<10;i++){ String msg = "test_yehui_rabbitmq"+i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } System.out.println("消息发送完毕"); channel.close(); connection.close(); } }
消费者1:
public class Consumer01 { private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明队列 channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 //定义消费方法 DefaultConsumer consumer = new DefaultConsumer (channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); try { //睡眠1s Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 /** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
消费者2
public class Consumer02 { //队列名称 private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 Channel channel = connection.createChannel(); //声明队列 如果Rabbit中没有此队列将自动创建 channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //得到交换机 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); //消息id long deliveryTag = envelope.getDeliveryTag(); //消息内容 String message = new String(body, "utf-8"); System.out.println("消费者消费:"+message); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //监听队列 /** * 参数1:队列名称 * 参数2: 设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 * 为false则需要手动回复 * 参数3:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_NAME,false,consumer); } }
结果;
消费者1比消费者2消费得少
5、RabbitMQ交换机的作用
生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,
这和我们之前学习Nginx有点类似。 交换机的作用根据具体的路由策略分发到不同的队列中,交换机有四种类型。
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的,队列与直连类型的交换机绑定,需指定一个精确的绑定键,
生产者发送消息时会携带一个路由键。只有当路由键与其中的某个绑定键完全匹配时,这条消息才会从交换机路由到满足路由关系的此队列上。
例如:channel.basicPublish(“MY_DIRECT_EXCHANGE”,”spring”,”msg1”); 只有第一个队列能收到消息。
Fanout exchange(广播交换机)主题类型的交换机与队列绑定时,不需要指定绑定键。因此生产者发送消息到广播类型的交换机上,也不需要携带路由键。消息达到交换机时,所有与之绑定了的队列,都会收到相同的消息的副本。
例如:
channel.basicPublish("MY_FANOUT_EXCHANGE", "", "msg 4"); 三个队列都会收到 msg 4。
Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列,
队列与主题类型的交换机绑定时,可以在绑定键中使用通配符。两个通配符:
# 0 个或者多个单词
* 不多不少一个单词
单词(word)指的是用英文的点“.”隔开的字符。例如 abc.def 是两个单词。
解读:第一个队列支持路由键以 junior 开头的消息路由,后面可以有单词,也可以没有。
第二个队列支持路由键以 netty 开头,并且后面是一个单词的消息路由。
第三个队列支持路由键以 jvm 结尾,并且前面是一个单词的消息路由。
例如:
channel.basicPublish("MY_TOPIC_EXCHANGE","junior.fjd.klj","msg 2"); 只有第一个队列能收到消息。
channel.basicPublish("MY_TOPIC_EXCHANGE","junior.jvm", "msg 3"); 第 一个队列和第三个队列能收到消息。
p是生产者 X是交换机 C1 、C2 是消费者
6、发布/订阅模式Publish/Subscribe
基本概念:
这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展。
功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
思路解读(重点理解):
(1)一个生产者,多个消费者
(2)每一个消费者都有自己的一个队列
(3)生产者没有直接发消息到队列中,而是发送到交换机
(4)每个消费者的队列都绑定到交换机上
(5)消息通过交换机到达每个消费者的队列 该模式就是Fanout Exchange(广播交换机)将消息路由给绑定到它身上的所有队列 以用户发邮件案例讲解
注意:交换机没有存储消息功能,如果消息发送到没有绑定消费队列的交换机,消息则丢失。
工作原理图:
生产者:
public class ProducerFanout { //交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel();
//声明交换机 /** * 参数1:交换机名称 * 参数2:交换机类型 */ channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //发送消息 /** * 参数1:交换机名称 * 参数2:路由key * 参数3:消息属性参数 * 参数4:消息实体 */ channel.basicPublish(EXCHANGE_NAME,"",null,"fanout_exchange_msg".getBytes()); channel.close(); connection.close(); } }
邮件消费者
public class ConsumerEmailFanout { //邮件队列 private static final String EMAIL_QUEUE = "email_queue"; //交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("邮件消费者"); //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel();
//声明一个队列 channel.queueDeclare(EMAIL_QUEUE,false,false,false,null); //绑定交换机 // 4.消费者绑定交换机 参数1 队列 名称 参数2交换机名称 参数3 routingKey channel.queueBind(EMAIL_QUEUE,EXCHANGE_NAME,"");
DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; //消费者监听队列消息 channel.basicConsume(EMAIL_QUEUE,true,consumer); } }
短信消费者
public class ConsumerSMSFanout { //短信队列 private static final String SMS_QUEUE = "sms_queue"; //交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException { System.out.println("短信消费者"); //得到连接 Connection connection = MQConnectionUtils.getConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明一个队列 channel.queueDeclare(SMS_QUEUE,false,false,false,null); //绑定交换机 // 4.消费者绑定交换机 参数1 队列 名称 参数2交换机名称 参数3 routingKey channel.queueBind(SMS_QUEUE,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取生产者消息:" + msg); } }; //消费者监听队列消息 channel.basicConsume(SMS_QUEUE,true,consumer); } }
7、路由模式RoutingKey
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
生产者:
/** * 生产者 */ public class DirctProduct { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 //3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare("dirctExchange","direct"); //声明队列 channel.queueDeclare("emailDirectQueue",true,false,false,null); channel.queueDeclare("smsDirectQueue",true,false,false,null); //绑定交换机 //交换机和队列进行绑定 /** * 参数1:队列的名称 * 参数2:交换机的名称 * 参数3:路由key */ channel.queueBind("emailDirectQueue","dirctExchange","emailRoutKey"); channel.queueBind("smsDirectQueue","dirctExchange","smsRoutKey"); //发送消息 channel.basicPublish("dirctExchange","emailRoutKey",null,"Email邮件发送".getBytes()); channel.basicPublish("dirctExchange","smsRoutKey",null,"Sms发送发送".getBytes()); channel.close(); connection.close(); } }
邮件消费者
/** * 邮件消费者 */ public class EamilConsomer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("emailDirectQueue",true,consumer); } }
短信消费者
/** * 短信消费者 */ public class SmsConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("消息:" + msg); } }; channel.basicConsume("smsDirectQueue",true,consumer); } }
8、通配符模式Topics
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配,由交换机根据routingkey来转发消息到指定的队列。
符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
生产者:
public class ProducerTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare("topicExchange","topic"); //声明队列 channel.queueDeclare("emailQueueTopic",true,false,false,null); channel.queueDeclare("smsQueueTopic",true,false,false,null); //绑定 channel.queueBind("emailQueueTopic","topicExchange","email.*"); channel.queueBind("smsQueueTopic","topicExchange","sms.*"); //发送消息 channel.basicPublish("topicExchange","email.log",null,"email邮件".getBytes()); channel.basicPublish("topicExchange","sms.log",null,"sms邮件".getBytes()); channel.close(); connection.close(); } }
邮件消费者:
public class EmailTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("邮件队列:"+new String(body)); } }; channel.basicConsume("emailQueueTopic",true,consumer); } }
短信消费者:
public class SmsTopic { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("短信队列:"+new String(body)); } }; channel.basicConsume("smsQueueTopic",true,consumer); } }
9、SpringBoot整合RabbitMQ
生产者:
maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-parent</artifactId> <version>2.0.6.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- fastjson 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.29</version> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> </dependencies>
yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui
定义RabbitConfig类,配置Exchange、Queue、及绑定交换机。
案例是用的是fanout交换机类型
@Configuration public class RabbitMQConfig { // 邮件队列 public static String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue"; // 短信队列 public static String FANOUT_SMS_QUEUE = "fanout_sms_queue"; //交换机 public static String EXCHANGE_NAME = "fanoutExchange"; //定义邮件队列 @Bean("fanoutEamilQueue") public Queue fanoutEamilQueue(){ return new Queue(FANOUT_EMAIL_QUEUE); } //定义短信队列 @Bean("fanoutSmsQueue") public Queue fanoutSmsQueue(){ return new Queue(FANOUT_SMS_QUEUE); } //定义交换机 @Bean("fanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE_NAME); } //将邮件队列绑定交换机 @Bean("bindingEmailExchange") public Binding bindingEmailExchange(@Qualifier("fanoutEamilQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } //将邮件队列绑定交换机 @Bean("bindingSmsExchange") public Binding bindingSmsExchange(@Qualifier("fanoutSmsQueue")Queue queue, @Qualifier("fanoutExchange")FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
生产者投递消息
/** * 发送消息类 */ @Component public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String msg){ rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EMAIL_QUEUE,msg); } }
控制层调用代码
@RestController public class RabbitController { @Autowired private FanoutProducer fanoutProducer; @RequestMapping("/index") public String index(){ fanoutProducer.send("邮件消息"); fanoutProducer.send("短信消息"); return "index"; } }
消费者
maven依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-parent</artifactId> <version>2.0.6.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- fastjson 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> </dependencies>
application.yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui server: port: 8081
监听消息
/** * 监听队列 */ @Component public class ReceiveHandler { /** * 邮箱 * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("邮箱消费者获取生产者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消费者获取生产者消息msg:" + msg); } }
10、消息确认机制
问题产生背景: 生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。而且有的时候我们在发送消息之后,
后面的逻辑出问题了,我们不想要发送之前的消息了,需要撤回该怎么做。
解决方案: 1.AMQP 事务机制
2.Confirm 模式
事务模式:
txSelect 将当前
channel设置为
transaction模式
txCommit 提交当前事务
txRollback 事务回滚
11、RabbitMQ消息重试机制
消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这时候应该如何处理?
答案:使用消息重试机制。(演示重试机制)
如何合适选择重试机制:
情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? (需要重试机制)
情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试?(不需要重试机制)需要发布进行解决。
如何实现重试机制 总结:
对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿
重试机制案例:
生产者代码就按照上面的案例就可以了,
消费者:
yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####开启消费者重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000 server: port: 8081
/** * 监听队列 */ @Component public class ReceiveHandler { /** * 邮箱 * @param msg */ //rabbitmq 默认情况下 如果消费者程序出现异常的情况下,会自动实现补偿机制 //重试机制都是间隔性的 // 补偿(重试机制) 队列服务器 发送补偿请求 // 如果消费端 程序业务逻辑出现异常消息会消费成功吗? 是不能消费者成功的 //@RabbitListener 底层 使用Aop进行拦截,如果程序没有抛出异常,自动提交事务 // 如果Aop使用异常通知拦截 获取异常信息的话,自动实现补偿机制 ,该消息会缓存到rabbitmq服务器端进行存放,一直重试到不抛异常为准。 // 修改重试机制策略 一般默认情况下 间隔5秒重试一次 @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(String msg){ System.out.println("出现异常"); int i = 1/0; System.out.println("邮箱消费者获取生产者消息msg:" + msg); } /** * 短信 * @param msg */ @RabbitListener(queues={"fanout_sms_queue"}) public void receive_sms(String msg){ System.out.println("短信消费者获取生产者消息msg:" + msg); } }
调用第三方接口重试机制分析图:
重试机制调用第三方接口 @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); // 重试机制都是间隔性 JSONObject jsonObject = JSONObject.parseObject(msg); String email = jsonObject.getString("email"); String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email; System.out.println("邮件消费者开始调用第三方邮件服务器,emailUrl:" + emailUrl); JSONObject result = HttpClientUtils.httpGet(emailUrl); // 如果调用第三方邮件接口无法访问,如何实现自动重试. if (result == null) { throw new Exception("调用第三方邮件服务器接口失败!"); } System.out.println("邮件消费者结束调用第三方邮件服务器成功,result:" + result + "程序执行结束"); // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 channel.basicAck(deliveryTag, false); } // 默认是自动应答模式 }
12、消费者如果保证消息幂等性,不被重复消费
产生原因:网络延迟传输中,消费出现异常或者是消费延迟消费,会造成MQ进行重试补偿,
在重试过程中,可能会造成重复消费。 消费者如何保证消息幂等性,
不被重复消费 解决办法:
①使用全局MessageID判断消费方使用同一个,解决幂等性。
②或者使用业务逻辑保证唯一(比如订单号码)
基于全局消息id区分消息,解决幂等性(重复消费)
生产者:
@RequestMapping("/send") public String send(){ String msg = "my_fanout_msg:" + System.currentTimeMillis(); //设置全局ID Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build(); System.out.println(msg + ":" + msg); fanoutProducer.send(message); return null; }
消费者
/** * 邮箱 使用全局ID * @param msg */ @RabbitListener(queues={"fanout_eamil_queue"}) public void receive_email(Message message){ System.out.println("出现异常"); String messageId = message.getMessageProperties().getMessageId(); int i = 1/0; System.out.println("邮箱消费者获取生产者消息msg:" + messageId); }
yml文件
spring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_yehui listener: simple: retry: ####开启消费者重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 3000 server: port: 8081
启动测试,重试的时候没有发生变化
13、SpringBoot整合RabbitMQ签收模式
//邮件队列 @Component public class FanoutEamilConsumer { @RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { System.out .println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8") + ",messageId:" + message.getMessageProperties().getMessageId()); // 手动ack Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); // 手动签收 channel.basicAck(deliveryTag, false); } }
开启手动应答
pring: rabbitmq: ####连接地址 host: 127.0.0.1 ####端口号 port: 5672 ####账号 username: guest ####密码 password: guest ### 地址 virtual-host: /admin_host listener: simple: retry: ####开启消费者异常重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 2000 ####开启手动ack acknowledge-mode: manual
14属性设置
1.TTL(Time To Live)
1.1消息的过期时间
有两种设置方式:
1) 通过队列属性设置消息过期时间所有队列中的消息超过时间未被消费时,都会过期。
/** * 设置过期时间 * @return */ @Bean("ttlQueue") public Queue ttlQueue(){ Map<String,Object> map = new HashMap<>(); map.put("x-message-ttl", 11000);//队列中的消息未被消费11s后过期 return new Queue("ttlQueue",true,false,false,map); }
2)设置单条消息的过期时间
在发送消息的时候指定消息属性。
MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的过期属性,单位ms messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = new Message("这条消息4秒后过期".getBytes(), messageProperties); rabbitTemplate.send(RabbitMQConfig.FANOUT_EMAIL_QUEUE,message); // 随队列的过期属性过期,单位ms rabbitTemplate.convertSendAndReceive(RabbitMQConfig.FANOUT_EMAIL_QUEUE, "消息发送");
如果同时指定了 Message TTL 和 Queue TTL,则小的那个时间生效。
2.死信队列
2.1概述
死信队列 听上去像 消息“死”了 其实也有点这个意思,死信队列 是 当消息在一个队列 因为下列原因:
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
队列超载变成了 “死信” 后 被重新投递(publish)到另一个Exchange 该Exchange 就是DLX 然后该Exchange 根据绑定规则 转发到对应的 队列上
监听该队列 就可以重新消费,说白了就是 没有被消费的消息 换个地方重新被消费
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
什么情况下消息会变成死信?
1)消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue== false
2)消息过期
3)队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes
(字节数),最先入队的消息会被发送到 DLX。
2.2.应用场景
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上
2.3使用死信交换机
定义业务(普通)队列的时候指定参数
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
死信队列配置
生产者
@Component public class FanoutConfig { /** * 定义死信队列相关信息 */ public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信队列 交换机标识符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信队列交换机绑定键标识符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; // 邮件队列 private String FANOUT_EMAIL_QUEUE = "fanout_email_queue"; // 短信队列 private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // fanout 交换机 private String EXCHANGE_NAME = "fanoutExchange"; // 1.定义邮件队列 @Bean public Queue fanOutEamilQueue() { // 将普通队列绑定到死信队列交换机上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args); return queue; } // 2.定义短信队列 @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定义交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.队列与交换机绑定邮件队列 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.队列与交换机绑定短信队列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } /** * 配置死信队列 * * @return */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }
消费者
@RabbitListener(queues = "fanout_email_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); JSONObject jsonObject = JSONObject.parseObject(msg); Integer timestamp = jsonObject.getInteger("timestamp"); try { int result = 1 / timestamp; System.out.println("result:" + result); // 通知mq服务器删除该消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); // // 丢弃该消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } @Component public class DeadConsumer { @RabbitListener(queues = "dead_queue") public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("死信邮件消费者获取生产者消息msg:" + msg + ",消息id:" + messageId); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
消息留转图
六、消息可靠性
RabbitMQ 的可靠性投递,也就是在使用 RabbitMQ 实现异步通信的时候,消息丢了怎么办,消息重复消费怎么办
在 RabbitMQ 里面提供了很多保证消息可靠投递的机制,这个也是 RabbitMQ 的一个特性。
在理解消息可靠性投递的时候,必须明确一个问题,因为效率与可靠性是无法兼得的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。所以如果是一
些业务实时一致性要求不是特别高的场合,可以牺牲一些可靠性来换取效率。比如发送通知或者记录日志的这种场景,如果用户没有收到通知,不会造成业务影响,只要再次发送就可以了。
6.1RabbitMq的工作模型
使用 RabbitMQ 收发消息的时候,有几个主要环节:
1 代表消息从生产者发送到 Broker
生产者把消息发到 Broker 之后,怎么知道自己的消息有没有被 Broker 成功接收?
2 代表消息从 Exchange 路由到 Queue
Exchange 是一个绑定列表,如果消息没有办法路由到正确的队列,会发生什么事情?应该怎么处理?
3 代表消息在 Queue 中存储
队列是一个独立运行的服务,有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,那么消息要一直存储在队列里面。如果队列出了问题,
消息肯定会丢失。怎么保证消息在队列稳定地存储呢?
4 代表消费者订阅 Queue 并消费消息
队列的特性是什么?FIFO。队列里面的消息是一条一条的投递的,也就是说,只有上一条消息被消费者接收以后,才能把这一条消息从数据库删掉,继续投递下一条
消息。那么问题来了,Broker 怎么知道消费者已经接收了消息呢?
6.2消息发送到 RabbitMQ 服务器
第一个环节是生产者发送消息到 Broker。可能因为网络或者 Broker 的问题导致消息
发送失败,生产者不能确定 Broker 有没有正确的接收。在 RabbitMQ 里面提供了两种机制服务端确认机制,也就是在生产者发送消息给
RabbitMQ 的服务端的时候,服务端会通过某种方式返回一个应答,只要生产者收到了
这个应答,就知道消息发送成功了。
第一种是 Transaction(事务)模式,第二种 Confirm(确认)模式。
Transaction(事务)模式
事务模式怎么使用呢?
通过一个 channel.txSelect()的方法把信道设置成事务模式,然后就可以发布消息给 RabbitMQ 了,如果 channel.txCommit();的方法调用成功,就说明事务提交成功,
则消息一定到达了 RabbitMQ 中。如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback()方法来实现事务回滚。
在事务模式里面,只有收到了服务端的 Commit-OK 的指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消
息没有发送完毕,不能发送下一条消息,它会榨干 RabbitMQ 服务器的性能。所以不建议在生产环境使用。
代码:
public class TransactionProducer { private final static String QUEUE_NAME = "ORIGIN_QUEUE"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setUri(ResourceUtil.getKey("rabbitmq.uri")); // 建立连接 Connection conn = factory.newConnection(); // 创建消息通道 Channel channel = conn.createChannel(); String msg = "Hello world, Rabbit MQ"; // 声明队列(默认交换机AMQP default,Direct) // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { channel.txSelect(); // 发送消息 // String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes()); // int i =1/0; channel.txCommit(); System.out.println("消息发送成功"); } catch (Exception e) { channel.txRollback(); System.out.println("消息已经回滚"); } channel.close(); conn.close(); } }
springboot中的设置:
rabbitTemplate.setChannelTransacted(true);
Confirm(确认)模式
确认模式有三种,一种是普通确认模式。
在生产者这边通过调用 channel.confirmSelect()方法将信道设置为 Confirm 模式,然后发送消息。一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者,也就是调用 channel.waitForConfirms()返回 true,这样生产者就知道消息被服务端接收了。这种发送 1 条确认 1 条的方式消息还不是太高,所以我们还有一种批量确认的方式。
批 量 确 认 , 就 是 在 开 启 Confirm 模 式 后 , 先 发 送 一 批 消 息 。 只 要channel.waitForConfirmsOrDie();方法没有抛出异常,就代表消息都被服务端接收了。
批量确认的方式比单条确认的方式效率要高,但是也有两个问题,第一个就是批量的数量的确定。对于不同的业务,到底发送多少条消息确认一次?数量太少,效率提升
不上去。数量多的话,又会带来另一个问题,比如我们发 1000 条消息才确认一次,如果前面 999 条消息都被服务端接收了,如果第 1000 条消息被拒绝了,那么前面所有的消息都要重发。
异步确认模式需要添加一个 ConfirmListener,并且用一个 SortedSet 来维护没有
被确认的消息。Confirm 模式是在 Channel 上开启的,因为 RabbitTemplate 对 Channel 进行了封装,叫做 ConfimrCallback。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println("发送消息失败:" + cause); throw new RuntimeException("发送异常:" + cause); } } });
6.3消息在队列中的存储
是消息在队列存储,如果没有消费者的话,队列一直存在在数据库中。如果 RabbitMQ 的服务或者硬件发生故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,
所以要把消息本身和元数据(队列、交换机、绑定)都保存到磁盘。
解决方案:
1.持久化队列
@Bean("durableQueue") public Queue queue(){ return new Queue("durableQueue",true,false,false,new HashMap<>()); }
2.持久化交换机
@Bean("fanoutDurableExchange") public FanoutExchange fanoutDurableExchange(){ // exchangeName, durable, exclusive, autoDelete, return new FanoutExchange("fanoutDurableExchange",true,false,new HashMap<>()); }
3.消息持久化
MessageProperties messageProperties = new MessageProperties(); //消息持久化 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setExpiration("4000"); // 消息的过期属性,单位ms messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 随队列的过期属性过期,单位ms rabbitTemplate.convertAndSend("faoutEmailQueue","324243232",messageProperties);
6.4消息投递到消费者
如果消费者收到消息后没来得及处理即发生异常,或者处理过程中发生异常,会导致失败。服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递
这条消息给其他消费者。RabbitMQ 提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送 ACK 给服务端。
没有收到 ACK 的消息,消费者断开连接后,RabbitMQ 会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。
消费者在订阅队列时,可以指定 autoAck参数,当 autoAck 等于 false 时,RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息。
如何设置手动 ACK?
application.properties
spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
注意这三个值的区别:
NONE:自动 ACK
MANUAL: 手动 ACK
AUTO:如果方法未抛出异常,则发送 ack。
当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,
且不重新入队。当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会
发送 ACK。其他的异常,则消息会被拒绝,且 requeue = true 会重新入队。
public class SecondConsumer { @RabbitHandler public void process(String msgContent, Channel channel, Message message) throws IOException { System.out.println("Second Queue received msg : " + msgContent); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
如果消息无法处理或者消费失败,也有两种拒绝的方式,Basic.Reject()拒绝单条,Basic.Nack()批量拒绝。如果 requeue 参数设置为 true,可以把这条消息重新存入队列,
以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况。可以投递到新的队列中,或者只打印异常日志)。
思考:服务端收到了 ACK 或者 NACK,生产者会知道吗?即使消费者没有接收到消息,或者消费时出现异常,生产者也是完全不知情的。
例如,我们寄出去一个快递,是怎么知道收件人有没有收到的?因为有物流跟踪和签收反馈,所以寄件人可以知道。
在没有用上电话的年代,我们寄出去一封信,是怎么知道收信人有没有收到信件?只有收到回信,才知道寄出的信被收到了。
所以,这个是生产者最终确定消费者有没有消费成功的两种方式:
1) 消费者收到消息,处理完毕后,调用生产者的 API(思考:是否破坏解耦?)
2) 消费者收到消息,处理完毕后,发送一条响应消息给生产者
6.5消费者回调
1) 调用生产者 API
例如:提单系统给其他系统发送了碎屏保消息后,其他系统必须在处理完消息后调用提单系统提供的 API,来修改提单系统中数据的状态。只要 API 没有被调用,数据状态没有被修改,
提单系统就认为下游系统没有收到这条消息。
2) 发送响应消息给生产者
例如:商业银行与人民银行二代支付通信,无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。
6.6补偿机制
如果生产者的 API 就是没有被调用,也没有收到消费者的响应消息,怎么办?不要着急,可能是消费者处理时间太长或者网络超时。
生产者与消费者之间应该约定一个超时时间,比如 5 分钟,对于超出这个时间没有到响应的消息,可以设置一个定时重发的机制,但要发送间隔和控制次数,比如每隔 2分钟发送一次,最多重发 3 次,否则会造成消息堆积。重发可以通过消息落库+定时任务来实现。重发,是否发送一模一样的消息?
参考:
ATM 机上运行的系统叫 C 端(ATMC),前置系统叫 P 端(ATMC),它接收 ATMC
的消息,再转发给卡系统或者核心系统。
1)如果客户存款,没有收到核心系统的应答,不知道有没有记账成功,最多发送 5次存款确认报文,因为已经吞钞了,所以要保证成功;
2)如果客户取款,ATMC 未得到应答时,最多发送 5 次存款冲正报文。因为没有吐钞,所以要保证失败。
6.7消息幂等性
如果消费者每一次接收生产者的消息都成功了,只是在响应或者调用 API 的时候出了问题,会不会出现消息的重复处理?例如:存款 100 元,ATM 重发了 5 次,核心系统
一共处理了 6 次,余额增加了 600 元。所以,为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ 服务端是没有这种控制的(同一批的消息有个递增的 DeliveryTag),
它不知道你是不是就要把一条消息发送两次,只能在消费端控制。如何避免消息的重复消费?消息出现重复可能会有两个原因:
1、生产者的问题,生产者重复发送消息,比如在开启了 Confirm 模式但未收到确认,消费者重复投递。
2、消费者出了问题,由于消费者未发送 ACK 或者其他原因,消息重复投递。
3、生产者代码或者网络问题。对于重复发送的消息,可以对每一条消息生成一个唯一的业务 ID,通过日志或者消息落库来做重复控制。