有时候我们希望的是当生产者发送一条消息的时候,与它相关的消费者都能接收到该消息,而不是每一条消息只能一个消费者消费。即相当于我们关注的公众号,公众号发送一条消息,但是所有关注它的用户都可以收到该条消息。
要实现这种模式,需要加入交换机来作为中间转换。即生产者发送消息不再直接发到队列,而是发送到交换机,再由交换机发送到对应的队列之中。但是每一个消费者都有一个自己的队列,并与之绑定,同时再将队列都绑定到交换机上,这样就可以实现一条消息有多个消费者来消费,也就是订阅模式。
生产者
package com.mmr.rabbitmq.fanout;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//获取MQ连接
Connection connection = ConnectionUtils.getConnection();
//获取信道
Channel channel = connection.createChannel();
//声明交换机,设置交换机类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "i am coming------>";
for (int i = 0; i < 20; i++) {
//向交换机中发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes());
System.out.println(message + i);
}
channel.close();
connection.close();
}
}
消费者1
package com.mmr.rabbitmq.fanout;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Rev1 {
private final static String EXCHANGE_NAME = "fanout_exchange";
private final static String QUEUE_NAME = "fanout_queue01";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//声明每次向消费者发送一条消息
channel.basicQos(1);
//将队列与交换机绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//创建一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "utf-8");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//一般用于异常处理,告知MQ消息处理失败,
//第二个参数为true,则该条消息被重新放回队列,为false则放弃该条消息
// channel.basicReject(envelope.getDeliveryTag(), false);
e.printStackTrace();
}finally {
System.out.println("Recv1---->" + msg + "---->Done");
//向MQ发送确认,告诉MQ该条消息已经消费,可以重新发送了
//第二个参数为false表示只确认当前这一条消息,
//如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认(批量确认针对的是整个信道)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
消费者2
消费者2与消费者1类似
package com.mmr.rabbitmq.fanout;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Rev2 {
private final static String EXCHANGE_NAME = "fanout_exchange";
private final static String QUEUE_NAME = "fanout_queue02";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body, "utf-8");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("Recv1---->" + msg + "---->Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
运行两个消费者,然后运行生产者,从控制台我们可以看到消费者1和消费者2同时都收到了生产者所发送的消息。