在上篇文章Routing中,我们改进了日志系统,使用类型为direct的exchange,使得可以有选择性的接收日志。而不是fanout那样只是简单的广播信息。
虽然使用了direct的exchange来改进我们的日志系统,但是还是有局限性,无法根据多重条件来进行路由选择。
在我们的日志系统中,我们希望不仅仅根据日志的级别,而且根据日志的来源来订阅日志。类似于syslog这个unix工具,它根据严重性(info/warn/crit...)和设备(auth/cron/kern...)来选择发送日志。
这样可能给予我们更多的灵活性,我们希望关注来自于“cron”的critical errors和来自于“kern”的所有日志。
发往topic类型exchange的消息不能任意选择routing_key,必须是由.隔开的一系列标识,这些标识可以是任何东西,但经常是一些与消息特性相关的词,一些合法的routing key的例子:"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。routing key的长度限制为255bytes。
绑定的key和routing key的形式一样,topic exchange的逻辑跟direct exchange类似。附带特殊routing key的消息会被分发到所有绑定匹配的binding key的queue中。需要注意的是,关于binding key有两个特殊的情况。
* 可以匹配一个标识。
#可以匹配0个或多个标识。
如上图所示,我们准备发送关于动物的消息,消息会附带一个routing key包含3个标识(两个.隔开),第一个标识描述速度,第二个标识描述颜色,第三个标识描述种类:"<speed>.<colour>.<species>"。
我们建了3个绑定关系。Q1通过"*.orange.*"绑定,Q2通过"*.*.rabbit"和"lazy.#"来绑定。
这几个绑定关系可以总结为:
Q1对所有橙色动物感兴趣。
Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切
消息附带routing key为"quick.orange.rabbit"将会被分发到Q1和Q2。消息附带routing key为"lazy.orange.elephant"也会被分发到Q1和Q2。消息附带routing key为"quick.orange.fox"只会被分发到Q1。消息附带routing key为"lazy.brown.fox"会被分发到Q2。消息附带routing key为"lazy.pink.rabbit"将会被分发一次到Q2,虽然这个消息匹配到两个binding key。消息附带routing key为"quick.brown.fox"将会被丢弃。
如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
代码如下:EmitLogTopic.java
package org.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5673);
connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String regArgv[] = new String[]{"kern.critical","A critical kernel error"};
String routingKey = getRouting(regArgv);
String message = getMessage(regArgv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
private static String getRouting(String[] strings) {
if (strings.length < 1)
return "anonymous.info";
return strings[0];
}
private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}
private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0)
return "";
if (length < startIndex)
return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
ReceiveLogsTopic.java
package org.rabbitmq;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5673);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
String regArgv[] = new String[]{"kern.*","*.critical"};
if (regArgv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : regArgv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
运行结果如下:
参考:http://www.rabbitmq.com/tutorials/tutorial-five-java.html