(内容均来自RabbitMQ官网:https://www.rabbitmq.com/tutorials/tutorial-three-java.html)
上一篇学习了WorkQueue,这一篇学习Publish/Subscribe(发布订阅模式)。Publish/Subscribe(发布订阅模式)引入了ExChange(路由器),前面两篇我们都没有看路由器出现(不是没有,是使用了系统默认的路由器)。由此,我们RabbitMQ出现的组件我们大概总结下:连接、通道、交换机、队列、生产者、消费者
图例
队列(queue)的声明在前两篇已经知道了:
channel.queueDeclare("QUEUE_NAME", false, false, false, null);
交换机(exchange)的声明和队列差不多:
需要指定交换接名字和类型(这里是fanout类型,)
channel.exchangeDeclare("EXCHANGE_NAME", "fanout");
生产者
这里做的事情:
- 获取一个连接
- 一个通道
- 然后创建一个交换机(类型为fanout)
- 然后往发信息给到交换机,没有填写routingKey(fanout类型的交换机,即使指定了routingKey也会被忽略掉)
import com.booyue.tlh.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class Producer {
public static final String EXCHANGE_NAME = "logs";
private static final String MESSAGE = " FanOut类型的消息";
public static void main(String[] args) throws IOException {
//获取一个连接
Connection connection = RabbitMQUtils.getConnection();
//获取一个通道
Channel channel = connection.createChannel();
/**
* 声明一个交换机(运行完这一步,rabbitmq系统中就会创建一个名字和类型对应的)
* 参数1:logs
* 参数2:fanout
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 往“logs”交换机里发送20条"{i} Hello RabbitMQ"的信息。这里不需要指定routingKey
*/
for (int i = 0; i < 20; i++) {
channel.basicPublish(EXCHANGE_NAME, "", null, (i + MESSAGE).getBytes());
}
}
}
消费者01和消费者02
消费者01和消费者02的区别就是模拟作业时间不同(这里贴上了消费者01)。消费者做的事情:
- 获取连接
- 获取通道
- 声明交换机(指定名字、类型【fanout类型】),如果在生产者那边之指定了这可以不指定,如果两边制定了的话后面那个也没有影响
- 获取一个临时队列,当然也可以不是临时的。临时的好处就是当客户端断开连接之后,系统会自动释放资源
- 建立交换机和队列的绑定关系,不需要指定routingKey(fanout类型的交换机,即使指定了routingKey也会被忽略掉)
- 接受消息
import com.booyue.tlh.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j
public class Consumer01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = RabbitMQUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//创建一个临时队列(临时队列在客户端断开之后会自动删除,释放系统资源)
String queueName = channel.queueDeclare().getQueue();
//创建绑定关系
channel.queueBind(queueName, EXCHANGE_NAME, "");
/**
* 接收消息
* 参数1:队列名
* 参数2:是否自动确认
* 参数3:消息回调接口
*/
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String message = new String(body, "utf-8");
try {
//模拟完成业务逻辑的耗时
Thread.sleep(1000);
} finally {
log.info("消费者1收到的信息:{}", message);
}
}
});
}
}
启动生产者和两个消费者,查看两个消费者收到的消息
发布订阅模式的却别于HelloWorld和WorkQueue的最大区别就是,每个消费者都能收到相同内容、相同数量的消息
17:33:30.262 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:0 FanOut类型的消息
17:33:31.262 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:1 FanOut类型的消息
17:33:32.263 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:2 FanOut类型的消息
17:33:33.263 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:3 FanOut类型的消息
17:33:34.264 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:4 FanOut类型的消息
17:33:35.264 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:5 FanOut类型的消息
17:33:36.265 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:6 FanOut类型的消息
17:33:37.265 [pool-1-thread-7] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:7 FanOut类型的消息
17:33:38.266 [pool-1-thread-7] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:8 FanOut类型的消息
17:33:39.266 [pool-1-thread-7] INFO com.booyue.tlh.publish_subscrible.Consumer01 - 消费者1收到的信息:9 FanOut类型的消息
17:33:30.262 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:0 FanOut类型的消息
17:33:31.262 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:1 FanOut类型的消息
17:33:32.263 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:2 FanOut类型的消息
17:33:33.263 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:3 FanOut类型的消息
17:33:34.264 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:4 FanOut类型的消息
17:33:35.264 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:5 FanOut类型的消息
17:33:36.265 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:6 FanOut类型的消息
17:33:37.265 [pool-1-thread-6] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:7 FanOut类型的消息
17:33:38.266 [pool-1-thread-7] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:8 FanOut类型的消息
17:33:39.266 [pool-1-thread-7] INFO com.booyue.tlh.publish_subscrible.Consumer02 - 消费者2收到的信息:9 FanOut类型的消息