一、模型
- P(Producer):生产者——发送消息
- X(Exchange):交换机——一方面接受生产者发送的消息,另一方面是向队列中推送消息
- Q(Queue): 消息队列(图中红色方形)——存储消息
- C(consumer): 消费者——接收消息
订阅模式( Publish/Subscribe)即由生产者发送消息至交换机,然后交换机会将此消息推送到与之绑定的所有队列中去,最终由各个消费者在其监听的队列中获取消息。
模型解读:
- 一个生产者,多个消费者;
- 每一个消费者都有自己的队列;
- 生产者没有直接把消息发送到对列,而是发到了交换机;
- 每一个队列都要绑定到交换机上;
- 生产者发送的消息,经过交换机发送到对列,就能实现一个消息被多个消费者接收。
注:订阅模式( Publish/Subscribe)需将交换机(Exchange)类型定义为"Fanout"。
二、Fanout exchange(扇型交换机)
1、模型
2、含义
扇型交换机(Fanout Exchange)将消息发送至所有与该交换机绑定的队列中。不需要设置路由键(Routing Key),只需要提前将交换机与队列绑定好。当生产者发送消息到交换机之后,交换机会将消息推送到所有绑定的队列中去。由于扇形交换机不受路由键的影响,因此扇形交换机推送消息是最快的。
三、Java编程实现
1、导入AMQP协议jar包,以及创建RabbitMQ连接工具类,请查看 三、RabbitMQ之简单队列(Simple Queue);
2、创建生产者发送消息;
package com.rabbitMQ.publishSubscribe;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 订阅模式-生产者
* @author zhoujin
* @data 2019-1-22
*/
public class PublishProducer {
private static final String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
Connection conn = null;
Channel channel = null;
try {
// 1.获取连接
conn = ConnectionUtils.getConnection();
// 2.从连接中获取通道
channel = conn.createChannel();
// 3.声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 4.发送消息
String message = "This is public subscribe MQ!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("======================= Publish subscribe MQ send message end! 【Content:" + message + "】 =======================");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
ConnectionUtils.closeConnection(channel, conn);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
3、创建消费者1接收消息;
package com.rabbitMQ.publishSubscribe;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* 订阅模式-第一个消费者
* @author zhoujin
* @data 2019-1-22
*/
public class PublicFirstConsumer {
private static final String QUEUE_NAME = "queue_fanout_first";
private static final String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
try {
// 1.获取连接
Connection conn = ConnectionUtils.getConnection();
// 2.从连接中获取通道
final Channel channel = conn.createChannel();
// 3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.将队列绑定到交换机上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 5.保证一次只接受一条消息
channel.basicQos(1);
// 6.设置消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("======================= First public subscribe received a message! 【Content:" + message + "】 =======================");
// 休眠,模拟业务处理
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 7.手动发送反馈回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 8.监听队列(关闭自动应答,即第二个参数设置为false)
channel.basicConsume(QUEUE_NAME, false, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
4、创建消费者2接收消息。
package com.rabbitMQ.publishSubscribe;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitMQ.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* 订阅模式-第二个消费者
* @author zhoujin
* @data 2019-1-22
*/
public class PublicSecondConsumer {
private static final String QUEUE_NAME = "queue_fanout_second";
private static final String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) {
try {
// 1.获取连接
Connection conn = ConnectionUtils.getConnection();
// 2.从连接中获取通道
final Channel channel = conn.createChannel();
// 3.声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 5.保证一次只接受一条消息
channel.basicQos(1);
// 6.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("======================= Second public subscribe received a message! 【Content:" + message + "】 =======================");
// 休眠,模拟业务处理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 7.手动发送反馈回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 8.监听对列(关闭自动应答,即第二个参数设置为false)
channel.basicConsume(QUEUE_NAME, false, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
注: 若只在生产者代码中定义了交换机,而未在消费者中定义队列或定义了队列但未与交换机绑定,程序编译和运行都不会报错,但是消息会被丢失。因为交换机没有存储消息的能力,RabbitMQ中只有队列有存储消息的能力。
四、运行代码以及控制台输出
1、 运行生产者代码——定义交换机;
1.1、 在Web管理中查看定义的交换机。
① 为何这里需要先运行生产者的代码?
因为需要先在生产者中定义交换机。若先启动消费者,则会报错提示"no exchange"错误,原因是未定义交换机!
② 为何此次生产者发送的消息丢失了?
扫描二维码关注公众号,回复:
10444289 查看本文章
因为此时还没有队列与之绑定,而交换机本身是不具备存储消息的功能,因此会导致消息丢失。
2、运行两个消费者的代码;
2.1、在在Web管理中查看交换机与队列的绑定关系。
3、再次运行生产者代码——发送消息。
3.1、生产者控制台输出;
3.2、第一个消费者控制台输出;
3.3、第二个消费者控制台输出。