写在前面:本文采用的rabbitmq环境是docker单节点。
项目地址:https://github.com/Blankwhiter/AMQP
一、搭建rabbitmq环境
在centos窗口中,执行如下命令拉取镜像,以及创建容器:
docker pull rabbitmq:3.7-management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq-single rabbitmq:3.7-management
注:5672 -> 客户端与rabbitmq的通信端口 15672 -> 图形化管理界面端口
创建容器后,在浏览器输入http://192.168.10.170:15672 即可访问图形化管理界面。
注:192.168.10.170是虚拟机ip(请替换为读者实际环境的ip;下文配置与注释中出现的192.168.9.219同理),默认的username/password为guest/guest
二、springboot集成rabbitmq
1.template操作rabbitmq 收发消息。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>template</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>template</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--引入序列化
由于本次测试没有引入spring-boot-starter-web模块,故引入jackson-databind
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
<scope>compile</scope>
</dependency>
<!--引入amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
rabbitmq:
host: 192.168.9.219
port: 5672
username: guest
password: guest
AmqpConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that contributes the AMQP message converter bean.
 */
@Configuration
public class AmqpConfig {

    /**
     * Registers a {@link Jackson2JsonMessageConverter} as the {@link MessageConverter} bean.
     *
     * @return a newly created JSON message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
测试类,读者请按照步骤依次执行
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.HashMap;
/**
 * Step-by-step tests that exercise RabbitMQ through the Spring AMQP templates.
 *
 * <p>The tests are meant to be run manually in order: {@link #init()},
 * then {@link #sendMessage()}, then {@link #receiveMessage()}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TemplateApplicationTests {

    /**
     * RabbitMQ-specific template used to send messages to and receive messages from RabbitMQ.
     */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Generic AMQP template for sending/receiving messages through an AMQP broker; recommended.
     */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * AMQP administration component: manages exchanges and queues.
     */
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * Step 1: initialise the broker with amqpAdmin by declaring the exchange, the queue
     * and the binding between them.
     *
     * <p>Afterwards the result can be inspected at
     * http://192.168.9.219:15672/#/exchanges/%2F/exchange.direct
     */
    @Test
    public void init() {
        // Exchange types are mainly DirectExchange, FanoutExchange, TopicExchange and HeadersExchange.
        // For the differences see https://blog.csdn.net/belonghuang157405/article/details/83184388

        // Declare the exchange (direct / point-to-point mode).
        amqpAdmin.declareExchange(new DirectExchange("exchange.direct"));
        // Declare the queue; the second constructor argument is "durable".
        amqpAdmin.declareQueue(new Queue("direct-queue", true));
        // Bind the queue to the exchange with routing key "fruit.apple".
        amqpAdmin.declareBinding(
                new Binding("direct-queue", Binding.DestinationType.QUEUE, "exchange.direct", "fruit.apple", null));
    }

    /**
     * Step 2: send a message to {@code exchange.direct} with routing key {@code fruit.apple}.
     */
    @Test
    public void sendMessage() {
        // Option 1: use send(); you must build the Message (body and headers) yourself, e.g.
        // rabbitTemplate.send("exchange.direct","fruit.apple",new Message("apple-message-made".getBytes(),null));

        // Option 2: use convertAndSend(); the third argument is treated as the message body
        // and is serialized automatically before being sent to RabbitMQ.
        HashMap<Object, Object> map = new HashMap<>();
        map.put("type", "red apple");
        map.put("data", Arrays.asList(1, 2, 3));
        // By default convertAndSend uses SimpleMessageConverter, which relies on JDK serialization.
        // JSON is usually preferred, hence the extra configuration class AmqpConfig.
        // With it in place, "Get messages" at http://192.168.9.219:15672/#/queues/%2F/direct-queue
        // shows the message as JSON.
        rabbitTemplate.convertAndSend("exchange.direct", "fruit.apple", map);
    }

    /**
     * Step 3: receive one message from {@code direct-queue} and print its type and content.
     *
     * @throws IllegalStateException if the queue currently holds no message
     */
    @Test
    public void receiveMessage() {
        Object message = rabbitTemplate.receiveAndConvert("direct-queue");
        // receiveAndConvert yields null when the queue is empty; fail with a clear hint
        // instead of an anonymous NullPointerException on the lines below.
        if (message == null) {
            throw new IllegalStateException(
                    "No message available in direct-queue; run sendMessage() first");
        }
        System.out.println(message.getClass());
        System.out.println(message.toString());
    }
}
2.使用@RabbitListener注解,监听消息
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>annotation</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>annotation</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--引入amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
rabbitmq:
host: 192.168.9.219
port: 5672
username: guest
password: guest
AmqpConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that contributes the AMQP message converter bean.
 */
@Configuration
public class AmqpConfig {

    /**
     * Registers a {@link Jackson2JsonMessageConverter} as the {@link MessageConverter} bean.
     *
     * @return a newly created JSON message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        final MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
测试类,读者请按照步骤依次执行
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.HashMap;
/**
 * Step-by-step tests that prepare a fanout exchange with three queues and publish a message to it.
 *
 * <p>Run {@link #init()} first, then {@link #sendMessage()}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class AnnotationApplicationTests {

    /**
     * RabbitMQ-specific template used to send and receive messages.
     */
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Generic AMQP template for sending/receiving messages through an AMQP broker; recommended.
     */
    @Autowired
    AmqpTemplate amqpTemplate;

    /**
     * AMQP administration component: manages exchanges and queues.
     */
    @Autowired
    AmqpAdmin amqpAdmin;

    /**
     * Step 1: use amqpAdmin to declare the exchange, the queues and their bindings.
     *
     * <p>Afterwards the result can be inspected at
     * http://192.168.9.219:15672/#/exchanges/%2F/exchange.fanout
     */
    @Test
    public void init() {
        // Exchange types are mainly DirectExchange, FanoutExchange, TopicExchange and HeadersExchange.
        // For the differences see https://blog.csdn.net/belonghuang157405/article/details/83184388
        final String[] queueNames = {"fanout-queue1", "fanout-queue2", "fanout-queue3"};
        final String[] routingKeys = {"fruit.orange-1", "fruit.orange-2", "fruit.orange-3"};

        // Declare the exchange (broadcast mode).
        FanoutExchange fanout = new FanoutExchange("exchange.fanout");
        amqpAdmin.declareExchange(fanout);

        // Declare every queue; the second constructor argument is "durable".
        for (String queueName : queueNames) {
            amqpAdmin.declareQueue(new Queue(queueName, true));
        }

        // Bind each queue to the exchange, in the same order the queues were declared.
        for (int i = 0; i < queueNames.length; i++) {
            Binding binding = new Binding(
                    queueNames[i], Binding.DestinationType.QUEUE, "exchange.fanout", routingKeys[i], null);
            amqpAdmin.declareBinding(binding);
        }
    }

    /**
     * Step 2: publish a message to {@code exchange.fanout} with an empty routing key.
     */
    @Test
    public void sendMessage() {
        // Option 1: use send(); you must build the Message (body and headers) yourself, e.g.
        // rabbitTemplate.send("exchange.fanout","fruit.apple",new Message("apple-message-made".getBytes(),null));

        // Option 2: use convertAndSend(); the third argument is treated as the message body
        // and is serialized automatically before being sent to RabbitMQ.
        HashMap<Object, Object> payload = new HashMap<>();
        payload.put("type", "red orange");
        payload.put("data", Arrays.asList(1, 2, 3));

        // By default convertAndSend uses SimpleMessageConverter, which relies on JDK serialization.
        // JSON is usually preferred, hence the extra configuration class AmqpConfig.
        // With it in place, "Get messages" at http://192.168.9.219:15672/#/queues/%2F/fanout-queueX
        // shows the message as JSON.
        rabbitTemplate.convertAndSend("exchange.fanout", "", payload);
    }
}
按照步骤运行完上述代码,编写MessageService类,并启动项目。
MessageService.java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
/**
 * Service whose methods are registered as listeners on the three fanout queues and print
 * the {@code "data"} entry of every received message to standard output.
 */
@Service
public class MessageService {

    /**
     * Handles a message from {@code fanout-queue1}.
     *
     * @param map the converted message body
     */
    @RabbitListener(queues = "fanout-queue1")
    public void receiveQueue1Message(HashMap<String,Object> map){
        printData("fanout-queue1", map);
    }

    /**
     * Handles a message from {@code fanout-queue2}.
     *
     * @param map the converted message body
     */
    @RabbitListener(queues = "fanout-queue2")
    public void receiveQueue2Message(HashMap<String,Object> map){
        printData("fanout-queue2", map);
    }

    /**
     * Handles a message from {@code fanout-queue3}.
     *
     * @param map the converted message body
     */
    @RabbitListener(queues = "fanout-queue3")
    public void receiveQueue3Message(HashMap<String,Object> map){
        printData("fanout-queue3", map);
    }

    /**
     * Prints the {@code "data"} entry of a message, framed by begin/end markers for the queue.
     *
     * @param queueName name of the queue the message came from, used in the markers
     * @param map       the converted message body
     */
    private void printData(String queueName, HashMap<String, Object> map) {
        System.out.println(queueName + " begin");
        // getOrDefault would hand back null when "data" is present but null, and calling
        // toString() on it would throw inside the listener; treat both cases as "no message".
        Object data = map.get("data");
        System.out.println(data == null ? "no message" : data.toString());
        System.out.println(queueName + " end");
    }
}
启动完成后 控制台打印如下信息:
fanout-queue3 end
fanout-queue1 begin
[1, 2, 3]
fanout-queue1 end
fanout-queue2 begin
[1, 2, 3]
fanout-queue2 end
fanout-queue3 begin
[1, 2, 3]
fanout-queue3 end