SpringBoot利用Docker整合RabbitMQ
一、RabbitMQ介绍
大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。
消息服务中两个重要概念:消息代理(message broker)和目的地(destination);
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
消息队列主要有两种形式的目的地,即:
- 队列(queue):点对点消息通信(point-to-point)
- 主题(topic):发布(publish)/订阅(subscribe)消息通信
异步处理图:
点对点式:
- 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
- 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者,即可以有多个接收者,但只要一个接收者可以接收到消息
发布订阅式:
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息
常见的有两种消息代理,即JMS和AMQP
JMS基于JVM消息代理的规范,ActiveMQ、HornetMQ是JMS实现
AMQP是高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现
JMS与AMQP的比较
Spring支持
- spring-jms提供了对JMS的支持
- spring-rabbit提供了对AMQP的支持
- 需要ConnectionFactory的实现来连接消息代理
- 提供JmsTemplate、RabbitTemplate来发送消息
- @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
- @EnableJms、@EnableRabbit开启支持
SpringBoot自动配置
- JmsAutoConfiguration
- RabbitAutoConfiguration
RabbitMQ中的核心概念
Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系,一个交换器可以绑定多个队列,一个队列也可以被多个交换器绑定。
Connection:网络连接,比如一个TCP连接。
Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁
TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost 本质上就是一个迷你 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。一个消息队列服务器上可以有多个虚拟主机,一个虚拟主机中也可以有多个交换器。
Broker:表示消息队列服务器实体
关系图:
RabbitMQ运行机制
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
- Exchange类型:
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers ,headers 类型现在几乎不使用。
- direct类型:点对点类型,消息中的路由键(routingkey)如果和 Binding中的 bindingkey 完全一致,交换器就将消息发到对应的队列中。
- fanout类型:广播类型,每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
- topic类型:消息订阅类型,topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”,井号匹配0个或多个单词,星号匹配一个单词。
二、Docker操作
docker是安装在虚拟机中的CentenOS中的,docker的安装可以参考菜鸟教程。
启动docker
systemctl start docker
用docker拉取rabbitMQ镜像
# 之所以选择3-management,是因为management版本的带有web界面 [root@localhost ~]docker pull rabbitmq:3-management REPOSITORY TAG IMAGE ID CREATED SIZE rabbitmq 3-management df80af9ca0c9 4 weeks ago 149MB
查看拉取下来的rabbitMQ镜像
[root@localhost ~]docker images
创建并运行rabbitMQ容器
# -d是后台启动;-p是端口映射,第一个端口是linux服务器的端口,第二个端口是docker容器的端口,将linux服务器的端口与创建的rabbitmq容器的端口进行映射,5672是rabbitmq默认的端口,15672是rabbitmq web服务的端口;--name是给容器取名字;df80af9ca0c9是rabbitmq镜像的id [root@localhost ~]docker run -d -p 5672:5672 -p 15672:15672 --name myRabbitMQ df80af9ca0c9
查看docker中正在运行中的容器
[root@localhost ~]# docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 2082efdf44c0 df80af9ca0c9 "docker-entrypoint.s…" 5 minutes ago Up 5 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672-> 5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp myRabbitMQ
在window中打开浏览器输入http://192.168.0.156:15672访问,192.168.0.156是linux服务器的ip,可以使用ip addr命令查看,输入用户名:guest,密码:guest,就可以进入rabbitmq的web管理界面,如图所示:
三、Spring Boot整合RabbitMQ
1. 通过web界面向RabbitMQ中加入交换器和队列,以及交换器与队列之间的绑定规则
1)、添加交换器
点击Exchanges进入交换器管理页面,添加交换器
输入交换器的名字name,交换器的类型type,是否持久化默认是durable持久化(下次运行RabbitMQ时数据还存在),点击add exchnge添加,分别添加direct(单播)、fanout(广播)、topic(订阅)三种类型的交换器进行测试
2)、添加消息队列
点击Queues进入消息队列管理页面,添加消息队列
输入消息队列的名字name,是否持久化默认是durable持久化,点击add exchnge添加
3)、将交换器与消息队列绑定
点击Exchanges进入交换器管理页面,点击名字为exchange.direct的交换器,对该交换器进行管理
my01.queues是消息队列的名字,Routing key是路由键,点击Bind按钮进行绑定操作
4)、创建SpringBoot项目,引入RabbitMQ依赖
- 在application.properties配置文件中加入RabbitMQ的配置
# rabbitmq配置,注释的配置都是必须配置的,但我们使用默认值 spring.rabbitmq.host=192.168.0.114 #spring.rabbitmq.username=guest #spring.rabbitmq.password=guest #spring.rabbitmq.port=5672 #spring.rabbitmq.virtual-host=/
测试
@RunWith(SpringRunner.class) @SpringBootTest public class SpringBootAmqpApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void sendMessage04Test() { Map<String, Object> map = new HashMap<>(); map.put("message", "这是我们的第一个消息"); map.put("data", Arrays.asList("hello world", 123, "china")); /** * 单播,direct,其他模式也是一样的 * 可以通过convertAndSend发送消息 * 第一个参数是交换器的名字; * 第二个参数是路由键; * 第三个参数是消息对象,消息对象默认使用javajdk自带的序列化方式进行序列化,我们可以写自己的MessageConverter,让消息序列化成json串 */ rabbitTemplate.convertAndSend("exchange.direct", "my01.queues", map); } }
查看RabbitMQ中的消息
消息对象默认使用javajdk自带的序列化方式进行序列化,我们可以写自己的MessageConverter,让消息序列化成json串
序列化消息实体
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyRabiitConfig { /** * 将消息序列化成json串 * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
序列化测试
重新运行测试类,不需要修改代码
2.通过程序创建交换器和队列,以及交换器与队列之间的绑定规则
@RunWith(SpringRunner.class) @SpringBootTest public class SpringBootAmqpApplicationTests { @Autowired private RabbitAdmin rabbitAdmin; @Test public void create() { // 创建一个交换器 rabbitAdmin.declareExchange(new TopicExchange("hsg.exchange.topic")); // 创建一个消息队列 rabbitAdmin.declareQueue(new Queue("hsg.queue")); /** * 创建绑定 * 第一个参数:目的地,可以是绑定的消息队列的名字 * 第二个参数:目的地类型,QUEUE、EXCHANGE * 第三个参数:交换器的名字 * 第四个参数:路由键,发送消息时的路由键与绑定时的路由键匹配,从而决定将消息发送给哪些消息队列 * 第五个参数:arguments的Map */ rabbitAdmin.declareBinding(new Binding("hsg.queue", Binding.DestinationType.QUEUE, "hsg.exchange.topic", "hsg.*", null)); } }
3.监听消息队列中的消息
开启RabbitMQ,在启动类上面加上@EnableRabbit注解
@SpringBootApplication @EnableRabbit public class SpringBootAmqpApplication {
监听指定的消息队列,在监听方法上面加上@RabbitListener注解
@Service public class BookService { private final static Logger log = LoggerFactory.getLogger(BookService.class); /** * queues是消息队列的名字,是一个数组,可以监听多个消息队列 * 参数是消息的类型 * @param bookEntity */ @RabbitListener(queues={"my01.queues"}) public void receive(BookEntity bookEntity) { log.info("接收到的消息:{}", bookEntity); } }
测试
向与消息队列名为“my01.queues”绑定的交换器中发送消息,消息的路由键要能匹配到”my01.queues”消息队列,当消息到达”my01.queues”消息队列中时,监听”my01.queues”消息队列的方法就会获取到消息并执行方法