消费者集群分摊消费模式
1.首先启动两个消费者
启动第一个消费者
package com.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.utils.MQConnectionUtils;
public class Consumer {
//队列名称
private static final String QUEUE_NAME ="zrf_644";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消费者01");
//1.创建一个新的连接
Connection connection = MQConnectionUtils.newConnection();
//2.创建通道
Channel channel = connection.createChannel();
//3.创建一个队列 参数1:队列名称 参数2:是否持久化 参数3:是否唯一 参数4:是否自动删除
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//监听获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消费者获取生产者消息:"+msg);
}
};
//设置应该模式 设置为true 表示默认自动应答
channel.basicConsume(QUEUE_NAME, true,defaultConsumer);
}
}
启动第二个消费者
package com.consumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.utils.MQConnectionUtils;
public class Consumer {
//队列名称
private static final String QUEUE_NAME ="zrf_644";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("消费者02");
//1.创建一个新的连接
Connection connection = MQConnectionUtils.newConnection();
//2.创建通道
Channel channel = connection.createChannel();
//3.创建一个队列 参数1:队列名称 参数2:是否持久化 参数3:是否唯一 参数4:是否自动删除
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
//监听获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消费者获取生产者消息:"+msg);
}
};
//设置应该模式 设置为true 表示默认自动应答
channel.basicConsume(QUEUE_NAME, true,defaultConsumer);
}
}
启动循环生产10个数据
package com.producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.utils.MQConnectionUtils;
//简单队列生产者
public class Producer {
//队列名称
private static final String QUEUE_NAME ="zrf_644";
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个新的连接
Connection connection = MQConnectionUtils.newConnection();
//2.创建通道
Channel channel = connection.createChannel();
//3.创建一个队列 参数1:队列名称 参数2:是否持久化 参数3:是否唯一 参数4:是否自动删除
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i=0;i<10;i++) {
//4.创建消息
String msg ="zrf_644444msg:"+i;
//5.生产者发送消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("生产者发送消息成功:"+msg);
}
//6.关闭通道和连接
channel.close();
connection.close();
}
}
消费者01的显示
消费者01
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
消费者获取生产者消息:zrf_644444msg:0
消费者获取生产者消息:zrf_644444msg:2
消费者获取生产者消息:zrf_644444msg:4
消费者获取生产者消息:zrf_644444msg:6
消费者获取生产者消息:zrf_644444msg:8
循环者02的显示
消费者02
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
消费者获取生产者消息:zrf_644444msg:1
消费者获取生产者消息:zrf_644444msg:3
消费者获取生产者消息:zrf_644444msg:5
消费者获取生产者消息:zrf_644444msg:7
消费者获取生产者消息:zrf_644444msg:9
可以看到这是根据分摊的方式进行消费的,实际上是根据取模的方式进行分摊消费的!