RabbitMQ的学习和使用
本文是基于CentOS 7.3系统环境,进行RabbitMQ的学习和使用
- rabbitmq-server-3.7.9-1
- CentOS 7.3
一、RabbitMQ简介
(1) 什么是RabbitMQ
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
(2) ACK的消息确认机制
ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的。
(3) RabbitMQ的五种工作模式
- 简单模式
一个生产者,一个消费者 - work模式
一个生产者,多个消费者,每个消费者获取到的消息唯一。 - 广播模式-fanout
一个生产者发送的消息会被多个消费者获取 - 路由模式-direct
发送消息到交换机并且要指定路由routingkey,消费者将队列绑定到交换机时需要指定路由routingkey - 订阅模式-topic
将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词
(4) RabbitMQ持久化
- 交换机持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
- 队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- 消息持久化
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
二、SpringBoot整合RabbitMQ
- 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 添加配置
spring:
rabbitmq:
host: 192.168.0.209
port: 5672
username: zs200a
password: zs200a
virtual-host: /zs200a
template:
exchange: ZS200A.USER.EXCHANGE
- 监听类
@Component
public class Listener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "true"),
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key={"#.#"}
))
public void listen(String msg){
System.out.println(msg);
}
}
- 测试发送类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMQServiceApplication.class)
public class SendMsg {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSend() throws InterruptedException {
String msg = "Hello, Spring boot amqp";
this.amqpTemplate.convertAndSend("spring.test.exchange","a.b",msg);
Thread.sleep(10000);
}
}