Spring Boot对RabbitMQ进行了很好的支持,今天简单的从源码层面分析RabbitMQ推送消息后如何找到我们监听方法的,以及Spring Cloud Stream中如何找到的。
首先引入pom文件,版本基于spring boot 2.2.7。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置类RabbitmqConfig
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_email_test";
public static final String QUEUE_INFORM_SMS = "queue_sms_test";
public static final String QUEUE_DEAD="queue_dead";
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_test";
public static final String EXCHANGE_DIRECT_INFORM="exchange_direct_test";
private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 交换机配置
* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
* @return the exchange
*/
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,消息队列重启后交换机仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
/**
* 交换机配置
* ExchangeBuilder提供了fanout、direct、topic、header交换机类型的配置
* @return the exchange
*/
@Bean(EXCHANGE_DIRECT_INFORM)
public Exchange EXCHANGE_DIRECT_INFORM() {
//durable(true)持久化,消息队列重启后交换机仍然存在
return ExchangeBuilder.directExchange(EXCHANGE_DIRECT_INFORM).durable(true).build();
}
//声明队列
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信交换机
args.put(DEAD_LETTER_QUEUE_KEY, EXCHANGE_DIRECT_INFORM);
// x-dead-letter-routing-key 声明 死信路由键
args.put(DEAD_LETTER_ROUTING_KEY, "queue_dead");
// Queue queue = new Queue(QUEUE_INFORM_SMS,true,false,false,args);
Queue queue = QueueBuilder.durable(QUEUE_INFORM_SMS).withArguments(args).build();
return queue;
}
//声明队列
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
return queue;
}
//声明队列
@Bean(QUEUE_DEAD)
public Queue QUEUE_DEAD() {
Queue queue = new Queue(QUEUE_DEAD);
return queue;
}
/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
* 绑定队列到交换机 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_SMS2(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_DIRECT_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.sms.test").noargs();
}
@Bean
public Binding BINDING_QUEUE_DEAD(@Qualifier(QUEUE_DEAD) Queue queue, @Qualifier(EXCHANGE_DIRECT_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_DEAD).noargs();
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE_DIRECT_INFORM,QUEUE_DEAD);
}
}
监听类:
@Component
public class ReceiveHandler {
//监听email队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg, Message message, Channel channel) throws Exception {
System.out.println("email队列接收消息:"+msg);
}
}
发送类:
@Component
public class RabbitmqSend {
@Autowired
RabbitTemplate rabbitTemplate;
public void testSendByTopics(){
DlxMsg messagePostProcessor = new DlxMsg(5000l);
for (int i=0;i<1;i++){
String message = "sms email inform to user"+i;
rabbitTemplate.convertAndSend(RabbitmqConfig.QUEUE_INFORM_EMAIL,"inform.sms.email",message);
System.out.println("Send Message is:'" + message + "'");
}
}
}
启动类:
@SpringBootApplication
public class RabbitmqDemoApplication {
@Autowired
RabbitmqSend rabbitmqSend;
public static void main(String[] args) throws Exception{
ConfigurableApplicationContext run = SpringApplication.run(RabbitmqDemoApplication.class, args);
RabbitmqSend rabbitmqSend = (RabbitmqSend)run.getBean("rabbitmqSend");
rabbitmqSend.testSendByTopics();
}
}
application.yml:
server:
port: 44001
spring:
application:
name: test-rabbitmq-producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
passowrd: guest
virtualHost: /
这些启动完就可以直接运行了,现在直接分析源码。
我们要分析的就是RabbitMQ如何直接找到标有@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})注解的方法。
在spring-boot-autoconfigure.jar包中的spring.factories文件中,我们可以看到有自动注入RabbitAutoConfiguration类。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
....................
}
我们直接看RabbitAnnotationDrivenConfiguration类。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
...............
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
matchIfMissing = true)
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "direct")
DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
DirectRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
该类中SimpleRabbitListenerContainerFactory和DirectRabbitListenerContainerFactory比较重要。两个类都继承自AbstractRabbitListenerContainerFactory类,这两个类创建的SimpleMessageListenerContainer和DirectMessageListenerContainer主要是用来负责从RabbitMQ接收消息并转发到我们自己的方法上。大家先记住这两个类。
RabbitAnnotationDrivenConfiguration类上有EnableRabbit。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitListenerConfigurationSelector.class)
public @interface EnableRabbit {
}
public class RabbitListenerConfigurationSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { RabbitBootstrapConfiguration.class.getName() };
}
}
public class RabbitBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor.class));
}
if (!registry.containsBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(RabbitListenerEndpointRegistry.class));
}
}
}
EnableRabbit导入了RabbitBootstrapConfiguration,RabbitBootstrapConfiguration会注册进RabbitListenerAnnotationBeanPostProcessor和RabbitListenerEndpointRegistry两个类。
我们首先看RabbitListenerAnnotationBeanPostProcessor类的postProcessAfterInitialization方法。
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
for (RabbitListener rabbitListener : lm.annotations) {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}
在创建每一个Bean对象之后,会检测该Bean中是否有包含@RabbitListener注解的方法,如果有则会执行processAmqpListener方法。
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
方法会创建一个MethodRabbitListenerEndpoint(Rabbit监听方法终端)对象。
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object target, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener));
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
Object errorHandler = resolveExpression(rabbitListener.errorHandler());
if (errorHandler instanceof RabbitListenerErrorHandler) {
endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
}
else if (errorHandler instanceof String) {
String errorHandlerBeanName = (String) errorHandler;
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
}
else {
throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
+ errorHandler.getClass().toString());
}
String group = rabbitListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String autoStartup = rabbitListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
}
endpoint.setExclusive(rabbitListener.exclusive());
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
try {
endpoint.setPriority(Integer.valueOf(priority));
}
catch (NumberFormatException ex) {
throw new BeanInitializationException("Invalid priority value for " +
rabbitListener + " (must be an integer)", ex);
}
}
resolveExecutor(endpoint, rabbitListener, target, beanName);
resolveAdmin(endpoint, rabbitListener, target);
resolveAckMode(endpoint, rabbitListener);
resolvePostProcessor(endpoint, rabbitListener, target, beanName);
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);
this.registrar.registerEndpoint(endpoint, factory);
}
endpoint经过一系列设置后,执行this.registrar.registerEndpoint(endpoint, factory);方法。registrar=RabbitListenerEndpointRegistrar(),在该类创建时定义。
RabbitListenerEndpointRegistrar.registerEndpoint方法
public void registerEndpoint(RabbitListenerEndpoint endpoint,
@Nullable RabbitListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
Assert.state(!this.startImmediately || this.endpointRegistry != null, "No registry available");
// Factory may be null, we defer the resolution right before actually creating the container
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
因为startImmediately=false,此处只是将endpoint加入到list中。
继续看RabbitListenerAnnotationBeanPostProcessor类的afterSingletonsInstantiated方法。
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, RabbitListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(RabbitListenerConfigurer.class);
for (RabbitListenerConfigurer configurer : instances.values()) {
configurer.configureRabbitListeners(this.registrar);
}
}
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
if (this.defaultContainerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
// Actually register all listeners
this.registrar.afterPropertiesSet();
// clear the cache - prototype beans will be re-cached.
this.typeCache.clear();
}
直接进入registrar.afterPropertiesSet()。
public void afterPropertiesSet() {
registerAllEndpoints();
}
protected void registerAllEndpoints() {
Assert.state(this.endpointRegistry != null, "No registry available");
synchronized (this.endpointDescriptors) {
for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(// NOSONAR never null
descriptor.endpoint, resolveContainerFactory(descriptor));
}
this.startImmediately = true; // trigger immediate startup
}
}
这里的endpointRegistry就是RabbitListenerEndpointRegistry类。执行registerListenerContainer方法。
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
.........................................
}
}
继续进入createListenerContainer方法
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
....................
}
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
C instance = createContainerInstance();
JavaUtils javaUtils =
JavaUtils.INSTANCE
.acceptIfNotNull(this.connectionFactory, instance::setConnectionFactory)
.acceptIfNotNull(this.errorHandler, instance::setErrorHandler);
if (this.messageConverter != null && endpoint != null) {
endpoint.setMessageConverter(this.messageConverter);
}
javaUtils
.acceptIfNotNull(this.acknowledgeMode, instance::setAcknowledgeMode)
.acceptIfNotNull(this.channelTransacted, instance::setChannelTransacted)
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
.acceptIfNotNull(this.taskExecutor, instance::setTaskExecutor)
.acceptIfNotNull(this.transactionManager, instance::setTransactionManager)
.acceptIfNotNull(this.prefetchCount, instance::setPrefetchCount)
.acceptIfNotNull(this.defaultRequeueRejected, instance::setDefaultRequeueRejected)
.acceptIfNotNull(this.adviceChain, instance::setAdviceChain)
.acceptIfNotNull(this.recoveryBackOff, instance::setRecoveryBackOff)
.acceptIfNotNull(this.mismatchedQueuesFatal, instance::setMismatchedQueuesFatal)
.acceptIfNotNull(this.missingQueuesFatal, instance::setMissingQueuesFatal)
.acceptIfNotNull(this.consumerTagStrategy, instance::setConsumerTagStrategy)
.acceptIfNotNull(this.idleEventInterval, instance::setIdleEventInterval)
.acceptIfNotNull(this.failedDeclarationRetryInterval, instance::setFailedDeclarationRetryInterval)
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
.acceptIfNotNull(this.autoStartup, instance::setAutoStartup)
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors)
.acceptIfNotNull(this.deBatchingEnabled, instance::setDeBatchingEnabled);
if (this.batchListener && this.deBatchingEnabled == null) {
// turn off container debatching by default for batch listeners
instance.setDeBatchingEnabled(false);
}
if (endpoint != null) { // endpoint settings overriding default factory settings
javaUtils
.acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup)
.acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor)
.acceptIfNotNull(endpoint.getAckMode(), instance::setAcknowledgeMode);
javaUtils
.acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy);
instance.setListenerId(endpoint.getId());
endpoint.setBatchListener(this.batchListener);
endpoint.setupListenerContainer(instance);
}
if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
AbstractAdaptableMessageListener messageListener = (AbstractAdaptableMessageListener) instance
.getMessageListener();
javaUtils
.acceptIfNotNull(this.beforeSendReplyPostProcessors,
messageListener::setBeforeSendReplyPostProcessors)
.acceptIfNotNull(this.retryTemplate, messageListener::setRetryTemplate)
.acceptIfCondition(this.retryTemplate != null && this.recoveryCallback != null,
this.recoveryCallback, messageListener::setRecoveryCallback)
.acceptIfNotNull(this.defaultRequeueRejected, messageListener::setDefaultRequeueRejected)
.acceptIfNotNull(endpoint.getReplyPostProcessor(), messageListener::setReplyPostProcessor);
}
initializeContainer(instance, endpoint);
if (this.containerCustomizer != null) {
this.containerCustomizer.configure(instance);
}
return instance;
}
这里的createContainerInstance方法就是会根据之前创建的SimpleMessageListenerContainer和DirectMessageListenerContainer其中一个。
之后会执行到endpoint.setupListenerContainer(instance);这行。
public void setupListenerContainer(MessageListenerContainer listenerContainer) {
AbstractMessageListenerContainer container = (AbstractMessageListenerContainer) listenerContainer;
......................
setupMessageListener(listenerContainer);
}
private void setupMessageListener(MessageListenerContainer container) {
MessageListener messageListener = createMessageListener(container);
Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener");
container.setupMessageListener(messageListener);
}
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
messageListener.setHandlerAdapter(configureListenerAdapter(messageListener));
String replyToAddress = getDefaultReplyToAddress();
if (replyToAddress != null) {
messageListener.setResponseAddress(replyToAddress);
}
MessageConverter messageConverter = getMessageConverter();
if (messageConverter != null) {
messageListener.setMessageConverter(messageConverter);
}
if (getBeanResolver() != null) {
messageListener.setBeanResolver(getBeanResolver());
}
return messageListener;
}
createMessageListener方法会创建一个MessagingMessageListenerAdapter对象,之后configureListenerAdapter(messageListener)方法会创建一个HandlerAdapter对象用于关联对应的Bean和方法。
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
InvocableHandlerMethod invocableHandlerMethod =
this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
return new HandlerAdapter(invocableHandlerMethod);
}
此处的步骤就是
1、通过SimpleMessageListenerContainer或DirectMessageListenerContainer,找到MessagingMessageListenerAdapter
2、通过MessagingMessageListenerAdapter找到对应的HandlerAdapter
3、通过HandlerAdapter找到对应的Bean和方法。
好了,那么重点就在于如何通过RabbitMq找到SimpleMessageListenerContainer或DirectMessageListenerContainer了。
继续回到RabbitBootstrapConfiguration类,我们知道该类还注册了一个RabbitListenerEndpointRegistry。
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
ApplicationListener<ContextRefreshedEvent> {
...............
}
该类实现了SmartLifecycle方法。那么我们就看看它的start方法。
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
startIfNecessary(listenerContainer);
}
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
public void start() {
if (isRunning()) {
return;
}
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
}
}
}
try {
logger.debug("Starting Rabbit listener container.");
configureAdminIfNeeded();
checkMismatchedQueues();
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
finally {
this.lazyLoad = false;
}
}
前面都是大同小异,我们主要看doStart方法。拿DirectMessageListenerContainer做例子,SimpleMessageListenerContainer类似。
protected void doStart() {
if (!this.started) {
actualStart();
}
}
protected void actualStart() {
..............
if (queueNames.length > 0) {
doRedeclareElementsIfNecessary();
getTaskExecutor().execute(() -> { // NOSONAR never null here
startConsumers(queueNames);
});
}
else {
this.started = true;
this.startedLatch.countDown();
}
.................
}
看名字就知道开始消费者,直接看startConsumers方法。
private void startConsumers(final String[] queueNames) {
while (!DirectMessageListenerContainer.this.started && isRunning()) {
this.cancellationLock.reset();
try {
for (String queue : queueNames) {
consumeFromQueue(queue);
}
}
................
DirectMessageListenerContainer.this.started = true;
DirectMessageListenerContainer.this.startedLatch.countDown();
}
}
}
}
private void consumeFromQueue(String queue) {
List<SimpleConsumer> list = this.consumersByQueue.get(queue);
// Possible race with setConsumersPerQueue and the task launched by start()
if (CollectionUtils.isEmpty(list)) {
for (int i = 0; i < this.consumersPerQueue; i++) {
doConsumeFromQueue(queue);
}
}
}
进入doConsumeFromQueue方法
private void doConsumeFromQueue(String queue) {
..............
Connection connection = null; // NOSONAR (close)
try {
connection = getConnectionFactory().createConnection();
}
SimpleConsumer consumer = consume(queue, connection);
...........
}
private SimpleConsumer consume(String queue, Connection connection) {
Channel channel = null;
SimpleConsumer consumer = null;
try {
channel = connection.createChannel(isChannelTransacted());
channel.basicQos(getPrefetchCount());
consumer = new SimpleConsumer(connection, channel, queue);
channel.queueDeclarePassive(queue);
consumer.consumerTag = channel.basicConsume(queue, getAcknowledgeMode().isAutoAck(),
(getConsumerTagStrategy() != null
? getConsumerTagStrategy().createConsumerTag(queue) : ""), // NOSONAR never null
isNoLocal(), isExclusive(), getConsumerArguments(), consumer);
}
.......................
return consumer;
}
终于看到点端倪了。
1、首先创建一个channel,用于和rabbitmq进行交互。
2、创建一个SimpleConsumer,用户消费。SimpleConsumer是DirectMessageListenerContainer的内部类,并继承DefaultConsumer。如果是SimpleMessageListenerContainer创建的就是InternalConsumer。
3、声明该channel交互哪个队列channel.queueDeclarePassive(queue)。
public Queue.DeclareOk queueDeclarePassive(String queue)
throws IOException
{
validateQueueNameLength(queue);
return (Queue.DeclareOk)
exnWrappingRpc(new Queue.Declare.Builder()
.queue(queue)
.passive()
.exclusive()
.autoDelete()
.build())
.getMethod();
}
4、建立channel和SimpleConsumer映射关系。在channel.basicConsume方法中,consumer是作为最后一个入参。
public String basicConsume(String queue, final boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
final Consumer callback)
throws IOException
{
final Method m = new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build();
BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) {
@Override
public String transformReply(AMQCommand replyCommand) {
String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
_consumers.put(actualConsumerTag, callback);
..................
}
}
}
可以看到根据消费tag,找到对应的consumer。
这里简单说一下创建connection的步骤,有兴趣的同学可以自己看源码。
1、会通过com.rabbitmq.client.ConnectionFactory类的newConnection方法进行创建。
2、创建FrameHandler对象,主要用于和RabbitMq的Socket连接。
3、创建AMQConnection对象,封装FrameHandler。
4、启动AMQConnection.start方法。会调用SocketFrameHandler.initialize方法,最终会调用AMQConnection.run方法
public void run() {
boolean shouldDoFinalShutdown = true;
try {
while (_running) {
Frame frame = _frameHandler.readFrame();
readFrame(frame);
}
} catch (Throwable ex) {
............
} finally {
if (shouldDoFinalShutdown) {
doFinalShutdown();
}
}
}
其中readFrame就是有消息来了之后会执行。
读取消息:
1、AMQChannel的handleCompleteInboundCommand方法,并执行processAsync,再执行processDelivery。
protected void processDelivery(Command command, Basic.Deliver method) {
Basic.Deliver m = method;
Consumer callback = _consumers.get(m.getConsumerTag());
......................
try {
..................
this.dispatcher.handleDelivery(callback,
m.getConsumerTag(),
envelope,
(BasicProperties) command.getContentHeader(),
command.getContentBody());
} catch (WorkPoolFullException e) {
// couldn't enqueue in work pool, propagating
throw e;
}
}
2、根据刚才放入的consumer标签,找到消费者Consumer。执行dispatcher.handleDelivery方法
public void handleDelivery(final Consumer delegate,
final String consumerTag,
final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body) throws IOException {
executeUnlessShuttingDown(
new Runnable() {
@Override
public void run() {
try {
delegate.handleDelivery(consumerTag,
envelope,
properties,
body);
} catch (Throwable ex) {
connection.getExceptionHandler().handleConsumerException(
channel,
ex,
delegate,
consumerTag,
"handleDelivery");
}
}
});
}
3、再执行SimpleConsumer.handleDelivery方法。刚才说了SimpleConsumer是DirectMessageListenerContainer内部类,其实也是执行DirectMessageListenerContainer类的handleDelivery方法。
PS:这里说一下
①:如果是 DirectMessageListenerContainer,则通过SimpleConsumer执行callExecuteListener。
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) {
MessageProperties messageProperties =
getMessagePropertiesConverter().toMessageProperties(properties, envelope, "UTF-8");
messageProperties.setConsumerTag(consumerTag);
messageProperties.setConsumerQueue(this.queue);
Message message = new Message(body, messageProperties);
long deliveryTag = envelope.getDeliveryTag();
if (this.logger.isDebugEnabled()) {
this.logger.debug(this + " received " + message);
}
updateLastReceive();
if(){
.............
}else {
try {
callExecuteListener(message, deliveryTag);
}
catch (Exception e) {
// NOSONAR
}
}
}
②:如果是SimpleMessageListenerContainer,则InternalConsumer执行handleDelivery,往队列中放入消息。
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) {
try {
if (BlockingQueueConsumer.this.abortStarted > 0) {
if (!BlockingQueueConsumer.this.queue.offer(
new Delivery(consumerTag, envelope, properties, body, this.queueName),
BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
..................
}
}
else {
//网队列中放入消息
BlockingQueueConsumer.this.queue
.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
}
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (Exception e) {
BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
}
}
之后会在 SimpleMessageListenerContainer的run方法中执行下面代码取出。
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
4、执行callExecuteListener方法。一路跟进后,会进入到AbstractMessageListenerContainer的actualInvokeListener方法。并最后进入到doInvokeListener方法。
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Object data) {
...........................
// Actually invoke the message listener...
try {
if (data instanceof List) {
listener.onMessageBatch((List<Message>) data, channelToUse);
}
else {
message = (Message) data;
listener.onMessage(message, channelToUse);
}
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, data);
}
}
finally {
cleanUpAfterInvoke(resourceHolder, channelToUse, boundHere);
}
}
5、找到之前创建的实现类MessagingMessageListenerAdapter。
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception { // NOSONAR
Message<?> message = toMessagingMessage(amqpMessage);
invokeHandlerAndProcessResult(amqpMessage, channel, message);
}
protected void invokeHandlerAndProcessResult(@Nullable org.springframework.amqp.core.Message amqpMessage,
Channel channel, Message<?> message) throws Exception { // NOSONAR
InvocationResult result = null;
try {
if (this.messagingMessageConverter.method == null) {
amqpMessage.getMessageProperties()
.setTargetMethod(this.handlerAdapter.getMethodFor(message.getPayload()));
}
result = invokeHandler(amqpMessage, channel, message);
}
catch (ListenerExecutionFailedException e) {
}
}
6、invokeHandler方法就是之前创建的handlerAdapter,就能执行对应Bean的方法了。
public InvocationResult invoke(Message<?> message, Object... providedArgs) throws Exception { // NOSONAR
if (this.invokerHandlerMethod != null) {
return new InvocationResult(this.invokerHandlerMethod.invoke(message, providedArgs),
null, this.invokerHandlerMethod.getMethod().getGenericReturnType(),
this.invokerHandlerMethod.getBean(),
this.invokerHandlerMethod.getMethod());
}
else if (this.delegatingHandler.hasDefaultHandler()) {
// Needed to avoid returning raw Message which matches Object
Object[] args = new Object[providedArgs.length + 1];
args[0] = message.getPayload();
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
return this.delegatingHandler.invoke(message, args);
}
else {
return this.delegatingHandler.invoke(message, providedArgs);
}
}
整体总结:
创建接收链路:
SimpleMessageListenerContainer和DirectMessageListenerContainer是AbstractMessageListenerContainer子类
1、通过SimpleMessageListenerContainer或DirectMessageListenerContainer,找到MessagingMessageListenerAdapter
2、通过MessagingMessageListenerAdapter找到对应的HandlerAdapter
3、通过HandlerAdapter找到对应的Bean和方法
读取接收:
1、会通过com.rabbitmq.client.ConnectionFactory类的newConnection方法进行创建。
2、创建FrameHandler对象,主要用于和RabbitMq的Socket连接。
3、创建AMQConnection对象,封装FrameHandler。
4、启动AMQConnection.start方法。会调用SocketFrameHandler.initialize方法,最终会调用AMQConnection.run准备进行接收
5、AMQChannel的handleCompleteInboundCommand方法,并执行processAsync,再执行processDelivery。
6、根据刚才放入的consumer标签,找到消费者Consumer。执行dispatcher.handleDelivery方法。
7、再执行SimpleConsumer.handleDelivery方法。刚才说了SimpleConsumer是DirectMessageListenerContainer内部类,其实也是执行DirectMessageListenerContainer类的handleDelivery方法。
8、执行callExecuteListener方法。一路跟进后,会进入到AbstractMessageListenerContainer的actualInvokeListener方法。并最后进入到doInvokeListener方法
9、找到之前创建的实现类MessagingMessageListenerAdapter。
其实读者只需要记住前面标蓝的重要三部分内容就行了,后续创建并接收的可以忽略。知道怎么建立对应关系即可。