官网英文版学习——RabbitMQ学习笔记(六)Routing

有选择的接收消息。

        上一节我们使用的是fanout exchange来实现消息的发布/订阅模式,这并没有给我们带来多大的灵活性——它只能够让人盲目地进行广播。而本节我们采用direct类型的交换器来实现有选择的接收消息。直接交换器背后的路由算法很简单——消息传递到绑定键与消息的路由键完全匹配的队列。


        如上这个设置中,我们可以看到与它绑定的两个队列的直接交换X。第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个绑定键为绿色。

        在这个设置里面,一个被发布到交换器里的带有橙色路由键的消息将被路由到队列Q1中,带有黑色或绿色路由键的消息将去往队列Q2中,所有其他消息将被舍弃掉。

        下面这个是多重绑定,该绑定相似与上一节的发布/订阅模式


        使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1之间添加绑定键黑。在这种情况下,直接交换将表现为fanout类型,并将消息广播给所有匹配的队列。带有black路由键的消息将被发送到Q1和Q2。

      接下来,我们将采用这个模型,代替fanout来发送消息,以这种方式进行,接收程序放可以选择性的接收消息。

该篇与上篇区别仅在于,发布和订阅方的交换器的类型变为direct,同时设置发布消息的路由键,本篇将其设置为“error”,将订阅方其中一个队列的路由键设置为“error”,另一个设置为“bug”,则运行后,只有路由键和队列名称相同的一方能够收到消息,另一个bug路由键的收不到消息。

代码如下(在上篇代码上改造,修改的代码加上了下划线以便区分):

发布方:

package com.rabbitmq.HelloWorld;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publish {
	
	private static final String EXCHANGE_NAME = "exchangeB";

	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
//		创建工厂
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.10.185");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
//		创建连接
		Connection connetion = factory.newConnection();
//		获得信道
		Channel channel = connetion.createChannel();
//		声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct类型的交换器)
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		String message = "555,2,2,33,66";
//		发送消息,将第二项参数routingkey修改为error
		channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		channel.close();
		connetion.close();
	}

}

订阅方一,路由键为“error”

package com.rabbitmq.HelloWorld;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Subscribe {
	
	private static final String EXCHANGE_NAME = "exchangeB";
	private static final String QUEUE_NAME = "queueA";

	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
//		创建工厂
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.10.185");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
//		创建连接
		Connection connetion = factory.newConnection();
//		获得信道
		Channel channel = connetion.createChannel();
//		声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct的交换器)
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//		声明一个队列,在此采用临时队列
		String queueName = channel.queueDeclare().getQueue();
//		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//		队列和交换器进行绑定,并设定路由键为error
		channel.queueBind(queueName, EXCHANGE_NAME, "error");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				// TODO Auto-generated method stub
				String message = new String(body,"utf-8");
				System.out.println("[x] received'"+message+"'");
			}
		};
		channel.basicConsume(queueName, consumer);
	}

}

订阅方二(路由键为“bug”)

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Subscribe {
	
	private static final String EXCHANGE_NAME = "exchangeB";
	private static final String QUEUE_NAME = "queueA";

	public static void main(String[] args) throws IOException, TimeoutException {
		// TODO Auto-generated method stub
//		创建工厂
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.10.185");
		factory.setUsername("admin");
		factory.setPassword("123456");
		factory.setPort(5672);
//		创建连接
		Connection connetion = factory.newConnection();
//		获得信道
		Channel channel = connetion.createChannel();
//		声明交换器(声明了一个名字位exchangeA,修改fanout类型为direct类型的交换器�?
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//		声明�?个队列,在此采用临时队列
		String queueName = channel.queueDeclare().getQueue();
//		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//		队列和交换器进行绑定,未设定路由键
		channel.queueBind(queueName, EXCHANGE_NAME, "bug");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body) throws IOException {
				// TODO Auto-generated method stub
				String message = new String(body,"utf-8");
				System.out.println("[x] received'"+message+"'");
			}
		};
		channel.basicConsume(queueName, consumer);
	}

}

运行结果:

下面这个是在另一个项目中,没有引进日志打印的包,红色忽略即可

   效果已经呈现了。

 

有选择的接收消息。

        上一节我们使用的是fanout exchange来实现消息的发布/订阅模式,这并没有给我们带来多大的灵活性——它只能够让人盲目地进行广播。而本节我们采用direct类型的交换器来实现有选择的接收消息。直接交换器背后的路由算法很简单——消息传递到绑定键与消息的路由键完全匹配的队列。


        如上这个设置中,我们可以看到与它绑定的两个队列的直接交换X。第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个绑定键为绿色。

        在这个设置里面,一个被发布到交换器里的带有橙色路由键的消息将被路由到队列Q1中,带有黑色或绿色路由键的消息将去往队列Q2中,所有其他消息将被舍弃掉。

        下面这个是多重绑定,该绑定相似与上一节的发布/订阅模式


        使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以在X和Q1之间添加绑定键黑。在这种情况下,直接交换将表现为fanout类型,并将消息广播给所有匹配的队列。带有black路由键的消息将被发送到Q1和Q2。

      接下来,我们将采用这个模型,代替fanout来发送消息,以这种方式进行,接收程序放可以选择性的接收消息。

该篇与上篇区别仅在于,发布和订阅方的交换器的类型变为direct,同时设置发布消息的路由键,本篇将其设置为“error”,将订阅方其中一个队列的路由键设置为“error”,另一个设置为“bug”,则运行后,只有路由键和队列名称相同的一方能够收到消息,另一个bug路由键的收不到消息。

代码如下(在上篇代码上改造,修改的代码加上了下划线以便区分):

发布方:

[java]  view plain  copy
 
  1. package com.rabbitmq.HelloWorld;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.concurrent.TimeoutException;  
  5.   
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9.   
  10. public class Publish {  
  11.       
  12.     <u>private static final String EXCHANGE_NAME = "exchangeB";</u>  
  13.   
  14.     public static void main(String[] args) throws IOException, TimeoutException {  
  15.         // TODO Auto-generated method stub  
  16. //      创建工厂  
  17.         ConnectionFactory factory = new ConnectionFactory();  
  18.         factory.setHost("192.168.10.185");  
  19.         factory.setUsername("admin");  
  20.         factory.setPassword("123456");  
  21.         factory.setPort(5672);  
  22. //      创建连接  
  23.         Connection connetion = factory.newConnection();  
  24. //      获得信道  
  25.         Channel channel = connetion.createChannel();  
  26. //      <u>声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct类型的交换器)  
  27.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");</u>  
  28.         String message = "555,2,2,33,66";  
  29. //      <u>发送消息,将第二项参数routingkey修改为error  
  30.         channel.basicPublish(EXCHANGE_NAME, "error"null, message.getBytes());</u>  
  31.         System.out.println(" [x] Sent '" + message + "'");  
  32.         channel.close();  
  33.         connetion.close();  
  34.     }  
  35.   
  36. }  

订阅方一,路由键为“error”

[java]  view plain  copy
 
  1. package com.rabbitmq.HelloWorld;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.concurrent.TimeoutException;  
  5.   
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9. import com.rabbitmq.client.Consumer;  
  10. import com.rabbitmq.client.DefaultConsumer;  
  11. import com.rabbitmq.client.Envelope;  
  12. import com.rabbitmq.client.AMQP.BasicProperties;  
  13.   
  14. public class Subscribe {  
  15.       
  16.     <u>private static final String EXCHANGE_NAME = "exchangeB";</u>  
  17.     private static final String QUEUE_NAME = "queueA";  
  18.   
  19.     public static void main(String[] args) throws IOException, TimeoutException {  
  20.         // TODO Auto-generated method stub  
  21. //      创建工厂  
  22.         ConnectionFactory factory = new ConnectionFactory();  
  23.         factory.setHost("192.168.10.185");  
  24.         factory.setUsername("admin");  
  25.         factory.setPassword("123456");  
  26.         factory.setPort(5672);  
  27. //      创建连接  
  28.         Connection connetion = factory.newConnection();  
  29. //      获得信道  
  30.         Channel channel = connetion.createChannel();  
  31. //      <u>声明交换器(声明了一个名字位exchangeA,类型修改fanout为direct的交换器)  
  32.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");</u>  
  33. //      声明一个队列,在此采用临时队列  
  34.         String queueName = channel.queueDeclare().getQueue();  
  35. //      channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
  36. //      队列和交换器进行绑定,并设定路由键为error  
  37.         <u>channel.queueBind(queueName, EXCHANGE_NAME, "error");</u>  
  38.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  39.         Consumer consumer = new DefaultConsumer(channel){  
  40.             @Override  
  41.             public void handleDelivery(String consumerTag, Envelope envelope,  
  42.                     BasicProperties properties, byte[] body) throws IOException {  
  43.                 // TODO Auto-generated method stub  
  44.                 String message = new String(body,"utf-8");  
  45.                 System.out.println("[x] received'"+message+"'");  
  46.             }  
  47.         };  
  48.         channel.basicConsume(queueName, consumer);  
  49.     }  
  50.   
  51. }  

订阅方二(路由键为“bug”)

[java]  view plain  copy
 
  1. import java.io.IOException;  
  2. import java.util.concurrent.TimeoutException;  
  3.   
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.Consumer;  
  8. import com.rabbitmq.client.DefaultConsumer;  
  9. import com.rabbitmq.client.Envelope;  
  10. import com.rabbitmq.client.AMQP.BasicProperties;  
  11.   
  12. public class Subscribe {  
  13.       
  14.     <u>private static final String EXCHANGE_NAME = "exchangeB";</u>  
  15.     private static final String QUEUE_NAME = "queueA";  
  16.   
  17.     public static void main(String[] args) throws IOException, TimeoutException {  
  18.         // TODO Auto-generated method stub  
  19. //      创建工厂  
  20.         ConnectionFactory factory = new ConnectionFactory();  
  21.         factory.setHost("192.168.10.185");  
  22.         factory.setUsername("admin");  
  23.         factory.setPassword("123456");  
  24.         factory.setPort(5672);  
  25. //      创建连接  
  26.         Connection connetion = factory.newConnection();  
  27. //      获得信道  
  28.         Channel channel = connetion.createChannel();  
  29. //      <u>声明交换器(声明了一个名字位exchangeA,修改fanout类型为direct类型的交换器�?  
  30.         channel.exchangeDeclare(EXCHANGE_NAME, "direct");</u>  
  31. //      声明�?个队列,在此采用临时队列  
  32.         String queueName = channel.queueDeclare().getQueue();  
  33. //      channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
  34. //      <u>队列和交换器进行绑定,未设定路由键  
  35.         channel.queueBind(queueName, EXCHANGE_NAME, "bug");</u>  
  36.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  37.         Consumer consumer = new DefaultConsumer(channel){  
  38.             @Override  
  39.             public void handleDelivery(String consumerTag, Envelope envelope,  
  40.                     BasicProperties properties, byte[] body) throws IOException {  
  41.                 // TODO Auto-generated method stub  
  42.                 String message = new String(body,"utf-8");  
  43.                 System.out.println("[x] received'"+message+"'");  
  44.             }  
  45.         };  
  46.         channel.basicConsume(queueName, consumer);  
  47.     }  
  48.   
  49. }  

运行结果:

下面这个是在另一个项目中,没有引进日志打印的包,红色忽略即可

   效果已经呈现了。

猜你喜欢

转载自www.cnblogs.com/xiaoyao-001/p/9193428.html