RabbitMQ的学习和使用

RabbitMQ的学习和使用

本文是基于CentOS 7.3系统环境,进行RabbitMQ的学习和使用


一、RabbitMQ简介

(1) 什么是RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

(2) ACK的消息确认机制

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。

如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。

消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。

消息的ACK确认机制默认是打开的。

(3) RabbitMQ的五种工作模式

  • 简单模式
    一个生产者,一个消费者
  • work模式
    一个生产者,多个消费者,每个消费者获取到的消息唯一。
  • 广播模式-fanout
    一个生产者发送的消息会被多个消费者获取
  • 路由模式-direct
    发送消息到交换机并且要指定路由routingkey,消费者将队列绑定到交换机时需要指定路由routingkey
  • 订阅模式-topic
    将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词

(4) RabbitMQ持久化

  • 交换机持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
  • 队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  • 消息持久化
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

二、SpringBoot整合RabbitMQ

  • 引入依赖
<dependency>
	<groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 添加配置
spring:
  rabbitmq:
    host: 192.168.0.209
    port: 5672
    username: zs200a
    password: zs200a
    virtual-host: /zs200a
    template:
      exchange: ZS200A.USER.EXCHANGE
  • 监听类
@Component
public class Listener {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", durable = "true"),

            exchange = @Exchange(
                    value = "spring.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key={"#.#"}
    ))
    public void listen(String msg){
        System.out.println(msg);
    }
}
  • 测试发送类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMQServiceApplication.class)
public class SendMsg {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSend() throws InterruptedException {
        String msg = "Hello, Spring boot amqp";
        this.amqpTemplate.convertAndSend("spring.test.exchange","a.b",msg);
        Thread.sleep(10000);
    }
}

原创文章 98 获赞 143 访问量 167万+

猜你喜欢

转载自blog.csdn.net/qq_32599479/article/details/105975660