Author:Allen_Huang
Version:1.0.0
分布式消息中间件-RabbitMQ
一、引言
主流的MQ产品:
RabbitMQ、Kafka、RocketMQ
为什么需要学习消息中间件?
消息中间件,顾名思义就是在多个服务之间传递消息的。通来说,Feign可以在微服务之间传递消息,但是Feign是同步调用的方式。消息中间件,其实就是一种异步的消息发送方式,来弥补Feign的调用的形式。
消息中间件的一些运用场景**(重点)**
1、消息异步化
2、消息广播
3、延迟消费
4、请求削峰
二、RabbitMQ的安装
1)准备docker-compose.yml模板文件
version: "3.1"
#管理各种容器
services:
#管理rabbitmq容器
rabbitmq:
image: rabbitmq:3.8.3-management
container_name: rabbitmq
restart: always
ports:
- 5672:5672
- 15672:15672
5672:是我们程序操作RabbitMQ的端口
15672:RabbitMQ提供的后台管理程序的端口
2)执行命令,启动容器
docker-compose up -d
3)打开浏览器访问RabbitMQ的控制程序
4)登录后台管理页面
账号密码:默认是guest
三、官网提供的RabbitMQ的各种模型图
地址:https://www.rabbitmq.com/getstarted.html
四、Java代码调用RabbitMQ
3.1 提供端
1)添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
2)编写提供者类
//1、建立和RabbitMQ服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.195.148");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");//设置虚拟主机
//获得rabbitmq的连接
Connection connection = connectionFactory.newConnection();
//2、通过连接获得Channel对象(后续所有MQ的操作,都是通过Channel对象操作的)
Channel channel = connection.createChannel();
//3、创建队列
channel.queueDeclare("myqueue", false, false, false, null);
//4、给队列中发送消息
String msg = "Hello World!!!!!!";
channel.basicPublish("", "myqueue", null, msg.getBytes("utf-8"));
System.out.println("消息已经发送,提供者任务完成!!!!");
//关闭连接
connection.close();
3.2 消费端
1)添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
2)编写消费端代码
//1、建立和RabbitMQ服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.195.148");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");//设置虚拟主机
//获得rabbitmq的连接
Connection connection = connectionFactory.newConnection();
//2、通过连接获得channel
Channel channel = connection.createChannel();
//3、通过Channel监听队列
channel.basicConsume("myqueue", true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//当有队列有消息时就会触发该方法消费队列中的消息
System.out.println("消费者获得消息:" + new String(body, "utf-8"));
}
});
3.3 注意
1、消费端需要实时的监控队列,所以不能关闭连接,一旦关闭则监控就会断开
2、提供者 和 消费者 那边需要先运行?- 那边创建队列,那边就需要先运行,在实际开发过程中,为了消除这种启动顺序,可以在提供端和消费端都进行同名队列和同名交换机的创建
3、消费端消费消息是同步的还是异步的?- 同步消费(单线程消费),在实际开发中,为了提高消费端的消费效率,会在消费端使用线程池并发消费
//创建一个容量为10的线程池 private static ExecutorService executorService = Executors.newFixedThreadPool(10); .... //3、通过Channel监听队列 channel.basicConsume("myqueue", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, final byte[] body) throws IOException { executorService.submit(new Runnable() { public void run() { try { //当有队列有消息时就会触发该方法消费队列中的消息 System.out.println("消费者获得消息:" + new String(body, "utf-8")); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } } }); } });
4、RabbitMQ中,交换机(Exchange)没有保存数据的能力,如果一个消息发送给一个没有绑定队列的交换机,则消息直接丢失。
5、可以重复的创建同名的队列和交换机,但是如果第二次同名的队列属性不同了,就会报错
比如:先创建了一个名为myqueue的非持久化队列,如果再创建一个名为myqueue的持久化队列就会报错
五、RabbitMQ中方法参数的解释
声明交换机的方法参数
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
exchange - 声明的交换机名称(该交换机可以存在)
type - 交换机的类型,可选值 fanout|direct | topic | headers
durable - 交换机持久化,true表示交换机为持久化的,默认为false
autoDelete - 是否自动删除,默认为false,一个自动删除的交换机,如果有一个队列和该交换机绑定,然后再次解绑,则交换机自动删除
internal - 是否为内置交换机,默认为false,一个内置交换机只能从其他的交换机获得消息,提供者不能直接给该交换机发送消息
arguments - 用来设置一些额外属性
声明队列的方法参数
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete,Map<String, Object> arguments)
queue - 队列的名称(该队列可以存在)
durable - 是否持久化,默认false
exclusive - 是否为排他队列,默认为false。一个排他队列,只有创建这个队里的连接可以操作该队列,其他的连接是不可见该队列的
1)其他连接虽然不可见该排他队列,但是也不能再创建一个同名的队列了。
2)排他队列是和连接绑定在一起的,同一个连接下的多个Channel是可以共享排他队列的。autoDelete - 是否自动删除,默认为false。一个自动删除的队列,当最后一个消费者断开连接时就删除。
arguments - 设置一个队列的额外参数
六、消息的过期时间 - TTL(重要)
6.1 消息的过期时间(重要)
6.1.1 通过队列的方式,设置消息的过期时间(对队列中的所有消息有效)
//创建队列
Map<String, Object> map = new HashMap<>();
//设置队列中所有消息都有5秒的过期时间
map.put("x-message-ttl", 5000);
channel.queueDeclare("ttlqueue", true, false, false, map);
6.1.2 通过给消息本身设置过期时间(对当前消息有效)
//给单条消息设置过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.expiration("5000")
.build();
channel.basicPublish("", "durablequeue", properties, "Hello".getBytes("utf-8"));
注意:
1、队列只会移除队头过期的消息,如果过期的消息在队中,则不会被移除,当该消息到达队头时才会移除。
2、如果是通过队列给消息设置过期时间,那么过期的消息一定会出现在队头,不可能出现队中过期的情况。
6.2 队列的过期时间(了解)
Map<String, Object> map = new HashMap<>();
//设置队列本身的过期时间,到期后,队列自动删除
map.put("x-expires", 10000);
channel.queueDeclare("ttlqueue", true, false, false, map);
七、队列的类型
7.1 优先级队列
1)设置队列的优先级限制范围
Map<String, Object> map = new HashMap<>();
map.put("x-max-priority", 100);//最大优先级可选范围:0~255
channel.queueDeclare("myqueue", true, false, false, map);
2)设置消息本身的优先级
//设置消息本身的优先级
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.priority(pri) //优先级的值
.build();
channel.basicPublish("myexchange", "", properties, ("消息!!!" + pri).getBytes("utf-8"));
注意:优先级队列,必须有消息堆积时才有意义,如果消息的消费速度 > 消息的发布速度,优先级队列是没有任何作用的
7.2 死信队列
什么是死信队列?
死信消息:一个普通的队列中,如果发生了消息过期的问题(有可能是其他原因),那么这个消息就会被丢弃,被丢弃的消息就称之为死信消息
死信路由:一个普通的队列,如果绑定了死信路由,则所有死信消息不会直接丢弃,而是转发给死信路由
死信队列:和死信路由绑定在一起的队列就称之为死信队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L28ZkXkq-1607930377822)(img/image-20200522095941084.png)]
什么时候会产生死信消息?
1、消息过期
2、队列已满,继续添加新的元素
3、消息被拒绝,并且requeue参数设置为false
死信队列的实现
//创建一个死信交换机
channel.exchangeDeclare("dead-exchange", "fanout",
true, false, false, null);
//创建死信队列
channel.queueDeclare("dead-queue", true, false, false, null);
//死信队列和死信交换机绑定
channel.queueBind("dead-queue", "dead-exchange", "");
//----------------------------------------------------------------
//创建一个普通的交换机
channel.exchangeDeclare("normal-exchange",
"fanout", true, false, false, null);
//创建一个普通的队列
//给普通的队列设置消息的过期时间,当消息过期后,就会产生死信消息了
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 5000);
//TODO 给普通的队列绑定死信交换机
map.put("x-dead-letter-exchange", "dead-exchange");
channel.queueDeclare("normal-queue", true, false, false, map);
//普通队列绑定普通交换机
channel.queueBind("normal-queue", "normal-exchange", "");
7.3 延迟队列
什么是延迟队列?
顾名思义,延迟队列就是指,消息发送出去后,过一段时间再进行消费(不是立刻消费),这种方式就是所谓的延迟队列。但是RabbitMQ官方没有提供延迟队列的实现,开发者可以通过ttl + 死信队列自己实现一个延迟队列的效果。
延迟队列的实现:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eC6xSQSp-1607930377825)(img/image-20200522102820637.png)]
注意:
1、因为过期的消息只有在队头才能变成死信效果,所以延迟队列必须配合队列的消息过期时间使用,不能采用消息本身的过期时间
2、如果业务需要很多不同的过期时间,那么就需要准备多个队列了
延迟队列的实际运用结构:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dj3YAFpV-1607930377826)(img/image-20200522110817654.png)]
延迟队列的实际运用场景:
订单超时关闭
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wUfEPtiA-1607930377828)(img/image-20200522112539952.png)]
注意:队列的延迟并不能解决一切的定时问题,延迟队列特别适合用来解决那种超过固定时间的问题,比如刚刚说的超过30分钟订单关闭的问题。
什么情况不合适延迟队列?
1、延迟的时间特别随机(5分38秒、2分29秒、1小时8分56秒…)
2、固定时间点的定时任务(下午3点、中午12点、凌晨2点…)方案评审
思考:
订单30分钟未支付超时关闭,假设有一个用户,下单后,进入支付页面(支付宝、微信支付…),忘记支付了,刚好到30分钟时,用户开始进行支付,同时后台进行状态的判断,用户未支付,关闭订单,但是同时用户支付成功,这种情况怎么解决?
八、消息的持久化
什么是消息的持久化?
消息的持久化也就是说当rabbitmq服务关闭后,消息不丢失,默认Rabbitmq的消息是非持久化的
消息持久化的方式
//设置消息的持久化 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder() .deliveryMode(2)//消息持久化模式 .build(); //也可直接使用该常量 //MessageProperties.PERSISTENT_TEXT_PLAIN; channel.basicPublish("normal-exchange", "", properties, msg.getBytes("utf-8"));
注意:
1、队列持久化不意味着消息持久化
2、队列如果不设置为持久化,消息持久化毫无意义
3、消息持久化其实就是将消息写入硬盘,如果所有的消息都持久化,必然会降低RabbitMQ服务的消息吞吐量,所以实际开发过程中,尽量只让需要的消息持久化
思考:
交换机持久化了、队列持久化了、消息也持久化了,是否就可以保证消费端一定能成功消费消息?
九、RabbitMQ的消息确认机制
9.1 提供者的消息确认机制
9.1.1 事务模式
//事务模式确认消息
channel.txSelect();//将channel切换成事务模式
try {
//设置消息的持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.deliveryMode(2)//消息持久化模式
.build();
channel.basicPublish("normal-exchange", "", properties, msg.getBytes("utf-8"));
//提交事务
channel.txCommit();
} catch (IOException e) {
e.printStackTrace();
//回滚事务
channel.txRollback();
//进行消息的重试与补偿
}
9.1.2 publisher confirms模式
9.1.2.1 同步模式
//Publish Confirm模式
channel.confirmSelect();//设置为confirm模式
//发布消息
channel.basicPublish("normal-exchange", "", null, msg.getBytes("utf-8"));
//同步等待rabbitmq的响应
boolean falg = true;
try {
falg = channel.waitForConfirms();
} catch (InterruptedException e) {
e.printStackTrace();
falg = false;
}
if(!falg){
//进行重试或者补偿
}
9.1.2.2 异步模式
//Publish Confirm异步模式 - 推荐
channel.confirmSelect();
//设置异步的监听方法
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//如果消息成功,就会回调该方法
//deliveryTag - 表示成功的消息id
//multiple - 表示是否批量,如果为true表示批量成功,如果为false,表示单条成功
if (multiple){
System.out.println("id为" + deliveryTag + "消息以及之前的消息都成功了!");
//批量成功,将后续未成功的消息赋值给treeMap
treeMap = (TreeMap<Long, String>) treeMap.tailMap(deliveryTag + 1);
} else {
System.out.println("id为" + deliveryTag + "消息成功了!");
//消息处理成功,直接移除该消息
treeMap.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//如果消息失败,就回调该方法
//deliveryTag - 表示失败的消息id
//multiple - 表示是否批量,如果为true表示批量失败,如果为false,表示单条失败
if (multiple){
System.out.println("id为" + deliveryTag + "消息以及之前的消息都失败了!");
TreeMap failtreeMap = (TreeMap) treeMap.headMap(deliveryTag);
//对该失败的消息进行批量重试
} else {
System.out.println("id为" + deliveryTag + "消息失败了!");
String msg = treeMap.get(deliveryTag);
//对失败的消息msg,进行重试
}
}
});
//发布消息
for (int i = 0; i < 10000; i++) {
String msg = "消息" + i;
//获得下一条消息的发送编号
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("normal-exchange", "", null, msg.getBytes("utf-8"));
//发送消息后,缓存发送出去的消息
treeMap.put(seqNo, msg);
}
9.2 消费端的消息确认/拒绝机制
//手动确认必须把参数二设置为false
channel.basicConsume("dead-queue", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body, "utf-8"));
System.out.println("接收到消息的时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date()));
//进行相关的业务操作
//TODO 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
//TODO 手动拒绝
//参数三:requeue - true表示拒绝的消息重新放回队头,false拒绝的消息不会放回队头,直接变成死信消息
//参数三的意义到底在哪里?- 如果设置为true就一定要配合多消费者,不然可能出现死循环消费的情况
channel.basicNack(envelope.getDeliveryTag(), false, false);
}
});
//实际开发过程中消息处理的正确逻辑
//获得消息
if(判断条件){
//处理业务
//手动确认消息
} else {
//手动拒绝消息
}
9.3 消息的限制机制
//限制消息的处理数量
channel.basicQos(100);
channel.basicConsume("dead-queue", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body, "utf-8"));
//没有手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
9.4 消息的补偿和重试机制
RabbitMQ本身没有提供任何的重试和补偿机制,需要开发者自行实现
重试:就在消息失败后,重新发送即可,一般来说不会无限制的重试,比如重试3次后,消息还是投递失败,那么就需要用到补偿机制
补偿机制:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ug9FTp6q-1607930377830)(img/image-20200522170654910.png)]
思考:结合消息的发送端确认机制、消费端确认机制以及重试补偿机制,有没有可能导致某个消息被发送了多次?
完全有可能出现消息重复投递或者重复消费的情况,通常来说需要将消息的消费端的业务设计成幂等方法
**什么是幂等方法?**对于相同的数据,反复调用,对最终结果不造成任何影响
幂等方法的设计方式:数据库字段设置唯一性、redis记录消息的标识等
ws IOException {
System.out.println(“接收到消息:” + new String(body, “utf-8”));
//没有手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
#### 9.4 消息的补偿和重试机制
> RabbitMQ本身没有提供任何的重试和补偿机制,需要开发者自行实现
**重试**:就在消息失败后,重新发送即可,一般来说不会无限制的重试,比如重试3次后,消息还是投递失败,那么就需要用到补偿机制
**补偿机制**:
[外链图片转存中...(img-Ug9FTp6q-1607930377830)]
**思考:**结合消息的**发送端确认机制**、**消费端确认机制**以及**重试补偿机制**,有没有可能导致某个消息被发送了多次?
> 完全有可能出现消息重复投递或者重复消费的情况,通常来说需要将消息的**消费端**的业务设计成**幂等方法**
>
> **什么是幂等方法?**对于相同的数据,反复调用,对最终结果不造成任何影响
>
> 幂等方法的设计方式:数据库字段设置唯一性、redis记录消息的标识等