上一篇,我们介绍了直接路由模型,这一篇我们介绍下主题路由模型。
就如同上一篇所说,直接路由模型和主题路由模型非常相似,它们唯一区别在于:直接路由模型对于路由key是精确匹配的,而主题路由模型对于路由key是模糊匹配的(使用通配符 )。
因此当我们了解上一篇中的直接路由模型,也不难理解本篇中的主题路由模型了。如图:
那么topic中的路由key的匹配规则是什么呢?
topic模式中路由key中存在两种通配符 * 和 #
- * (star) can substitute for exactly one word. *匹配明确的一个单词
- # (hash) can substitute for zero or more words. #匹配0到多个单词
topic模式中若路由key为"#" ,则表示对所有队列感兴趣 ,相当于fanout
topic模式中若路由key中不含有*和#字符时,则表示只对精确匹配的路由key队列感兴趣,则相对于routing
示例代码:
生产者
package com.tingcream.rabbitmq.topic;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicLogProducer {
private static String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主机 端口 vhost 用户名 密码
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
// lazy.black.elephant 懒惰的黑色的象
// lazy.white.sheep 懒惰的白色的羊
// quick.black.horse 快速的黑色的马
// quick.red.horse 快速的红色的马
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
for (int i=0;i<10;i++){
String message="你好,这是lazy.black.elephant级别消息 "+i;
channel.basicPublish(EXCHANGE_NAME,"lazy.black.elephant",null,message.getBytes());
System.out.println("TopicLogProducer Send: " + message );
}
for (int i=0;i<10;i++){
String message="你好,这是lazy.white.sheep级别消息 "+i;
channel.basicPublish(EXCHANGE_NAME,"lazy.white.sheep",null,message.getBytes());
System.out.println("TopicLogProducer Send: " + message );
}
for (int i=0;i<10;i++){
String message="你好,这是quick.black.horse级别消息 "+i;
channel.basicPublish(EXCHANGE_NAME,"quick.black.horse",null,message.getBytes());
System.out.println("TopicLogProducer Send: " + message );
}
for (int i=0;i<10;i++){
String message="你好,这是quick.red.horse级别消息 "+i;
channel.basicPublish(EXCHANGE_NAME,"quick.red.horse",null,message.getBytes());
System.out.println("TopicLogProducer Send: " + message );
}
channel.close();
connection.close();
}
}
消费者A
package com.tingcream.rabbitmq.topic;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class TopicLogReceiverA {
private static String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主机 端口 vhost 用户名 密码
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//获取一个随机的队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定routingKey "#" 表示对所有队列感兴趣 ,相当于fanout
// 绑定routingKey 若不含"#"和"*",表示只对精确匹配的路由key队列感兴趣,则相对于routing
// channel.queueBind(queue, exchange, routingKey)
channel.queueBind(queueName, EXCHANGE_NAME, "*.black.*");
System.out.println("TopicLogReceiverA Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println( "TopicLogReceiverA接收到消息:" + message );
}
};
channel.basicConsume(queueName, true, consumer);
}
}
消费者B
package com.tingcream.rabbitmq.topic;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class TopicLogReceiverB {
private static String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主机 端口 vhost 用户名 密码
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//获取一个随机的队列名称
String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queue, exchange, routingKey) //bind方法可以调用多次,绑定多个
channel.queueBind(queueName, EXCHANGE_NAME, "quick.#");
channel.queueBind(queueName, EXCHANGE_NAME, "*.white.*");
System.out.println("TopicLogReceiverB Waiting for messages");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println( "TopicLogReceiverB接收到消息:" + message );
}
};
channel.basicConsume(queueName, true, consumer);
}
}
依次启动消费者A、B和生产者。
消费者A输出
TopicLogReceiverA Waiting for messages
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 0
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 1
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 2
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 3
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 4
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 5
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 6
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 7
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 8
TopicLogReceiverA接收到消息:你好,这是lazy.black.elephant级别消息 9
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 0
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 1
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 2
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 3
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 4
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 5
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 6
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 7
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 8
TopicLogReceiverA接收到消息:你好,这是quick.black.horse级别消息 9
消费者B输出:
TopicLogReceiverB Waiting for messages
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 0
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 1
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 2
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 3
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 4
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 5
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 6
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 7
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 8
TopicLogReceiverB接收到消息:你好,这是lazy.white.sheep级别消息 9
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 0
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 1
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 2
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 3
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 4
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 5
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 6
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 7
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 8
TopicLogReceiverB接收到消息:你好,这是quick.black.horse级别消息 9
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 0
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 1
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 2
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 3
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 4
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 5
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 6
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 7
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 8
TopicLogReceiverB接收到消息:你好,这是quick.red.horse级别消息 9
管理后台中 exchange标签项中多出了一个topic交换机,名叫 topic_logs ,这就是我们刚声明的topic类型交换机。