RabbitMq05——订阅模式

    有时候我们希望的是当生产者发送一条消息的时候,与它相关的消费者都能接收到该消息,而不是每一条消息只能一个消费者消费。即相当于我们关注的公众号,公众号发送一条消息,但是所有关注它的用户都可以收到该条消息。

    要实现这种模式,需要加入交换机来作为中间转换。即生产者发送消息不再直接发到队列,而是发送到交换机,再由交换机发送到对应的队列之中。但是每一个消费者都有一个自己的队列,并与之绑定,同时再将队列都绑定到交换机上,这样就可以实现一条消息有多个消费者来消费,也就是订阅模式

生产者

package com.mmr.rabbitmq.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {
	
	private final static String EXCHANGE_NAME = "fanout_exchange";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		//获取MQ连接
		Connection connection = ConnectionUtils.getConnection();
		//获取信道
		Channel channel = connection.createChannel();
		
		//声明交换机,设置交换机类型为fanout
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		String message = "i am coming------>";
		
		for (int i = 0; i < 20; i++) {
			//向交换机中发送消息
			channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes());
			System.out.println(message + i);
			
		}
		channel.close();
		connection.close();
	}
}
消费者1
package com.mmr.rabbitmq.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Rev1 {
	
	private final static String EXCHANGE_NAME = "fanout_exchange";
	private final static String QUEUE_NAME = "fanout_queue01";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		
		Connection connection = ConnectionUtils.getConnection();
		final Channel channel = connection.createChannel();
		//声明队列
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//声明每次向消费者发送一条消息
		channel.basicQos(1);
		//将队列与交换机绑定
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		//创建一个消费者
		DefaultConsumer consumer = new DefaultConsumer(channel){

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
					byte[] body) throws IOException {
				String msg = new String(body, "utf-8");
				
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					//一般用于异常处理,告知MQ消息处理失败,
					//第二个参数为true,则该条消息被重新放回队列,为false则放弃该条消息
//					channel.basicReject(envelope.getDeliveryTag(), false);
					e.printStackTrace();
				}finally {
					System.out.println("Recv1---->" + msg + "---->Done");
					//向MQ发送确认,告诉MQ该条消息已经消费,可以重新发送了
					//第二个参数为false表示只确认当前这一条消息,
					//如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认(批量确认针对的是整个信道)
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

消费者2

    消费者2与消费者1类似

package com.mmr.rabbitmq.fanout;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Rev2 {
	
	private final static String EXCHANGE_NAME = "fanout_exchange";
	private final static String QUEUE_NAME = "fanout_queue02";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		
		Connection connection = ConnectionUtils.getConnection();
		final Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		channel.basicQos(1);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		DefaultConsumer consumer = new DefaultConsumer(channel){

			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
					byte[] body) throws IOException {
				String msg = new String(body, "utf-8");
				
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally {
					System.out.println("Recv1---->" + msg + "---->Done");
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			}
		};
		channel.basicConsume(QUEUE_NAME, false, consumer);
	}
}

运行两个消费者,然后运行生产者,从控制台我们可以看到消费者1和消费者2同时都收到了生产者所发送的消息。



猜你喜欢

转载自blog.csdn.net/yangsheng0111/article/details/80958685