yml:
spring:
  # RabbitMQ connection and delivery-guarantee settings
  rabbitmq:
    host: 192.168.31.97
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # Publisher confirms: the broker acknowledges each published message, which is
    # what drives the ConfirmCallback registered on the RabbitTemplate. Without this
    # setting that callback is never invoked.
    # (On Spring Boot versions before 2.2 the equivalent key is `publisher-confirms: true`.)
    publisher-confirm-type: correlated
    # Publisher returns: messages that reach the exchange but cannot be routed to
    # any queue are handed back to the publisher.
    publisher-returns: true
    template:
      # Publish with the mandatory flag so an unroutable message is returned
      # (triggering the return callback) instead of being silently dropped.
      mandatory: true
    listener:
      simple:
        # Consumers must acknowledge each message explicitly (channel.basicAck / basicNack).
        acknowledge-mode: manual
RabbitConfig:
package com.jeesun.gc.jeesungoods.config;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * RabbitMQ configuration: registers a JSON message converter and installs the
 * publisher confirm / return callbacks on the injected {@link RabbitTemplate}.
 */
@Configuration
public class RabbitConfig {

    @Resource
    RabbitTemplate rabbitTemplate;

    /**
     * JSON message converter used for (de)serializing message payloads.
     *
     * <p>Declared {@code static} so that Spring can create this bean without first
     * instantiating {@code RabbitConfig}. This class injects {@link RabbitTemplate},
     * and the auto-configured template in turn consumes the {@link MessageConverter}
     * bean; with an instance factory method that forms a circular reference
     * (RabbitConfig -> RabbitTemplate -> messageConverter -> RabbitConfig), which
     * fails at startup when circular references are prohibited.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public static MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Installs the confirm and return callbacks on the injected template once
     * dependency injection has completed.
     */
    @PostConstruct
    public void initRabbitTemplate() {
        // Confirm callback: reports whether the broker's exchange accepted the message.
        //   correlationData - correlation data associated with the message (carries its unique id)
        //   ack             - true if the exchange accepted the message
        //   cause           - reason for the failure, if any
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });

        // Return callback: fired only when a message could not be routed to a queue.
        //   message    - details of the message that failed to be delivered
        //   replyCode  - reply status code
        //   replyText  - reply text
        //   exchange   - exchange the message was published to
        //   routingKey - routing key the message was published with
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}
消费者:
package com.jeesun.gc.jeesungoods.mq;
import com.jeesun.gc.jeesuncommon.entity.jeesungoods.entity.GoodsBase;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Consumer for {@code order.direct.queue}.
 *
 * <p>The {@link RabbitListener} binding below declares the queue
 * ({@code autoDelete = "false"}), declares the direct exchange
 * {@code order.direct.exchange}, and binds the two with routing key {@code order}.
 * Each {@link RabbitHandler} method handles one payload type and acknowledges the
 * message manually after it has been processed.
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "order.direct.queue", autoDelete = "false"),
        exchange = @Exchange(value = "order.direct.exchange", type = ExchangeTypes.DIRECT),
        key = "order"
))
public class OrderListener {

    /**
     * Handles text payloads.
     *
     * @param message the raw AMQP message (source of the delivery tag)
     * @param str     the payload as a string
     * @param channel the channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitHandler
    public void sendMessage(Message message, String str, Channel channel) throws IOException {
        System.out.println("order1111====>" + str);
        // Ack only after processing so a failure above does not lose the message.
        // To reject instead: channel.basicNack(deliveryTag, multiple, requeue) —
        // requeue=true puts the message back on the queue, requeue=false
        // discards it (or dead-letters it if the queue is configured for that).
        acknowledge(message, channel);
    }

    /**
     * Handles raw byte payloads.
     *
     * @param message the raw AMQP message (source of the delivery tag)
     * @param str     the payload bytes
     * @param channel the channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitHandler
    public void sendMessage(Message message, byte[] str, Channel channel) throws IOException {
        // Decode the bytes; concatenating a byte[] directly would print its
        // identity string ("[B@...") rather than the content.
        // NOTE(review): assumes the payload is UTF-8 text — confirm against producers.
        System.out.println("order3333====>" + new String(str, java.nio.charset.StandardCharsets.UTF_8));
        acknowledge(message, channel);
    }

    /**
     * Handles {@link GoodsBase} payloads.
     *
     * @param message the raw AMQP message (source of the delivery tag)
     * @param gb      the deserialized goods entity
     * @param channel the channel the message was delivered on
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitHandler
    public void sendMessage(Message message, GoodsBase gb, Channel channel) throws IOException {
        System.out.println("order2222====>" + gb.getGoodsName());
        acknowledge(message, channel);
    }

    /** Acknowledges exactly this message (multiple = false) on the given channel. */
    private void acknowledge(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}
生产者:
// Producer-side snippet: the template is injected, then a GoodsBase entity is
// built and published.
@Autowired
private RabbitTemplate rabbitTemplate;
// Build the payload object.
GoodsBase gb =new GoodsBase();
gb.setId(100L);
gb.setGoodsName("apple");
// Publish to exchange "order.direct.exchange" with routing key "order".
// No CorrelationData argument is passed on this call.
rabbitTemplate.convertAndSend("order.direct.exchange","order",gb);