官方文档地址: 5 Topics
基于模式(主题)接收消息。
前提条件
本教程假设你已经安装了 RabbitMQ 并在本地主机端口(5672)上运行。
主题
在上一篇教程中,我们改进了日志系统。我们使用了direct
交换器,而不是仅能进行模拟广播的fanout
交换器,从而获得了选择性接收日志的可能性。
虽然使用direct
交换器改进了我们的系统,但它仍然有局限性 - 不能基于多个条件进行路由。
在我们的日志系统中,我们可能不仅希望根据严重性订阅日志,还希望根据发出日志的来源订阅日志。您可能从 unix 工具syslog
中了解了这个概念,该工具根据严重程度(info/warn/crit…
)和设备(auth/cron/kern…
)来路由日志。
这将给我们提供很大的灵活性 - 我们可能希望不只监听来自"cron"
的错误日志,还要监听来自"kern"
的所有日志。
要在日志系统中实现这一点,我们需要了解更复杂的topic
交换器。
Topic 交换器
发送到topic
交换器的消息的routing_key
必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定一些与消息相关的特性。几个有效的路由键示例:stock.usd.nyse
,nyse.vmw
,quick.orange.rabbit
。路由键最多255
个字节。
绑定键也必须是相同的形式。topic
交换器背后的逻辑与direct
交换器类似 - 使用特定路由键发送的消息将被传递到所有使用匹配的绑定键绑定的队列。但是,绑定键有两个特殊情况:
*
可以代替一个单词。#
可以代替零个或多个单词。
用一个例子来解释最简单:
在本例中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个词将描述速度,第二个是颜色,第三个是物种:<speed>.<colour>.<species>
我们创建了三个绑定:Q1
与绑定键*.orange.*
绑定,Q2
与绑定键*.*.rabbit
和lazy.#
绑定。
这些绑定可概括为:
Q1
对所有的橙色动物都感兴趣。Q2
对兔子和速度慢的动物感兴趣。
将路由键设置为以下内容,对应的发送队列如下:
- 路由键设置为
"quick.orange.rabbit"
的消息将被发送到两个队列。 - 路由键设置为
"lazy.orange.elephant"
的消息也会发送到这两个队列。 - 路由键设置为
"quick.orange.fox"
的消息只会去第一个队列。 - 路由键设置为
"lazy.brown.fox"
的消息只到第二个队列。 - 路由键设置为
"lazy.pink.rabbit"
的消息只会被发送到第二个队列一次,即使它匹配了两个绑定。 - 路由键设置为
"quick.brown.fox"
的消息不匹配任何绑定,因此将被丢弃。
如果我们不遵守约定,用一个或四个单词来传达信息,比如"orange"
或者"quick.orange.mally.rabbit"
,这些消息将不匹配任何绑定,会被丢失。
"lazy.orange.male.rabbit"
在另一方面,即使它有四个单词,也将匹配最后一个绑定并将传递到第二个队列。
Topic 交换器
topic
交换器功能强大,像其他交换器一样。
当队列使用#
绑定键绑定时,它将接收所有的消息,不管路由键是什么,就像在fanout
交换器中一样。
绑定键如果不使用特殊字符*
和#
,topic
就和direct
一样了。
把它们放一起
我们将在日志系统中使用topic
交换器。我们首先假设日志的路由键有两个单词:<facility>.<severity>
。
代码与前面的教程几乎相同。EmitLogTopic.java
类代码:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wangbo
* @date 2019/10/23 11:24
*/
public class EmitLogTopic {
//交换器名称
private final static String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接器连接到服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection()){
//创建一个通道
Channel channel = connection.createChannel();
//声明交换器,设置交换器类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//从命令行接受参数
//路由键
String routingKey = getRouting(args);
//日志消息
String message = getMessage(args);
//发布消息到交换器,并设置路由键
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
private static String getRouting(String[] strings) {
if (strings.length < 1) {
return "anonymous.info";
}
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2) {
return "Hello World!";
}
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) {
return "";
}
if (length < startIndex) {
return "";
}
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
ReceiveLogsTopic.java
类代码:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wangbo
* @date 2019/10/22 18:25
*/
public class ReceiveLogsTopic {
//交换器名称
private final static String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接器连接到服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明交换器,设置交换器类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//获取临时队列的名称
String queueName = channel.queueDeclare().getQueue();
//将临时队列和交换器绑定,进行多重绑定
if (args.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String bindingKey : args) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//回调对象
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
//消费者监听
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
编译并运行示例,包含教程1
中的类路径 - 在 Windows 上使用%CP%
。
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收所有日志:
java -cp $CP ReceiveLogsTopic "#"
接收所有来自设备kern
的日志:
java -cp $CP ReceiveLogsTopic "kern.*"
或者接收所有critical
级别的日志:
java -cp $CP ReceiveLogsTopic "*.critical"
你可以创建多个绑定:
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
发出带有路由键kern.critical
的日志:
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
接下来,在教程6
中了解如何作为远程过程调用执行往返消息。