介绍
上一篇文章说到我们启动了一个NettyServer,写过Netty程序的小伙伴都知道我们是通过实现
ChannelHandler接口来处理业务逻辑的,然后将ChannelHandler增加到ChannelPipeline上,一个请求被ChannelPipeline上的ChannelHandler依次处理,典型的责任链模式
但是在上一节NettyServer启动的时候,我们看到了ChannelPipeline上只加入了一个ChannelHandler即NettyServerHandler(实现了io.netty.channel.ChannelHandler接口),其实这个NettyServerHandler什么都没做,只是将请求转到Dubbo的ChannelHandler(实现了org.apache.dubbo.remoting.ChannelHandler)上。
注意,Netty和Dubbo中都定义了ChannelHandler接口。Netty中的ChannelHandler执行用的是责任链模式,而Dubbo中的ChannelHandler执行用的是装饰者模式。而Dubbo只所以要重新定义一个ChannelHandler接口,主要是为了不和具体的通信层框架耦合,毕竟网路通信框架不只有Netty
所以真正的请求执行会经过如下几个ChannelHandler,其中只有NettyServerHandler实现的是Netty框架中的ChannelHandler接口,其余实现的都是Dubbo中的ChannelHandler接口
先大概说一下这些ChannelHandler的作用
ChannelHandler | 作用 |
---|---|
NettyServerHandler | 处理Netty服务端事件,如连接,断开,读取,写入,发生异常 |
MultiMessageHandler | 多消息报文批处理 |
HeartbeatHandler | 心跳处理 |
AllChannelHandler | 将Netty的所有请求放到业务线程池中去处理 |
DecodeHandler | 对消息进行解码 |
HeaderExchangeHandler | 封装处理 Request/Reponse,和 telnet请求 |
ExchangeHandlerAdapter | 查找服务方法并调用 |
接收请求
Dubbo服务导出过程中会启动NettyServer,即执行NettyServer#doOpen方法
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // 解码器handler
.addLast("encoder", adapter.getEncoder()) // 编码器handler
// 心跳检查handler
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
从中可以看到NettyServerHandler为处理业务逻辑的Handler,当接收到消息时会激活NettyServerHandler#channelRead方法
// NettyServerHandler.java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
然后依次执行如下Handler的received方法
MultiMessageHandler和HeartbeatHandler和主流程关系不大,就不仔细分析了,直接到AllChannelHandler
接着到了AllChannelHandler,将请求放到业务线程池中去执行(通过Dubbo Spi可以配置多种不同的实现,后面一篇文章会详细分析线程模型和线程池策略)
// AllChannelHandler.java
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
// 将请求和响应消息派发到线程池中处理
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
}
}
接着到了DecodeHandler,将消息解码,因为服务提供方和服务消费方默认都用到了AllChannelHandler,所以消息类型可能为Request,也可能为Response
// DecodeHandler
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
// 对 Decodeable 接口实现类对象进行解码
decode(message);
}
if (message instanceof Request) {
// 对 Request 的 data 字段进行解码
decode(((Request) message).getData());
}
if (message instanceof Response) {
// 对 Response 的 result 字段进行解码
decode(((Response) message).getResult());
}
// 解码完毕后的下一站为 HeaderExchangeHandler
handler.received(channel, message);
}
下一站是HeaderExchangeHandler
这一站代码有点多,主要就是封装处理Request/Reponse,请求响应就是在这个Handler中实现的
如果请求不需要响应会调用ExchangeHandlerAdapter#received(DubboProtocol的匿名内部类)方法
如果请求需要响应调会调用ExchangeHandlerAdapter#reply
终于到了终点站ExchangeHandlerAdapter了
// 在DubboProtocol.java中
// ExchangeHandlerAdapter.java
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// 获取 Invoker 实例
// 服务导出的时候在 exporterMap 中保存了 serviceKey -> Exporter 的映射关系
// 这里根据inv得到serviceKey得到 Exporter,再得到 Invoker
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
// 省略回调相关的代码
RpcContext rpcContext = RpcContext.getContext();
// 用ThreadLocal来保存上下文信息
rpcContext.setRemoteAddress(channel.getRemoteAddress());
// 通过 Invoker 调用具体的服务
// 这里是 AbstractProxyInvoker
Result result = invoker.invoke(inv);
// 异步执行
if (result instanceof AsyncRpcResult) {
// thenApply相当于Stream中的map,对元素进行转换
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
// 同步执行,直接设置结果返回
return CompletableFuture.completedFuture(result);
}
}
主要就是根据Invocation对象(封装了请求的方法名,参数类型,参数)来找到对应的Invoker,然后调用调用Invoker#invoke方法
服务导出的时候已经把这种映射关系存在如下Map中了哈
public abstract class AbstractProtocol implements Protocol {
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
}
从服务导出的时候知道这个Invoker最原始为AbstractProxyInvoker,然后被各种装饰类装饰,典型的装饰者模式
当调用如下方法(调用本地方法获取结果)
// DubboProtocol
Result result = invoker.invoke(inv);
ProtocolFilterWrapper$1是匿名内部类哈,然后每经过一次匿名内部类调用一次Filter接口的实现,最终调用到AbstractProxyInvoker#doInvoker方法,服务导出的时候已经说了这个Invoker是JavassistProxyFactory创建的。拦截器这一部分我后面会详细讲讲
public class JavassistProxyFactory extends AbstractProxyFactory {
/**
* 针对provider端,将服务对象包装成一个Invoker对象
*/
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 重写类AbstractProxyInvoker类的doInvoke方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 这个方法里面调用执行本地方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
Wrapper根据方法名,参数类型,参数,找到对应的服务的实现方法执行即可。
这个Wrapper其实是对Service接口实现类的封装,避免通过反射调用Service接口的实现类,提高性能。
当执行AbstractProxyInvoker#doInvoke方法时,会调用请求的方法,并返回结果
最后总结一下,这个请求的处理过程为
- NettyServerHandler#channelRead
- NettyServer(AbstractPeer#received)
- MultiMessageHandler#received
- HeartbeatHandler#received
- AllChannelHandler#received(这里默认是AllChannelHandler,可以通过SPI来确定线程模型和线程池策略)
- DecodeHandler#received
- HeaderExchangeHandler#received
- ExchangeHandlerAdapter#reply
当HeaderExchangeHandler收到返回值后会调用channel.send(res)方法
所以返回结果的时候会先调用NettyServerHandler#write方法
返回响应
整个调用链路如下,我就不追了,你们追一下就能看懂了
- NettyServerHandler#write
- NettyServer(AbstractPeer#sent)
- MultiMessageHandler#sent
- HeartbeatHandler#sent
- AllChannelHandler#sent
- DecodeHandler#sent
- HeaderExchangeHandler#sent
- ExchangeHandlerAdapter#sent(DubboProtocol中的匿名内部类,是个空实现)
欢迎关注
参考博客
客户端超时或者服务端超时
[1]https://juejin.im/post/6844903857416323079