1、SpringBoot整合
1.1、pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2、配置文件
# RabbitMQ配置
spring.rabbitmq.host=192.168.202.211
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
1.3、启动类开启
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {
public static void main(String[] args) {
SpringApplication.run(GulimallOrderApplication.class, args);
}
}
2、单元测试
2.1 创建交换器
/**
* 1、如何创建Exchange、Queue、Binding
* 1)、使用AmqpAdmin进行创建
*/
@Test
public void createExchange() {
Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:","hello-java-exchange");
}
使用代码创建和工具创建类似。
创建结果
2.2 创建队列
@Test
public void testCreateQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:","hello-java-queue");
}
创建结果
2.3 创建绑定
@Test
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功:","hello-java-binding");
}
结果
2.4 发送消息1 (字符串)
@Test
public void sendMessageTestDemo1(){
String msg = "hello java";
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",msg);
log.info("消息发送完成{}",msg);
}
查看接收的消息
2.2 发送消息2 (对象)
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
//2、发送的对象类型的消息,可以是一个json
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
log.info("消息发送完成:{}",reasonEntity);
}
由于是序列化,需要添加配置,经过转换后才可以看到对象数据
@Configuration
public class MyRabbitConfig {
private RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}
2.3 接收数据
/**
* queues:声明需要监听的队列
* channel:当前传输数据的通道
*/
@RabbitListener(queues = {
"hello-java-queue"})
public void revieveMessage(Message message,
OrderReturnReasonEntity content) {
//拿到主体内容
byte[] body = message.getBody();
//拿到的消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...内容" + message + "===内容:" + content);
}