RabbitMQ 是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量地使用。RabbitMQ 凭借其高可靠、易扩展、高可用及丰富的功能特性收到越来越多企业的青睐。在实现的项目开发中,经常使用Json、Map格式数据。下面将介绍RabbitMQ实现Json、Map格式数据的发送与接收。
(1)创建 SpringBoot 项目,并整合 RabbitMQ框架
在pom.xml配置信息文件中,添加相关依赖文件:
<!-- AMQP客户端 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在 application.yml 配置文件中配置 RabbitMQ 服务:
spring:
# 项目名称
application:
name: rabbitmq-provider
# RabbitMQ服务配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 消息确认(ACK)
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
(2)RabbitMQ 配置类
在项目中,创建配置类,配置消息确认,Json转换器,队列名称等,并将队列交由 IoC 管理。代码如下:
package com.pjb.config;
import com.pjb.receiver.AckReceiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置类
* @author pan_junbiao
**/
@Configuration
public class RabbitMqConfig
{
public static final String DIRECT_QUEUE_NAME = "direct_queue_name"; //队列名称
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange_name"; //交换器名称
public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由键
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
{
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置Json转换器
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
/**
* Json转换器
*/
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter()
{
return new Jackson2JsonMessageConverter();
}
/**
* 队列
*/
@Bean
public Queue directQueue()
{
return new Queue(DIRECT_QUEUE_NAME, true, false, false, null);
}
/**
* Direct交换器
*/
@Bean
public DirectExchange directExchange()
{
return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
}
/**
* 绑定
*/
@Bean
Binding bindingDirect(DirectExchange directExchange, Queue directQueue)
{
//将队列和交换机绑定, 并设置用于匹配键:routingKey
return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
}
/********************配置客户端消息确认Ack********************/
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private AckReceiver ackReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer()
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置一个队列
container.setQueueNames(DIRECT_QUEUE_NAME);
container.setMessageListener(ackReceiver);
return container;
}
}
1、JSON格式数据的发送与接收
(1)创建实体类(entity层)
在 com.pjb.entity 包中,创建UserInfo类(用户信息实体类)。
package com.pjb.entity;
/**
* 用户信息实体类
* @author pan_junbiao
**/
public class UserInfo
{
private int userId; //用户编号
private String userName; //用户姓名
private String blogUrl; //博客地址
private String blogRemark; //博客信息
//省略getter与setter方法...
}
(2)创建发送者(sender层)
在 com.pjb.sender 包中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息。
package com.pjb.sender;
import com.pjb.config.RabbitMqConfig;
import com.pjb.entity.UserInfo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 发送JSON数据
* @author pan_junbiao
**/
@SpringBootTest
public class JsonSender
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sender() throws AmqpException
{
//创建用户信息
UserInfo userInfo = new UserInfo();
userInfo.setUserId(1);
userInfo.setUserName("pan_junbiao的博客");
userInfo.setBlogUrl("https://blog.csdn.net/pan_junbiao");
userInfo.setBlogRemark("您好,欢迎访问 pan_junbiao的博客");
/**
* 发送消息,参数说明:
* String exchange:交换器名称。
* String routingKey:路由键。
* Object object:发送内容。
*/
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userInfo);
System.out.println("消息发送成功!");
}
}
(3)创建接收者(receiver层)
方式一:使用传统的@RabbitListener、@RabbitHandler注解实现消息的接收。
在 com.pjb.receiver 包中,创建创建接收者,使用传统的@RabbitListener、@RabbitHandler注解实现消息的接收。在方法的参数中, RabbitMQ会自动将JSON参数转换为实体对象类。
注意,发送者和接收者的 Queue 名称必须一致,否则不能接收消息。
package com.pjb.receiver;
import com.pjb.entity.UserInfo;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.pjb.config.RabbitMqConfig;
import java.util.Map;
/**
* 接收者
* @author pan_junbiao
**/
@Component
@RabbitListener(queues=RabbitMqConfig.DIRECT_QUEUE_NAME)
public class JsonReceiver
{
@RabbitHandler
public void process(UserInfo userInfo)
{
System.out.println("接收者收到JSON格式消息:");
System.out.println("用户编号:" + userInfo.getUserId());
System.out.println("用户名称:" + userInfo.getUserName());
System.out.println("博客地址:" + userInfo.getBlogUrl());
System.out.println("博客信息:" + userInfo.getBlogRemark());
}
}
方式二:使用RabbitMQ消息确认机制(ACK)
如果项目中使用了RabbitMQ消息确认机制(ACK),则获取Json格式数据方法如下:
package com.pjb.receiver;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pjb.entity.UserInfo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* Ack接收者
* @author pan_junbiao
**/
@Component
public class AckReceiver implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
//将JSON格式数据转换为实体对象
ObjectMapper mapper = new ObjectMapper();
UserInfo userInfo = mapper.readValue(message.getBody(), UserInfo.class);
System.out.println("接收者收到JSON格式消息:");
System.out.println("用户编号:" + userInfo.getUserId());
System.out.println("用户名称:" + userInfo.getUserName());
System.out.println("博客地址:" + userInfo.getBlogUrl());
System.out.println("博客信息:" + userInfo.getBlogRemark());
//确认消息
channel.basicAck(deliveryTag, true);
}
catch (Exception e)
{
e.printStackTrace();
//拒绝消息
channel.basicReject(deliveryTag, true);
}
}
}
执行结果:
2、Map格式数据的发送与接收
(1)创建发送者(sender层)
在 com.pjb.sender 包中,创建发送者,利用 rabbitTemplate.convertAndSend() 方法发送消息。
package com.pjb.sender;
import com.pjb.config.RabbitMqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
/**
* 发送Map数据
* @author pan_junbiao
**/
@SpringBootTest
public class MapSender
{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sender() throws AmqpException
{
//创建用户信息Map
Map<String, Object> userMap = new HashMap<>();
userMap.put("userId", "1");
userMap.put("userName", "pan_junbiao的博客");
userMap.put("blogUrl", "https://blog.csdn.net/pan_junbiao");
userMap.put("userRemark", "您好,欢迎访问 pan_junbiao的博客");
/**
* 发送消息,参数说明:
* String exchange:交换器名称。
* String routingKey:路由键。
* Object object:发送内容。
*/
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
System.out.println("消息发送成功!");
}
}
(2)创建接收者(receiver层)
方式一:使用传统的@RabbitListener、@RabbitHandler注解实现消息的接收。
在 com.pjb.receiver 包中,创建创建接收者,使用传统的@RabbitListener、@RabbitHandler注解实现消息的接收。在方法的参数中, RabbitMQ会自动封装Map格式数据。
注意,发送者和接收者的 Queue 名称必须一致,否则不能接收消息。
package com.pjb.receiver;
import com.pjb.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 接收者
* @author pan_junbiao
**/
@Component
@RabbitListener(queues= RabbitMqConfig.DIRECT_QUEUE_NAME)
public class MapReceiver
{
@RabbitHandler
public void process(Map message)
{
System.out.println("接收者收到Map消息:");
System.out.println("用户编号:" + message.get("userId"));
System.out.println("用户名称:" + message.get("userName"));
System.out.println("博客地址:" + message.get("blogUrl"));
System.out.println("博客信息:" + message.get("userRemark"));
}
}
方式二:使用RabbitMQ消息确认机制(ACK)
如果项目中使用了RabbitMQ消息确认机制(ACK),则获取Map格式数据方法如下:
package com.pjb.receiver;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Ack接收者
* @author pan_junbiao
**/
@Component
public class AckReceiver implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
//将JSON格式数据转换为Map对象
ObjectMapper mapper = new ObjectMapper();
JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);
System.out.println("接收者收到Map格式消息:");
System.out.println("用户编号:" + resultMap.get("userId"));
System.out.println("用户名称:" + resultMap.get("userName"));
System.out.println("博客地址:" + resultMap.get("blogUrl"));
System.out.println("博客信息:" + resultMap.get("userRemark"));
//确认消息
channel.basicAck(deliveryTag, true);
}
catch (Exception e)
{
e.printStackTrace();
//拒绝消息
channel.basicReject(deliveryTag, true);
}
}
}
执行结果: