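From the previous sections we have seen that messages always reach queues through an exchange, and that a message in a queue is processed by only one consumer. In real projects there is another case: a single message must be processed by several consumers, that is, it must be broadcast. Broadcasting uses a FanoutExchange (fanout exchange), which delivers each message to every queue bound to it and ignores the routing key.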
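The fanout behaviour described above can be illustrated with a small in-memory model. This is only a sketch of the routing rule, not the RabbitMQ or Spring AMQP API: the class names `ToyFanoutExchange` and `ToyFanoutDemo` and the queue names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange (hypothetical class, not a RabbitMQ API).
class ToyFanoutExchange {
    // Queue name -> messages currently held by that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to this exchange; a fanout binding needs no routing key.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publish a message: the routing key is accepted but ignored,
    // and every bound queue receives its own copy of the message.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Messages held by the given queue (empty if the queue is not bound).
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }
}

class ToyFanoutDemo {
    public static void main(String[] args) {
        ToyFanoutExchange exchange = new ToyFanoutExchange();
        exchange.bind("queue.a"); // hypothetical queue names
        exchange.bind("queue.b");
        exchange.publish("", "hello");
        System.out.println(exchange.messagesIn("queue.a")); // prints [hello]
        System.out.println(exchange.messagesIn("queue.b")); // prints [hello]
    }
}
```

Both queues end up holding the same message, which is the property the rest of this section relies on.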
FanoutExchange
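The main body of the code changes little: a FanoutExchange is added, and the queues are bound to it.
The additions are the parts marked with comments in the configuration below.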
@Configuration
public class MQTopicConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.publisher-returns}")
    private boolean publisherReturns;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherReturns(publisherReturns);
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    // Must be prototype scope: a RabbitTemplate accepts only one ConfirmCallback,
    // so each sender that registers its own callback needs its own instance
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    public Queue coreQueue() {
        return new Queue(Constant.HOSH_TOPIC);
    }

    @Bean
    public Queue subCoreQueue() {
        return new Queue(Constant.HOSH_TOPIC_NEW);
    }

    @Bean
    public TopicExchange coreTopicExchange() {
        return new TopicExchange(Constant.HOSH_TOPIC_EXC);
    }

    // Broadcast (fanout) exchange
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(Constant.HOSH_BROADCAST_EXC);
    }

    // Bind coreQueue to the fanout exchange
    @Bean
    public Binding bindingFanoutExchange() {
        return BindingBuilder.bind(coreQueue()).to(fanoutExchange());
    }

    // Bind subCoreQueue to the fanout exchange
    @Bean
    public Binding bindingSubFanoutExchange() {
        return BindingBuilder.bind(subCoreQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingCoreExchange() {
        return BindingBuilder.bind(coreQueue()).to(coreTopicExchange()).with(Constant.HOSH_TOPIC);
    }

    @Bean
    public Binding bindingSubCoreExchange() {
        return BindingBuilder.bind(subCoreQueue()).to(coreTopicExchange()).with(Constant.HOSH_TOPIC_NEW);
    }
}
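The configuration refers to a `Constant` class that is not shown in this section. A minimal sketch of what it might look like follows; the field names are taken from the code above, but every string value is an assumed placeholder and may differ from the real project.

```java
// Hypothetical sketch of the Constant class used by MQTopicConfig.
// Field names come from the configuration; all values are assumed placeholders.
final class Constant {
    // Queue names (also reused as topic routing keys in the bindings)
    static final String HOSH_TOPIC = "hosh.topic";
    static final String HOSH_TOPIC_NEW = "hosh.topic.new";

    // Exchange names
    static final String HOSH_TOPIC_EXC = "hosh.topic.exchange";
    static final String HOSH_BROADCAST_EXC = "hosh.broadcast.exchange";

    // Constants holder, not meant to be instantiated
    private Constant() {
    }
}
```

Because the fields are compile-time constants, they can be used in annotation attributes such as `@RabbitListener(queues = Constant.HOSH_TOPIC)`. In a real project the class would typically be declared `public` in its own `Constant.java` file so that other packages can reference it.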
Producer
Send the message to the fanout exchange (FanoutExchange).
@Component
public class HoshMQSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HoshMQSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
    }

    public void send(String str) {
        sendMessageWithAck(str);
    }

    private void sendMessageWithAck(String str) {
        // Message body
        byte[] toSendBytes = str.getBytes();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(String.valueOf(System.currentTimeMillis()));
        Message msg = new Message(toSendBytes, messageProperties);
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());
        // Previous version, sending to the topic exchange:
        // rabbitTemplate.convertAndSend(Constant.HOSH_TOPIC_EXC, Constant.HOSH_TOPIC, msg, correlationData);
        // A fanout exchange ignores the routing key, so an empty string is passed
        rabbitTemplate.convertAndSend(Constant.HOSH_BROADCAST_EXC, "", msg, correlationData);
    }

    // Confirmation that the message reached the broker
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // if (ack) {
        //     System.out.println("recv HoshMQSender confirm id=" + correlationData.getId());
        // } else {
        //     System.out.println("not recv " + cause);
        // }
    }

    /**
     * If the exchange routes the message to a queue, returnedMessage is not called.
     * If the exchange fails to route the message to any queue, returnedMessage is called.
     * Requires spring.rabbitmq.publisher-returns=true; the RabbitTemplate must also
     * have its mandatory flag set to true for returns to be delivered.
     * @param message    the returned message
     * @param replyCode  reply code from the broker
     * @param replyText  reply text from the broker
     * @param exchange   exchange the message was published to
     * @param routingKey routing key used when publishing
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("sender message returned " + message.toString()
                + "\n replyCode " + replyCode + "\n replyText " + replyText
                + "\n exchange " + exchange + "\n routingKey " + routingKey);
    }
}
Consumer
To see the broadcast effect, two consumers are needed, each listening on a different queue.
1. Consumer 1
@Component
public class HoshMqReceiver {

    @RabbitHandler
    @RabbitListener(queues = Constant.HOSH_TOPIC_NEW)
    public void onReceiver(String info, Channel channel, Message msg) throws IOException {
        System.out.println("HoshMqReceiver msg " + msg);
        System.out.println("HoshMqReceiver info " + info);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }
}
2. Consumer 2
@Component
public class HoshMqReceiver2 {

    @RabbitHandler
    @RabbitListener(queues = Constant.HOSH_TOPIC)
    public void onReceiver(String info, Channel channel, Message msg) {
        try {
            // With manual ack enabled, RabbitMQ removes a message only after the consumer
            // explicitly acknowledges it; until then the message stays on the RabbitMQ server
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
            System.out.println("HoshMqReceiver2 msg " + JSON.toJSONString(msg));
            System.out.println("HoshMqReceiver2 info " + info);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
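Test results
Send a test message and check the result.
As you can see, with a FanoutExchange a single message is delivered to every queue bound to the exchange, and the consumers listening on those queues each receive the same message.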
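The difference between this result and the earlier one-consumer-per-message behaviour can be sketched with a toy comparison. This is not RabbitMQ code: `DeliveryComparison` is a hypothetical class, and round-robin dispatch is used as a simplified stand-in for how consumers on a shared queue compete for messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy comparison of two delivery patterns (hypothetical class, not a RabbitMQ API).
class DeliveryComparison {

    // Consumers share ONE queue: each message goes to exactly one of them,
    // modelled here as simple round-robin dispatch.
    static List<List<String>> sharedQueue(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    // Each consumer has its OWN queue bound to a fanout exchange:
    // every queue, and therefore every consumer, gets every message.
    static List<List<String>> fanout(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    // One empty inbox per consumer.
    private static List<List<String>> emptyInboxes(int consumers) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2");
        System.out.println("shared queue: " + sharedQueue(messages, 2)); // [[m1], [m2]]
        System.out.println("fanout:       " + fanout(messages, 2));      // [[m1, m2], [m1, m2]]
    }
}
```

In the setup above, HoshMqReceiver and HoshMqReceiver2 correspond to the second case, because each one listens on its own queue and both queues are bound to the same FanoutExchange.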