版权声明:本文为博主原创文章,可以转载不可复制。 https://blog.csdn.net/qq_32331073/article/details/83576172
MessageHandler的作用
上一节中我们提到过,ExecutorSubscribableChannel
类持有一个实例handlers
—— MessageHandler集合,是MessageChannel的订阅者,用来作为处理Messages的约定。
MessageHandler是一个接口,它的实现类都必须实现方法 —— handleMessage(Message<?> message)用来处理消息Message —— 从外部接收的或 从应用程序内部传递的。
/**
* Contract for handling a {@link Message}.
*
* @author Mark Fisher
* @author Iwein Fuld
* @since 4.0
*/
public interface MessageHandler {
/**
* Handle the given message.
* @param message the message to be handled
*/
void handleMessage(Message<?> message) throws MessagingException;
}
MessageHandler实现类
MessageHandler 的具体实现类共有 两类九种
,分别用来处理不同类型的Message
- 未实现
org.springframework.context.SmartLifecycle
接口- UserRegistryMessageHandler
- NoOpMessageHandler
- 实现
org.springframework.context.SmartLifecycle
接口- SubProtocolWebSocketHandler
- SimpAnnotationMethodMessageHandler
- WebSocketAnnotationMethodMessageHandler
- NoOpBrokerMessageHandler
- SimpleBrokerMessageHandler
- StompBrokerRelayMessageHandler
- UserDestinationMessageHandler
两类MessageHandler有什么区别?
- 先来看下
org.springframework.context.SmartLifecycle
接口 —— 智能生命周期,是org.springframework.context.Lifecycle
和org.springframework.context.Phased
的扩展。说白了:只要你实现了SmartLifeCycle
接口,你便可以在任何时候判断Bean
所处的生命周期,并可以通过实现指定方法在指定的生命周期去搞事情
。
SmartLifecycle :
public interface SmartLifecycle extends Lifecycle, Phased {
boolean isAutoStartup();
void stop(Runnable callback);
}
- 就拿
AbstractBrokerMessageHandler
来说,它是NoOpBrokerMessageHandler、SimpleBrokerMessageHandler和StompBrokerRelayMessageHandler的父类接口,实现了SmartLifecycle
,在start()、stop()方法中分别实现了对相关MessageChannel
的订阅
和取消订阅
。 - 到这里我们可以明白,不实现
SmartLifecycle
接口的那类MessageHandler就是不需要绑定MessageChannel,相反另一类则是用来在指定生命周期订阅或解订阅MessageChannel。
AbstractBrokerMessageHandler:
public abstract class AbstractBrokerMessageHandler
implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle {
@Override
public boolean isAutoStartup() {
return this.autoStartup;
}
@Override
public int getPhase() {
return Integer.MAX_VALUE;
}
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
logger.info("Starting...");
this.clientInboundChannel.subscribe(this);
this.brokerChannel.subscribe(this);
if (this.clientInboundChannel instanceof InterceptableChannel) {
((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor);
}
startInternal();
this.running = true;
logger.info("Started.");
}
}
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
logger.info("Stopping...");
stopInternal();
this.clientInboundChannel.unsubscribe(this);
this.brokerChannel.unsubscribe(this);
if (this.clientInboundChannel instanceof InterceptableChannel) {
((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor);
}
this.running = false;
logger.info("Stopped.");
}
}
@Override
public final void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
stop();
callback.run();
}
}
@Override
public final boolean isRunning() {
return this.running;
}
}