一、RabbitMQ的七大通信模型
RabiitMQ 目前有七大通信模型,分别是:
Hello World
模型(最简单的模型,点对点通过队列直接通信)
Work Queues
工作队列模型(在多个工人之间分配消息,竞争消费消息,能者多劳,消费快的工人能够消费更多的消息)
Publish / Subscribe
发布与订阅模型(Fanout 广播模式,一条消息可以被多个消费者消费)
Route
路由模型(Direct模式,根据消息类型,有选择性的消费消息)
Topic
动态路由模型(与Route路由模型类似,有选择性的消费消息,不同之处在于它可以定义动态路由)
RPC
通信模型(这里不作介绍)
Publisher Confirms
发布者确认模型(这里不作介绍)
二、模型介绍及代码实现
引入依赖:
<!-- rabbitmq -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
1. HelloWorld 模型
Hello World 模型是最基本的通信模型,它通过队列直接点对点进行通信。
生产者
public class Provider {
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("121.41.179.236");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
// 创建通道,交换机和队列都是通过通道来传递信息的
channel = connection.createChannel();
// 通道绑定队列
// 参数1:队列名称,若队列不存在,则创建
// 参数2:队列是否持久化
// 参数3:是否独占队列
// 参数4:队列消费完消息后,是否自动删除队列
// 参数5:额外传递的参数
channel.queueDeclare("hello", true, false, false, null);
// 发送消息
// 参数1:交换机名称,若不往交换机发送消息,则留空
// 参数2:队列名称或路由名称
// 参数3:消息的额外设置(如设置消息的持久化,MessageProperties.TEXT_PLAIN表示消息持久化)
// 参数4:发送的消息
channel.basicPublish("", "hello", null, "我是hello模型发送的信息".getBytes());
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
消费者
public class Consumer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("121.41.179.236");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
try {
Connection connection = factory.newConnection();
// 创建通道,交换机和队列都是通过通道来传递信息的
Channel channel = connection.createChannel();
// 通道绑定队列(参数设置需要与消息生成者绑定的队列保持一致)
// 参数1:队列名称,若队列不存在,则创建
// 参数2:队列是否持久化
// 参数3:是否独占队列
// 参数4:队列消费完消息后,是否自动删除队列
// 参数5:额外传递的参数
channel.queueDeclare("hello", true, false, false, null);
// 从队列中消费消息
// 参数1:消费的队列名称
// 参数2:是否自动确认(拿到消息后,是否自动确认消息已消费完成)
// 参数3:消费时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到来自hello模型的消息为:" + new String(body));
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
由于每次创建连接对象的代码都是一样的,所以这里封装成一个工具类,方便使用。
public class ConnectUtils {
private static Connection connection = null;
/**
* 单例设计模式(饱汉式)
* @return 连接对象
*/
public static Connection getConnection(){
if (connection == null){
synchronized (ConnectUtils.class) {
if (connection == null) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("121.41.179.236");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");
try {
connection = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return connection;
}
/**
* 关闭资源
* @param connection 连接对象
* @param channel 通道
*/
public static void close(Connection connection, Channel channel) {
try {
if (channel != null){
channel.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
2. WorkQueues 模型
Work Queues 工作队列模型(在多个工人之间分配消息,竞争消费消息,能者多劳,即消费快的工人能够消费更多的消息,如:一个队列中有10条消息,甲消费的快,消费了8条,而乙只消费了2条)
生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定队列
channel.queueDeclare("work", true, false, false, null);
// 模拟生成20条消息
for (int i = 0; i < 20; i++) {
// 发送消息
// 参数1:交换机名称,若不往交换机发送消息,则留空
// 参数2:队列名称或路由名称
// 参数3:消息的额外设置(如设置消息的持久化,MessageProperties.TEXT_PLAIN表示消息持久化)
// 参数4:发送的消息
channel.basicPublish("", "work", MessageProperties.TEXT_PLAIN, ("我是来自work queue工作模型的消息-" + i).getBytes());
}
ConnectUtils.close(connection, channel);
}
}
消费者1
public class Consumer01 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定队列
channel.queueDeclare("work", true, false, false, null);
// 设置一次只能消费一条消息
// 否则尽管该消费者消费消息的速度比较慢,但rabbitmq还是会将所有的消息平分给所有消费者进行处理
channel.basicQos(1);
// 从队列中消费消息
// 将参数2设置为false,即关闭消息自动确认机制,转为手动确认,可实现能者多劳模式,谁处理的快,谁就多处理些。
channel.basicConsume("work", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
// 模拟消费者1的处理速度较慢
Thread.sleep(500);
}catch (Exception e) {
e.printStackTrace();
}
System.out.println("消费者1:接收到的消息为:" + new String(body));
// 手动确认消息已消费完成
// 参数1:确认队列中哪个具体消息
// 参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
消费者2
public class Consumer02 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定队列
channel.queueDeclare("work", true, false, false, null);
// 设置一次只能消费一条消息
// 否则尽管该消费者消费消息的速度比较慢,但rabbitmq还是会将所有的消息平分给所有消费者进行处理
channel.basicQos(1);
// 从队列中消费消息
// 将参数2设置为false,即关闭消息自动确认机制,转为手动确认,可实现能者多劳模式,谁处理的快,谁就多处理些。
channel.basicConsume("work", false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2:接收到的消息为:" + new String(body));
// 手动确认消息已消费完成
// 参数1:确认队列中哪个具体消息
// 参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
3. Publish / Subscribe 模型
发布与订阅模型,即 fanout 广播模式,一条消息可以被多个消费者消费。
生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定交换机
// 参数1:交换机名称
// 参数2:交换机的类型 fanout、direct、topic
channel.exchangeDeclare("logs", "fanout");
// 发送消息
// 参数1:交换机名称
// 参数2:自定义路由名称,若未使用路由,则留空
// 参数3:传递消息的额外设置(如设置消息的持久化)
// 参数4:发送的消息
channel.basicPublish("logs", "", null, "我是来自fanout模型的消息".getBytes());
// 关闭资源
ConnectUtils.close(connection, channel);
}
}
消费者1
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("logs", "fanout");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 将临时队列绑定到交换机上
// 参数1:队列名称
// 参数2:交换机名称
// 参数3:路由名称,未使用时则留空
channel.queueBind(queue, "logs", "");
// 从临时队列中消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1-接收到的消息为:" + new String(body));
}
});
消费者2
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("logs", "fanout");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 将临时队列绑定到交换机上
// 参数1:队列名称
// 参数2:交换机名称
// 参数3:路由名称,未使用时则留空
channel.queueBind(queue, "logs", "");
// 从临时队列中消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2-接收到的消息为:" + new String(body));
}
});
4. Route 模型
Route路由模型,即 direct 路由模式,消费者根据消息类型,有选择性的消费消息。
生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 通道绑定交换机(direct类型交换机)
channel.exchangeDeclare(exchangeName, "direct");
// 发送消息(error 和 info 消息各一条)
// 参数1:交换机名称
// 参数2:自定义路由名称,表示发送的消息类型
// 参数3:传递消息的额外设置(如设置消息的持久化)
// 参数4:发送的消息
channel.basicPublish(exchangeName, "error", null, "我是来自direct模式下的error类型的消息".getBytes());
channel.basicPublish(exchangeName, "info", null, "我是来自direct模式下的info类型的消息".getBytes());
// 关闭资源
ConnectUtils.close(connection, channel);
}
}
消费者1
public class Consumer01 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定交换机
String exchangeName = "logs_direct";
channel.exchangeDeclare(exchangeName, "direct");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 将临时队列绑定到交换机上,并声明接收的消息类型为 error和 info
// 参数1:队列名称
// 参数2:交换机名称
// 参数3:自定义接收的路由名称
channel.queueBind(queue, exchangeName, "error");
channel.queueBind(queue, exchangeName, "info");
// 从队列中消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1-接收到的消息为:" + new String(body));
}
});
}
}
消费者2
public class Consumer02 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定交换机
String exchangeName = "logs_direct";
channel.exchangeDeclare(exchangeName, "direct");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 将临时队列绑定到交换机上,并声明接收的消息类型为 info
// 参数1:队列名称
// 参数2:交换机名称
// 参数3:自定义接收的路由名称
channel.queueBind(queue, exchangeName, "info");
// 从队列中消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2-接收到的消息为:" + new String(body));
}
});
}
}
5. Topic 模型
动态路由模型,即 topic 动态路由模式,它与Route路由模型类似,也是有选择性的消费消息,不同之处在于它可以定义动态路由,即使用通配符的形式定义路由名称。例如:
user.*
、user.#
*
代表匹配一个任意字符
#
代表匹配 0 到多个任意字符
生产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
String exchangeName = "logs_topic";
// 通道绑定交换机(topic类型交换机)
channel.exchangeDeclare(exchangeName, "topic");
// 发送消息
// 参数1:交换机名称
// 参数2:自定义路由名称,表示发送的消息类型
// 参数3:传递消息的额外设置(如设置消息的持久化)
// 参数4:发送的消息
channel.basicPublish(exchangeName, "user", null, "我是来自topic模式下的user类型的消息".getBytes());
channel.basicPublish(exchangeName, "user.insert", null, "我是来自topic模式下的user.insert类型的消息".getBytes());
channel.basicPublish(exchangeName, "user.info.findName", null, "我是来自topic模式下的user.info.findName类型的消息".getBytes());
// 关闭资源
ConnectUtils.close(connection, channel);
}
}
消费者1
public class Consumer01 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定交换机
String exchangeName = "logs_topic";
channel.exchangeDeclare(exchangeName, "topic");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 将临时队列绑定到交换机上,并使用通配符的形式声明接收的消息类型
// 参数1:队列名称
// 参数2:交换机名称
// 参数3:自定义接收的路由名称
// user.* 可接收 user.insert 等类型的消息
channel.queueBind(queue, exchangeName, "user.*");
// 从队列中消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1-接收到的消息为:" + new String(body));
}
});
}
}
消费者2
public class Consumer02 {
public static void main(String[] args) throws IOException {
Connection connection = ConnectUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定交换机
String exchangeName = "logs_topic";
channel.exchangeDeclare(exchangeName, "topic");
// 创建临时队列
String queue = channel.queueDeclare().getQueue();
// 将临时队列绑定到交换机上,并使用通配符的形式声明接收的消息类型
// 参数1:队列名称
// 参数2:交换机名称
// 参数3:自定义接收的路由名称
// user.# 可接收 user、user.insert、user.info.findName 等类型的消息
channel.queueBind(queue, exchangeName, "user.#");
// 从队列中消费消息
channel.basicConsume(queue, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2-接收到的消息为:" + new String(body));
}
});
}
}
三、SpringBoot整合RabbitMQ
引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件:
spring:
# rabbitmq配置
rabbitmq:
host: 121.41.179.236
port: 5672
username: admin
password: 123456
virtual-host: /
在SpringBoot中通过自动注入,可直接使用
RabbitTemplate
来操作 RabbitMQ
1. HelloWorld 模型
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/hello-world")
public String helloWorld(String message){
// 点对点通过队列直接通信
// 参数1:队列名称 参数2:发送的消息
rabbitTemplate.convertAndSend("hello", message);
return "ok";
}
消费者
@Component
public class HelloConsumer {
// 监听 hello队列 的消息
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "true"))
public void receive(String message){
System.out.println("消费者-接收到消息:" + message);
}
}
2. WorkQueues 模型
生产者
@GetMapping("/work-queues")
public String workQueues(String message){
// 发送10条消息,模拟 work queues 工作队列,多个工人(消费者)竞争消费
for (int i = 0; i < 10; i++) {
// 参数1:队列名称 参数2:发送的消息
rabbitTemplate.convertAndSend("work", message + i);
}
return "ok";
}
消费者1
// 消费者1
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String msg, Message message, Channel channel){
try {
// 模拟处理消息的延迟
Thread.sleep(1000);
System.out.println("消费者1-接收到消息:" + msg);
// 手动确认消息已消费(使用手动确认消息时,需要在配置文件中设置acknowledge-mode=manual)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
消费者2
// 消费者2
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String msg, Message message, Channel channel){
try {
System.out.println("消费者2-接收到消息:" + msg);
// 手动确认消息已消费(使用手动确认消息时,需要在配置文件中设置acknowledge-mode=manual)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
结果
消费者2-接收到消息:消息0
消费者2-接收到消息:消息2
消费者2-接收到消息:消息3
消费者2-接收到消息:消息4
消费者2-接收到消息:消息5
消费者2-接收到消息:消息6
消费者2-接收到消息:消息7
消费者2-接收到消息:消息8
消费者2-接收到消息:消息9
消费者1-接收到消息:消息1
注意:
实现能者多劳模式时:
需要在配置文件中设置
spring.rabbitmq.listener.simple.prefetch=1
实现手动确认消息时:
需要在配置文件中设置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3. Publish / Subscribe 模型
生产者
@GetMapping("/fanout")
public String fanout(String message){
// fanout广播模式,一条消息可以被多个消费者消费。
// 发送一条消息到交换机中,所有绑定了该交换机的队列,都将收到消息,进而被对应的消费者消费
// 参数1:交换机名称 参数2:路由名称(消息的类型) 参数3:发送的消息
rabbitTemplate.convertAndSend("logs", "", message);
return "ok";
}
消费者1
// 消费者1
@RabbitListener(bindings = {
@QueueBinding(
// 创建临时队列
value = @Queue,
// 绑定的交换机
exchange = @Exchange(value = "logs", type = "fanout")
)
})
public void receive1(String message){
System.out.println("消费者1-接收到消息:" + message);
}
消费者2
// 消费者2
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs", type = "fanout")
)
})
public void receive2(String message){
System.out.println("消费者2-接收到消息:" + message);
}
结果
消费者1-接收到消息:我是fanout消息
消费者2-接收到消息:我是fanout消息
4. Route 模型
生产者
@GetMapping("/direct")
public String direct(String message){
// direct路由模式,它也具有广播模式的特点,一条消息可以被多个消费者消费。
// 不同之处在于消费者可以根据消息的类型,有选择性的消费消息。
// 参数1:交换机名称 参数2:路由名称(消息的类型) 参数3:发送的消息
// 发送三种不同类型的消息
rabbitTemplate.convertAndSend("logs_direct", "error", message + "-error");
rabbitTemplate.convertAndSend("logs_direct", "warning", message + "-warning");
rabbitTemplate.convertAndSend("logs_direct", "info", message + "-info");
return "ok";
}
消费者1
// 消费者1
@RabbitListener(bindings = {
@QueueBinding(
// 创建临时队列
value = @Queue,
// 绑定的交换机(type默认是direct,可不写)
exchange = @Exchange(value = "logs_direct"),
// 指明消费的消息类型(路由名称)
key = "info"
)
})
public void receive1(String message){
System.out.println("消费者1-接收到消息:" + message);
}
消费者2
// 消费者2
@RabbitListener(bindings = {
@QueueBinding(
// 创建临时队列
value = @Queue,
// 绑定的交换机(type默认是direct,可不写)
exchange = @Exchange("logs_direct"),
// 指明消费的消息类型(路由名称)
key = "error"
),
@QueueBinding(
// 创建临时队列
value = @Queue,
// 绑定的交换机(type默认是direct,可不写)
exchange = @Exchange("logs_direct"),
// 指明消费的消息类型(路由名称)
key = "warning"
)
})
public void receive2(String message){
System.out.println("消费者2-接收到消息:" + message);
}
结果
# 消费者1可接收info类型的消息
消费者1-接收到消息:我是direct消息-info
# 消费者2可接收error、warning类型的消息
消费者2-接收到消息:我是direct消息-error
消费者2-接收到消息:我是direct消息-warning
5. Topic 模型
生产者
@GetMapping("/topic")
public String topic(String message){
// topic动态路由模式,与 direct路由模式基本相同,也是根据消息的类型,有选择性的消费消息。
// 不同之处在于,topic模式下,消费者可以通过使用通配符(* #)来指定消费的消息类型。
// 参数1:交换机名称 参数2:路由名称(消息的类型) 参数3:发送的消息
// 发送三种不同类型的消息
rabbitTemplate.convertAndSend("logs_topic", "user", message + "-user");
rabbitTemplate.convertAndSend("logs_topic", "user.search", message + "-user.search");
rabbitTemplate.convertAndSend("logs_topic", "user.info.findName", message + "-user.info.findName");
return "ok";
}
消费者1
// 消费者1
@RabbitListener(bindings = {
@QueueBinding(
// 创建临时队列
value = @Queue,
// 绑定的交换机
exchange = @Exchange(value = "logs_topic", type = "topic"),
// 指明消费的消息类型(路由名称)
key = "user.*"
)
})
public void receive1(String message){
System.out.println("消费者1-接收到消息:" + message);
}
消费者2
// 消费者2
@RabbitListener(bindings = {
@QueueBinding(
// 创建临时队列
value = @Queue,
// 绑定的交换机
exchange = @Exchange(value = "logs_topic", type = "topic"),
// 指明消费的消息类型(路由名称)
key = "user.#"
)
})
public void receive2(String message){
System.out.println("消费者2-接收到消息:" + message);
}
结果
# 消费者1可接收user.*类型的消息 (*代表一个任意字符)
消费者1-接收到消息:我是topic消息-user.search
# 消费者2可接收user.#类型的消息 (#代表0-多个任意字符)
消费者2-接收到消息:我是topic消息-user
消费者2-接收到消息:我是topic消息-user.search
消费者2-接收到消息:我是topic消息-user.info.findName
四、RabbitMQ集群
集群相关信息可搜索其他资料