版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/ljz2016/article/details/81866493
UdConsumerContainer这个类是用于消费端的,复用信道池。
调用 public void start() throws Exception 方法后,容器会把所有消费者绑定到各自对应的队列(每个消费者绑定的队列可以不同)。
这个类的处理模式是:先将消息缓存到本地队列中,用户自定义的消息处理器再从本地队列中取出消息并处理;在设定的消息确认模式(这里是手动确认 AcknowledgeMode.MANUAL)下,根据处理结果给予RabbitMQ相应的回应——处理成功则basicAck确认,处理失败则basicNack并让消息重新入队。
BlockingQueueConsumer负责将消息从中间件取回到本地,messageHandler负责处理本地消息。
/**
 * Buffers a delivery pushed by the broker into the local queue of the enclosing
 * {@code BlockingQueueConsumer}.
 *
 * <p>While no abort is in progress ({@code abortStarted == 0}) the call blocks until the local
 * queue has room. Once an abort has started, the delivery is only offered for
 * {@code shutdownTimeout} milliseconds; if it cannot be buffered in that time the local queue is
 * cleared, the delivery is nacked with {@code multiple=true, requeue=true}, the consumer tag is
 * cancelled and the channel is closed.
 *
 * @throws IOException if nacking, cancelling or closing the channel fails
 */
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
    final boolean aborting = BlockingQueueConsumer.this.abortStarted > 0L;
    try {
        if (!aborting) {
            BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));
            return;
        }
        boolean buffered = BlockingQueueConsumer.this.queue.offer(
                new Delivery(consumerTag, envelope, properties, body),
                BlockingQueueConsumer.this.shutdownTimeout,
                TimeUnit.MILLISECONDS);
        if (buffered) {
            return;
        }
        // The handler side is no longer draining the buffer: give everything back to the broker.
        BlockingQueueConsumer.this.queue.clear();
        this.getChannel().basicNack(envelope.getDeliveryTag(), true, true);
        this.getChannel().basicCancel(consumerTag);
        try {
            this.getChannel().close();
        } catch (TimeoutException ignored) {
            // best effort close during abort
        }
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
    }
}
/**
 * Takes the next locally buffered message of {@code consumer}, passes it to a message handler and
 * answers the broker on the consumer's channel: {@code basicAck} when the handler returns
 * {@code true}, {@code basicNack} with requeue when it returns {@code false}.
 *
 * <p>The handler the consumer was created with is used first, because every consumer is built
 * from one entry of the handler/queue map; the container-level handler is only a fallback.
 * Previously the container-level handler was always used, which ignored the per-consumer handler
 * and failed with a {@code NullPointerException} when only the map had been configured.
 *
 * @param consumer the consumer whose local queue is read; blocks until a message is available
 * @return the handler's result
 * @throws InterruptedException if interrupted while waiting for a message
 * @throws IOException if acknowledging or rejecting the message fails
 * @throws IllegalStateException if neither the consumer nor the container has a handler
 */
private boolean receiveAndExecute(BlockingQueueConsumer consumer) throws InterruptedException, IOException {
    Channel channel = consumer.getChannel();
    Message message = consumer.nextMessage();
    IMessageHandler handler = consumer.getMessageHandler();
    if (handler == null) {
        handler = this.messageHandler;
    }
    if (handler == null) {
        throw new IllegalStateException("No message handler configured for consumer");
    }
    boolean success = handler.handle(this.serializerMessageConverter.fromMessage(message));
    if (success) {
        channel.basicAck(message.getDeliveryTag(), false);
    } else {
        // single message, requeue = true so the broker delivers it again
        channel.basicNack(message.getDeliveryTag(), false, true);
    }
    return success;
}
相关的源码:
public class UdConsumerContainer {
// Source of the channels handed to newly created consumers.
private RabbitMQHelper rabbitMQHelper;
// Maximum time in milliseconds to wait for a consumer's start latch.
private long consumerStartTimeout = 60000L;
// Consumers currently owned by the container; null until start() initializes them. Guarded by consumersMonitor.
private Set<BlockingQueueConsumer> consumers;
// Both flags are set to true by start(); nothing in the visible code resets them.
private volatile boolean active;
private volatile boolean running;
private volatile Map<IMessageHandler, String[]> messageHandlerMap;// message handlers and the queues each one is bound to
// Runs the per-consumer processing loops and the restart loop.
private Executor executor = new SimpleAsyncTaskExecutor();
private final Object lifecycleMonitor = new Object();
private final Object consumersMonitor = new Object();
private Integer declarationRetries = Integer.valueOf(1);// number of retries
// Container-level handler, set through setMessageHandler.
private volatile IMessageHandler messageHandler;
private final SerializerMessageConverter serializerMessageConverter = new SerializerMessageConverter();
private final AcknowledgeMode ackMode; // message acknowledgement mode
// Pause in milliseconds after a failed consumer restart.
private final int restartInterval;
private volatile int pretfetch = 10; // number of messages accepted at the same time
// Consumers whose processing loop ended with an IOException and that are waiting to be restarted.
private final BlockingQueue<BlockingQueueConsumer> needRestart;
// Passed to every consumer to build message properties from deliveries.
private final IMessageConventer messageConventer;
/**
 * Creates a container that uses manual acknowledgement, the default message converter and a
 * 30 second restart interval.
 *
 * @param rabbitMQHelper provides the channels used by the consumers
 * @param cacheConnectionFactory accepted for injection; not used by this constructor
 */
public UdConsumerContainer(@Autowired RabbitMQHelper rabbitMQHelper, @Autowired CacheConnectionFactory cacheConnectionFactory) {
    this.rabbitMQHelper = rabbitMQHelper;
    this.needRestart = new LinkedBlockingQueue<>();
    this.restartInterval = 30000;
    this.messageConventer = new DefaultMessageConventer();
    this.ackMode = AcknowledgeMode.MANUAL;
}
/**
 * Starts the container: marks it active, creates one consumer per configured handler, launches a
 * processing loop for each consumer on the executor and finally launches the loop that restarts
 * consumers queued in {@code needRestart}.
 *
 * <p>Changes compared to the previous version:
 * <ul>
 *   <li>no {@code NullPointerException} when no handler map has been configured yet; the
 *       container starts with an empty consumer set instead;</li>
 *   <li>the restart lambda no longer redeclares a local variable of the enclosing method, which
 *       does not compile;</li>
 *   <li>a consumer whose restart failed is queued again after the restart interval instead of
 *       being dropped;</li>
 *   <li>the restart loop ends when the container is no longer active or its thread is
 *       interrupted, and the interrupt flag is preserved.</li>
 * </ul>
 *
 * @throws IllegalStateException if the container already has consumers
 * @throws RuntimeException wrapping a startup exception reported by a consumer
 * @throws Exception if waiting for a consumer to start is interrupted
 */
public void start() throws Exception {
    synchronized (this.lifecycleMonitor) {
        this.active = true;
        this.running = true;
        this.lifecycleMonitor.notifyAll();
    }
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        this.initializeConsumers();
        if (this.consumers == null) {
            // No handlers configured yet: start empty so handlers can still be set afterwards.
            this.consumers = new HashSet<>();
        }

        Set<UdConsumerContainer.AsyncMessageProcessingConsumer> processors = new HashSet<>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            UdConsumerContainer.AsyncMessageProcessingConsumer processor =
                    new UdConsumerContainer.AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            this.executor.execute(processor);
        }

        // Wait for every consumer to report its startup result before declaring the container started.
        for (UdConsumerContainer.AsyncMessageProcessingConsumer processor : processors) {
            FatalStartupException startupException = processor.getStartupException();
            if (startupException != null) {
                throw new RuntimeException(startupException);
            }
        }

        this.executor.execute(() -> {
            while (this.isActive()) {
                BlockingQueueConsumer pending = null;
                try {
                    pending = this.needRestart.take();
                    this.restart(pending);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception restartFailure) {
                    try {
                        Thread.sleep((long) this.restartInterval);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (pending != null) {
                        // Retry later instead of losing the consumer for good.
                        this.needRestart.offer(pending);
                    }
                }
            }
        });
    }
}
/**
 * Creates one consumer per entry of the handler map, provided no consumers exist yet and a map
 * has been configured. The new consumers are stored in {@code consumers} but not started.
 *
 * @return the number of consumers created, 0 when nothing was done
 */
private int initializeConsumers() {
    synchronized (this.consumersMonitor) {
        int created = 0;
        if (this.consumers != null || this.messageHandlerMap == null) {
            return created;
        }
        this.consumers = new HashSet<>(this.messageHandlerMap.size());
        for (Map.Entry<IMessageHandler, String[]> binding : this.messageHandlerMap.entrySet()) {
            this.consumers.add(this.createBlockingQueueConsumer(binding.getKey(), binding.getValue()));
            created++;
        }
        return created;
    }
}
/**
 * Replaces the handler-to-queues mapping and, if consumers already exist, cancels them and
 * starts new ones for the new mapping.
 *
 * @param messageHandlerMap handlers mapped to the queue names each one consumes from
 */
public void setQueueNamesAndHandlers(Map<IMessageHandler, String[]> messageHandlerMap) {
    this.messageHandlerMap = messageHandlerMap;
    queuesChanged();
}
/**
 * Stops {@code consumer}, removes it from the container and, while the container is still
 * active, replaces it with a new consumer for the same handler and queues whose processing loop
 * is started on the executor.
 *
 * @param consumer the consumer to replace
 * @throws RuntimeException if the replacement reports a startup exception or cannot be started;
 *         the replacement is stopped and removed again in that case
 */
private void restart(BlockingQueueConsumer consumer) {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            return;
        }
        consumer.stop();
        this.consumers.remove(consumer);
        if (!this.isActive()) {
            return;
        }

        final BlockingQueueConsumer replacement =
                this.createBlockingQueueConsumer(consumer.getMessageHandler(), consumer.getQueues());
        this.consumers.add(replacement);
        final UdConsumerContainer.AsyncMessageProcessingConsumer worker =
                new UdConsumerContainer.AsyncMessageProcessingConsumer(replacement);
        this.executor.execute(worker);

        try {
            final FatalStartupException startupFailure = worker.getStartupException();
            if (startupFailure != null) {
                this.consumers.remove(replacement);
                throw new RuntimeException("Fatal exception on listener startup", startupFailure);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } catch (Exception failure) {
            replacement.stop();
            this.consumers.remove(replacement);
            throw new RuntimeException(failure);
        }
    }
}
/**
 * Reacts to a changed handler map: when the container already has consumers, each one is
 * cancelled and removed, then consumers for the current map are created and started.
 */
private void queuesChanged() {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            return;
        }
        Iterator<BlockingQueueConsumer> remaining = this.consumers.iterator();
        while (remaining.hasNext()) {
            remaining.next().basicCancel(true);
            remaining.remove();
        }
        this.addAndStartConsumers();
    }
}
/**
 * Replaces the consumer set with one new consumer per entry of the handler map and starts a
 * processing loop for each of them. Does nothing when the container has no consumer set yet.
 *
 * <p>The volatile handler map is read once into a local so the size and the entries come from the
 * same map even if the map is replaced concurrently. A {@code null} map is treated as "no
 * handlers" and leaves an empty consumer set instead of failing with a
 * {@code NullPointerException}.
 *
 * @throws RuntimeException if a consumer reports a startup exception or cannot be started; that
 *         consumer is stopped and removed before the exception is thrown
 */
private void addAndStartConsumers() {
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            return;
        }
        final Map<IMessageHandler, String[]> bindings = this.messageHandlerMap;
        if (bindings == null) {
            this.consumers = new HashSet<>();
            return;
        }
        this.consumers = new HashSet<>(bindings.size());
        for (Map.Entry<IMessageHandler, String[]> binding : bindings.entrySet()) {
            BlockingQueueConsumer blockingQueueConsumer =
                    this.createBlockingQueueConsumer(binding.getKey(), binding.getValue());
            this.consumers.add(blockingQueueConsumer);
            UdConsumerContainer.AsyncMessageProcessingConsumer asyncMessageProcessingConsumer =
                    new UdConsumerContainer.AsyncMessageProcessingConsumer(blockingQueueConsumer);
            this.executor.execute(asyncMessageProcessingConsumer);
            try {
                FatalStartupException fatalStartupException = asyncMessageProcessingConsumer.getStartupException();
                if (fatalStartupException != null) {
                    this.consumers.remove(blockingQueueConsumer);
                    throw new RuntimeException("Fatal exception on listener startup", fatalStartupException);
                }
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            } catch (Exception failure) {
                blockingQueueConsumer.stop();
                this.consumers.remove(blockingQueueConsumer);
                throw new RuntimeException(failure);
            }
        }
    }
}
/**
 * Builds a consumer for the given handler and queues on a channel obtained from
 * {@code rabbitMQHelper}, using the container's prefetch, acknowledge mode, declaration retries
 * and message converter. The consumer is not started.
 */
private BlockingQueueConsumer createBlockingQueueConsumer(IMessageHandler iMessageHandler, String[] queues) {
    return new BlockingQueueConsumer(
            iMessageHandler,
            this.rabbitMQHelper.getChannel(),
            this.pretfetch,
            this.ackMode,
            this.declarationRetries,
            queues,
            this.messageConventer);
}
/** @return the current value of the {@code active} flag, which {@code start()} sets to true */
public boolean isActive() {
    return active;
}
/** @return the current value of the {@code running} flag, which {@code start()} sets to true */
public boolean isRunning() {
    return running;
}
/**
 * Takes the next locally buffered message of {@code consumer}, passes it to a message handler and
 * answers the broker on the consumer's channel: {@code basicAck} when the handler returns
 * {@code true}, {@code basicNack} with requeue when it returns {@code false}.
 *
 * <p>The handler the consumer was created with is used first, because every consumer is built
 * from one entry of the handler/queue map; the container-level handler is only a fallback.
 * Previously the container-level handler was always used, which ignored the per-consumer handler
 * and failed with a {@code NullPointerException} when only the map had been configured.
 *
 * @param consumer the consumer whose local queue is read; blocks until a message is available
 * @return the handler's result
 * @throws InterruptedException if interrupted while waiting for a message
 * @throws IOException if acknowledging or rejecting the message fails
 * @throws IllegalStateException if neither the consumer nor the container has a handler
 */
private boolean receiveAndExecute(BlockingQueueConsumer consumer) throws InterruptedException, IOException {
    Channel channel = consumer.getChannel();
    Message message = consumer.nextMessage();
    IMessageHandler handler = consumer.getMessageHandler();
    if (handler == null) {
        handler = this.messageHandler;
    }
    if (handler == null) {
        throw new IllegalStateException("No message handler configured for consumer");
    }
    boolean success = handler.handle(this.serializerMessageConverter.fromMessage(message));
    if (success) {
        channel.basicAck(message.getDeliveryTag(), false);
    } else {
        // single message, requeue = true so the broker delivers it again
        channel.basicNack(message.getDeliveryTag(), false, true);
    }
    return success;
}
/**
 * Tells whether {@code consumer} is still registered with this container and the container
 * itself is active.
 */
private boolean isActive(BlockingQueueConsumer consumer) {
    final boolean registered;
    synchronized (this.consumersMonitor) {
        registered = this.consumers != null && this.consumers.contains(consumer);
    }
    if (!registered) {
        return false;
    }
    return this.isActive();
}
/** @return the container-level message handler, or null if none has been set */
public IMessageHandler getMessageHandler() {
    return messageHandler;
}
/**
 * Sets the container-level message handler.
 *
 * @return this container, to allow chained calls
 */
public UdConsumerContainer setMessageHandler(IMessageHandler messageHandler) {
    this.messageHandler = messageHandler;
    return this;
}
/** @return the prefetch count passed to newly created consumers */
public int getPretfetch() {
    return pretfetch;
}
/**
 * Sets the prefetch count used for consumers created from now on.
 *
 * @return this container, to allow chained calls
 */
public UdConsumerContainer setPretfetch(int pretfetch) {
    this.pretfetch = pretfetch;
    return this;
}
/**
 * Processing loop for one {@code BlockingQueueConsumer}: starts the consumer, then repeatedly
 * takes a buffered message and lets the container handle it.
 *
 * <p>The start latch is now released on every path (container inactive, consumer started,
 * consumer start failed). Before, a failed or skipped start left callers of
 * {@link #getStartupException()} blocked for the whole start timeout.
 */
private class AsyncMessageProcessingConsumer implements Runnable {
    private final BlockingQueueConsumer consumer;
    // Released once the attempt to start the consumer has finished.
    private final CountDownLatch start;
    // Never assigned in the visible code, so getStartupException() currently always returns null.
    private volatile FatalStartupException startupException;

    AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
        this.consumer = consumer;
        this.start = new CountDownLatch(1);
    }

    /**
     * Starts the consumer and processes messages while the consumer is registered with an active
     * container, still has buffered deliveries, or has not been cancelled. An
     * {@code IOException} ends the loop and queues the consumer for a restart.
     */
    @Override
    public void run() {
        if (!UdConsumerContainer.this.isActive()) {
            this.start.countDown();
            return;
        }
        try {
            this.consumer.start();
        } finally {
            // Release waiters even when start() throws; the exception still propagates.
            this.start.countDown();
        }
        while (UdConsumerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            try {
                UdConsumerContainer.this.receiveAndExecute(this.consumer);
            } catch (InterruptedException interrupted) {
                // Interrupt status is left cleared, as before: re-asserting it would make the
                // next blocking take fail immediately and spin this loop.
                Thread.interrupted();
            } catch (IOException channelFailure) {
                synchronized (UdConsumerContainer.this.consumersMonitor) {
                    UdConsumerContainer.this.needRestart.offer(this.consumer);
                }
                break;
            }
        }
    }

    /**
     * Waits up to {@code consumerStartTimeout} milliseconds for the start attempt to finish.
     *
     * @return the recorded startup exception, or null if none was recorded or the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    public FatalStartupException getStartupException() throws InterruptedException {
        this.start.await(UdConsumerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS);
        return this.startupException;
    }
}
public class BlockingQueueConsumer {
// Channel this consumer consumes on; closed by stop().
private Channel channel;
// Records the flag passed to basicCancel; not read in the visible code.
private volatile boolean normalCancel;
// Consumer tag -> queue name, filled as basicConsume succeeds in start().
private final Map<String, String> consumerTags;
// Set to true once the last consumer tag has been removed.
private final AtomicBoolean cancelled;
// Local buffer of deliveries received from the broker; bounded by the prefetch count.
private final BlockingQueue<Delivery> queue;
private final AcknowledgeMode acknowledgeMode;
// Queues whose passive declaration failed; start() does not consume from them.
private final Set<String> missingQueues;
// Sleep in milliseconds between passive declaration attempts (5000 set by the constructor).
private long failedDeclarationRetryInterval;
// Timestamp of when cancel/stop began; 0 means no abort is in progress.
private volatile long abortStarted;
// Broker callback registered by start(); cleared by stop().
private BlockingQueueConsumer.InnerConsumer consumer;
private int declarationRetries;
// Written when queues are recorded as missing; not read in the visible code.
private long lastRetryDeclaration;
// Prefetch count applied with basicQos when not in auto-ack mode.
private final int prefectCount;
private final String[] queues;
// Offer timeout in milliseconds used by handleDelivery during an abort; never assigned in the visible code.
private long shutdownTimeout;
private final IMessageConventer messageConventer;
private final IMessageHandler messageHandler;
/** @return the internal array of queue names this consumer was created for (not a copy) */
public String[] getQueues() {
    return queues;
}
/**
 * Creates a consumer that is not yet subscribed to any queue; call {@code start()} to subscribe.
 *
 * <p>The {@code declarationRetries} argument is now stored. It used to be ignored, leaving the
 * retry count at 0 unless {@code setDeclarationRetries} was called afterwards.
 *
 * @param iMessageHandler handler associated with this consumer
 * @param channel channel to consume on
 * @param prefetchCount prefetch count; also the capacity of the local delivery buffer
 * @param acknowledgeMode acknowledgement mode used when subscribing
 * @param declarationRetries number of passive declaration retries; {@code null} means 0
 * @param queues names of the queues to consume from; copied
 * @param messageConventer converter used to build message properties
 */
public BlockingQueueConsumer(IMessageHandler iMessageHandler, Channel channel, int prefetchCount, AcknowledgeMode acknowledgeMode, Integer declarationRetries, String[] queues, IMessageConventer messageConventer) {
    this.messageHandler = iMessageHandler;
    this.messageConventer = messageConventer;
    this.cancelled = new AtomicBoolean(false);
    this.channel = channel;
    this.consumerTags = new ConcurrentHashMap<>();
    this.missingQueues = Collections.synchronizedSet(new HashSet<>());
    this.acknowledgeMode = acknowledgeMode;
    this.prefectCount = prefetchCount;
    this.declarationRetries = declarationRetries == null ? 0 : declarationRetries.intValue();
    this.failedDeclarationRetryInterval = 5000L;
    this.queue = new LinkedBlockingDeque<>(prefetchCount);
    this.queues = Arrays.copyOf(queues, queues.length);
}
/**
 * Cancels every registered consumer tag: each tag is removed locally and, if the channel is
 * still open, cancelled on the broker. Afterwards the abort timestamp is set to the current time.
 *
 * @param excepted stored in {@code normalCancel}
 */
public void basicCancel(boolean excepted) {
    this.normalCancel = excepted;
    for (String consumerTag : this.consumerTags.keySet()) {
        this.removeConsumer(consumerTag);
        try {
            if (this.channel.isOpen()) {
                this.channel.basicCancel(consumerTag);
            }
        } catch (IOException cancelFailure) {
            cancelFailure.printStackTrace();
        }
    }
    this.abortStarted = System.currentTimeMillis();
}
/**
 * Forgets {@code consumerTag}; when it was the last one, the consumer is marked cancelled.
 *
 * @return true if other consumer tags remain, false if none are left
 */
private boolean removeConsumer(String consumerTag) {
    this.consumerTags.remove(consumerTag);
    final boolean tagsRemain = !this.consumerTags.isEmpty();
    if (!tagsRemain) {
        this.cancelled.set(true);
    }
    return tagsRemain;
}
/**
 * Stops this consumer: records the abort time if not already set, closes the broker-side
 * consumers while tags are still registered and not cancelled, closes the channel (failures are
 * ignored) and discards the broker callback and all buffered deliveries.
 */
public void stop() {
    if (this.abortStarted == 0L) {
        this.abortStarted = System.currentTimeMillis();
    }
    final boolean stillSubscribed = this.consumer != null
            && this.consumer.getChannel() != null
            && this.consumerTags.size() > 0
            && !this.cancelled.get();
    if (stillSubscribed) {
        RabbitUtil.closeMessageConsumer(this.channel, this.consumerTags.keySet());
    }
    try {
        this.channel.close();
    } catch (IOException | TimeoutException ignored) {
        // best effort: the consumer is being discarded anyway
    }
    this.consumer = null;
    this.queue.clear();
}
/** Sets how many times {@code start()} retries the passive queue declarations. */
public void setDeclarationRetries(int declarationRetries) {
    this.declarationRetries = declarationRetries;
}
/** @return the number of deliveries currently held in the local buffer */
public int getQueueCount() {
    return queue.size();
}
/** @return true while at least one delivery is waiting in the local buffer */
public boolean hasDelivery() {
    final boolean empty = this.queue.isEmpty();
    return !empty;
}
/** @return true once all consumer tags are gone or an abort has been started */
public boolean cancelled() {
    if (this.cancelled.get()) {
        return true;
    }
    return this.abortStarted > 0L;
}
/** @return the handler this consumer was created with */
public IMessageHandler getMessageHandler() {
    return messageHandler;
}
/** @return the channel this consumer consumes on */
public Channel getChannel() {
    return channel;
}
/**
 * Subscribes this consumer to its queues.
 *
 * <p>Steps: create the broker callback; passively declare the queues, retrying up to
 * {@code declarationRetries} times with a pause of {@code failedDeclarationRetryInterval}
 * milliseconds while the channel stays open; apply the prefetch count unless in auto-ack mode;
 * finally call {@code basicConsume} for every queue not recorded as missing and remember the
 * returned consumer tags.
 *
 * <p>Fix: the "all queues failed" check now compares the number of failed queues with the number
 * of configured queues ({@code queues.length}). It used to compare with the size of the local
 * delivery buffer, which is unrelated.
 *
 * @throws RuntimeException if every queue failed its declaration, if the retry pause is
 *         interrupted, or if {@code basicQos} / {@code basicConsume} fails
 */
public void start() {
    this.consumer = new BlockingQueueConsumer.InnerConsumer(this.channel);
    int passiveDeclareRetries = this.declarationRetries;
    while (!this.cancelled()) {
        try {
            this.attemptPassiveDeclarations();
            passiveDeclareRetries = 0;
        } catch (BlockingQueueConsumer.DeclarationException declarationFailure) {
            if (passiveDeclareRetries > 0 && this.channel.isOpen()) {
                try {
                    Thread.sleep(this.failedDeclarationRetryInterval);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("声明队列失败重试时,休眠失败。", interrupted);
                }
            } else {
                if (declarationFailure.getFailedQueues().size() == this.queues.length) {
                    throw new RuntimeException("所有队列都不可用", declarationFailure);
                }
                // Some queues are usable: remember the broken ones and consume from the rest.
                this.missingQueues.addAll(declarationFailure.getFailedQueues());
                this.lastRetryDeclaration = System.currentTimeMillis();
            }
        }
        if (passiveDeclareRetries-- <= 0 || this.cancelled()) {
            break;
        }
    }

    if (!this.acknowledgeMode.isAutoAck() && !this.cancelled()) {
        try {
            this.channel.basicQos(this.prefectCount);
        } catch (IOException qosFailure) {
            throw new RuntimeException(qosFailure);
        }
    }

    for (String queueName : this.queues) {
        if (this.missingQueues.contains(queueName)) {
            continue;
        }
        try {
            String consumerTag = this.channel.basicConsume(queueName, this.acknowledgeMode.isAutoAck(), this.consumer);
            if (consumerTag != null) {
                this.consumerTags.put(consumerTag, queueName);
            }
        } catch (IOException consumeFailure) {
            throw new RuntimeException(consumeFailure);
        }
    }
}
/**
 * Passively declares every configured queue on the channel.
 *
 * <p>Fix: the collected failure is now thrown. Previously the exception object was built but
 * discarded, so callers never learned that a declaration had failed.
 *
 * @throws DeclarationException listing every queue whose passive declaration failed; its cause is
 *         the first {@code IOException} encountered
 */
private void attemptPassiveDeclarations() {
    BlockingQueueConsumer.DeclarationException failure = null;
    for (String queueName : this.queues) {
        try {
            this.channel.queueDeclarePassive(queueName);
        } catch (IOException declareFailure) {
            if (failure == null) {
                failure = new BlockingQueueConsumer.DeclarationException(declareFailure);
            }
            failure.addFailedQueue(queueName);
        }
    }
    if (failure != null) {
        throw failure;
    }
}
/**
 * Blocks until a delivery is available in the local buffer and converts it to a {@code Message}.
 *
 * @throws InterruptedException if interrupted while waiting
 */
public Message nextMessage() throws InterruptedException {
    final Delivery next = (Delivery) this.queue.take();
    return this.handle(next);
}
/**
 * Converts a raw delivery into a {@code Message} carrying the body, the converted properties
 * (including consumer tag and the queue registered for that tag) and the delivery tag.
 *
 * @return the message, or null when {@code delivery} is null
 */
private Message handle(Delivery delivery) {
    if (delivery == null) {
        return null;
    }
    final byte[] payload = delivery.getBody();
    final Envelope envelope = delivery.getEnvelope();
    final MessageProperties props =
            this.messageConventer.toMessageProperties(delivery.getProperties(), envelope, "UTF-8");
    final String tag = delivery.getConsumerTag();
    props.setConsumerTag(tag);
    props.setConsumerQueue((String) this.consumerTags.get(delivery.getConsumerTag()));
    return new Message(payload, props, envelope.getDeliveryTag());
}
/**
 * Signals that one or more queues could not be declared; collects the names of those queues and
 * appends them to the exception message.
 */
private static final class DeclarationException extends RuntimeException {
    private final List<String> failedQueues = new ArrayList<>();

    DeclarationException() {
        super("Failed to declare queue(s):");
    }

    private DeclarationException(Throwable t) {
        super("Failed to declare queue(s):", t);
    }

    /** Records {@code queue} as failed. */
    private void addFailedQueue(String queue) {
        failedQueues.add(queue);
    }

    /** @return the live list of failed queue names */
    private List<String> getFailedQueues() {
        return failedQueues;
    }

    /** @return the base message followed by the list of failed queues */
    @Override
    public String getMessage() {
        return super.getMessage() + failedQueues;
    }
}
/**
 * Broker callback of the enclosing {@code BlockingQueueConsumer}: moves deliveries into the local
 * buffer and reacts to broker-side cancellation.
 */
private class InnerConsumer extends DefaultConsumer {
    public InnerConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Buffers a delivery. Without an abort in progress the call blocks until the buffer has room.
     * During an abort the delivery is offered for {@code shutdownTimeout} milliseconds only; if
     * that fails the buffer is cleared, the delivery is nacked with
     * {@code multiple=true, requeue=true}, the consumer tag is cancelled and the channel closed.
     */
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        final boolean aborting = BlockingQueueConsumer.this.abortStarted > 0L;
        try {
            if (!aborting) {
                BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));
                return;
            }
            boolean buffered = BlockingQueueConsumer.this.queue.offer(
                    new Delivery(consumerTag, envelope, properties, body),
                    BlockingQueueConsumer.this.shutdownTimeout,
                    TimeUnit.MILLISECONDS);
            if (buffered) {
                return;
            }
            BlockingQueueConsumer.this.queue.clear();
            this.getChannel().basicNack(envelope.getDeliveryTag(), true, true);
            this.getChannel().basicCancel(consumerTag);
            try {
                this.getChannel().close();
            } catch (TimeoutException ignored) {
                // best effort close during abort
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Called when the broker cancels {@code consumerTag}: the tag is removed and, if other tags
     * remain, they are cancelled as well.
     */
    public void handleCancel(String consumerTag) throws IOException {
        final boolean tagsRemain = BlockingQueueConsumer.this.removeConsumer(consumerTag);
        if (tagsRemain) {
            BlockingQueueConsumer.this.basicCancel(false);
        }
    }
}