服务异常通讯高级篇三(惰性队列、MQ集群)
消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有三种思路:
- 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
要提升队列容积,把消息保存在内存中显然是不行的。
1、惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
1.1、基于命令行设置lazy-queue
而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一个策略Lazy
:策略名称,可以自定义"^lazy-queue$"
:用正则表达式匹配队列的名字'{"queue-mode":"lazy"}'
:设置队列模式为lazy模式--apply-to queues
:策略的作用对象,是所有的队列
1.2、基于SperingAMQP代码完成惰性设置的两种方式
基于@Bean声明lazy-queue
package cn.itcast.mq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 项目名称:mq-advanced-demo
* 描述:惰性队列创建
*
* @author zhong
* @date 2022-06-01 19:12
*/
@Configuration
public class LazyConfig {
/**
* 创建惰性队列
* @return
*/
@Bean
public Queue layzQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy()
.build();
}
/**
* 创建普通队列
* @return
*/
@Bean
public Queue normalQueue() {
return QueueBuilder.durable("norma.queue")
.build();
}
}
基于@RabbitListener声明LazyQueue
/**
* 基于注解的开发惰性队列
* @param msg
*/
@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到lazy.queue的消息:{}"+msg);
}
启动消费者项目创建队列
1.3、测试向两个队列发100万条数据
/**
* 测试惰性队列
*/
@Test
public void testlayzMessage() {
// 1、准备消息
for (int i = 0; i < 100000;i++) {
Message build = MessageBuilder.withBody("hellow lazy.queue".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
// 2、发送消息
rabbitTemplate.convertAndSend("lazy.queue",build);
}
}
/**
* 测试普通队列
*/
@Test
public void testnormaMessage() {
// 1、准备消息
for (int i = 0; i < 100000;i++) {
Message build = MessageBuilder.withBody("hellow norma.queue".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
// 2、发送消息
rabbitTemplate.convertAndSend("norma.queue",build);
}
}
运行两个程序,查看管理页面的队列消息如下图所示:
norma.queue普通队列
layz.queue惰性队列
1.4、总结
消息堆积问题的解决方案?
- 队列上绑定多个消费者,提高消费速度
- 使用惰性队列,可以再mq中保存更多消息
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些?
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
2、MQ集群
集群的部署
需要参考部署文档
2.1、集群分类
RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:
• 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
• 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。
2.2、普通集群
普通集群,或者叫标准集群(classic cluster),具备下列特征:
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
结构如图:
2.3、镜像集群
镜像集群:本质是主从模式,具备下面的特征:
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主
结构如图:
2.4、冲裁队列
集群特征
仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
Java代码创建仲裁队列
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}
SpringAMQP连接MQ集群
注意,这里用address来代替host、port方式
spring:
rabbitmq:
addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073
username: itcast
password: 123321
virtual-host: /