前一个例子使用的routing key是固定的,这个例子则是非固定的routing key,而binding key则使用了通配符。
基本知识
- topic exchange:和direct exchange相似,使用特定routing
key发送的消息会被投递到具有与之匹配的binding key的所有queue中。 - topic exchange routing key的格式:多个词(word)用点(.)连接起来的字符串,最大长度255字节。
topic exchange binding key的格式:和routing key相同,但可以使用通配符。
“* (star)“ can substitute for exactly one word.
”#(hash)“ can substitute for zero or more words.Topic exchange可以实现其它exchange的功能。
比如,一个queue使用“#”为binding key,那么它将接收所有message,这和fanout exchange的功能一样。如果queue的binding key既没有使用“*”,也没有使用“#”,那么它和direct exchange功能一样。
Toturial 5
(1) Producer
一切都在代码里,很简单。
//MyMessage.java
public class MyMessage {
//routing key
private String routingKey;
//<index>.<speed>.<color>.<species>
private String message;
public MyMessage(){
this.routingKey = "info";
this.message = "this is an info message";
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
//MyTask.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Random;
//producer发送消息给exchange,exchange决定把消息放到哪个queue里面
public class MyTask implements Runnable {
//message index,用来查看多个consumer同时从broker取message时,消息是怎么被分派的
private static Integer index = 0;
//
private static final String[] speeds = new String[]{"lazy", "general", "quick", "speeds"};
private static final String[] colors = new String[]{"orange", "pink", "brown", "colors"};
private static final String[] species = new String[]{"rabbit", "fox", "bird", "species"};
//define name of the exchange
private static final String EXCHANGE_NAME = "topic_logs";
//connection to the server(broker)
private Connection rbtMqConn;
//channel
private Channel rbtMqChnl;
//控制停止线程
private boolean isStop = false;
public void setIsStop(boolean stop){
this.isStop = stop;
}
@Override
public void run() {
try{
//1.创建一个connection链接到RabbitMQ服务器(connection为我们处理socket/协议/认证授权等)
ConnectionFactory factory = new ConnectionFactory();
//本例使用本机作为服务器;Consumer也从这个broker接收消息,也可以使用其它主机,比如172.16.21.111
factory.setHost("localhost");
rbtMqConn = factory.newConnection();
//2.创建一个channel
rbtMqChnl = rbtMqConn.createChannel();
//3.声明exchange,名字为EXCHANGE_NAME,类型为topic
//这一步必须,因为不能发布到不存在的exchange是不允许的
//通过rabbitmqctl list_exchanges -p <vhost>,查看vhost上的exchange
rbtMqChnl.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//如果没有queue绑定到exchange上,这些消息将会丢失,但这对本例来说没问题;
//如果没有consumer正在监听,我们可以放心地丢弃消息。
//send message per 3s
while (!isStop){
//随机生成message
MyMessage myMessage = generateMessage(index++);
//4.通过指定的exchange发布特定routingKey的消息:
rbtMqChnl.basicPublish(EXCHANGE_NAME,
myMessage.getRoutingKey()/*routing key*/,
null,
myMessage.getMessage().getBytes("UTF-8"));
System.out.println(" [Producer] Sent '" + myMessage.getRoutingKey() + "', '" + myMessage.getMessage() + "'");
Thread.sleep(3000);
}
}catch(Exception ex){
System.out.println(ex.getMessage());
}finally {
//5.最后,使用完之后,关闭channel和connection;
if(null != rbtMqChnl){
try{
rbtMqChnl.close();
}catch(Exception ignore){
}
}
if(null != rbtMqConn){
try{
rbtMqConn.close();
}catch(Exception ignore){
}
}
}
System.out.println(" [Producer] Send task stop");
}
/**
* 生成随机消息(包括routing key 和message),用于发送给RabbitMQ服务器
* */
public static MyMessage generateMessage(Integer index) {
MyMessage myMessage = new MyMessage();
StringBuilder message = new StringBuilder("The log is {");
//index
StringBuilder routingKey = new StringBuilder(String.format("[%d]", index));
Random rand = new Random();
//speed
Integer num = rand.nextInt(4);//[0, 3]
routingKey.append(".").append(speeds[num]);
message.append(speeds[num]).append("-");
//color
num = rand.nextInt(4);
routingKey.append(".").append(colors[num]);
message.append(colors[num]).append("-");
//species
num = rand.nextInt(4);
routingKey.append(".").append(species[num]);
message.append(species[num]).append("}");
//now we get a routing key: <index>.<speed>.<color>.<species>
//注意routing key不可以使用通配符,但binding key可以
myMessage.setRoutingKey(routingKey.toString());
//and also get a message
//注意消息和routing key没有必然联系,routing key相当于对消息进行了分类
myMessage.setMessage(message.toString());
return myMessage;
}
}
//MyProducer.java
public class MyProducer {
public static void main(String[] argv) throws Exception {
MyTask sendTask = new MyTask();
Thread thread = new Thread(sendTask);
thread.start();
//let the thread run 60 seconds
Thread.sleep(60000);
sendTask.setIsStop(true);
}
}
(2) Consumer
//MyConsumer.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer
{
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
//1.创建connection链接到RabbitMQ Server
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");//使用本机的RabbitMQ Server
Connection connection = factory.newConnection();
//2.创建channel
Channel channel = connection.createChannel();
//3.声明exchange,指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//4.使用随机生成的消息队列
//注意,当consumer退出连接时,该随机队列会自动删除
String queueName = channel.queueDeclare().getQueue();
System.out.println(" [Consumer] declare to get random queue: " + queueName);
//5.queue绑定到exchange,并指定binding key
//rabbitMQ接收消息之后给exchange,而exchange将根据该路由决定是否将消息放入该消息队列
//binding key的含义随exchange的类型的不同而不同;
//对于fanout类型的exchange来说,routing key是没有意义的,会被直接忽略。
//注意:支持多个绑定,见下面代码
//使用rabbitmqctl list_bindings命令,查看绑定
//channel.queueBind(queueName, EXCHANGE_NAME, "*.*.orange.*"/*bindingKey*/);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.*.rabbit"/*bindingKey*/);
channel.queueBind(queueName, EXCHANGE_NAME, "*.lazy.#"/*bindingKey*/);
//6.设置消息处理函数
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [Consumer] Received '" + envelope.getRoutingKey() + "', '" + message + "'");
}
};
//7.监听消息
channel.basicConsume(queueName, true, consumer);
}
}
(3) 运行结果
只有符合 “...rabbit” 和 “.lazy.#” 的消息被接收和处理。