Dubbo源码解析四:服务提供方接收请求及返回结果

在这里插入图片描述

介绍

上一篇文章说到我们启动了一个NettyServer,写过Netty程序的小伙伴都知道我们是通过实现
ChannelHandler接口来处理业务逻辑的,然后将ChannelHandler增加到ChannelPipeline上,一个请求被ChannelPipeline上的ChannelHandler依次处理,典型的责任链模式

在这里插入图片描述

但是在上一节NettyServer启动的时候,我们看到了ChannelPipeline上只加入了一个ChannelHandler即NettyServerHandler(实现了io.netty.channel.ChannelHandler接口),其实这个NettyServerHandler什么都没做,只是将请求转到Dubbo的ChannelHandler(实现了org.apache.dubbo.remoting.ChannelHandler)上。

注意,Netty和Dubbo中都定义了ChannelHandler接口。Netty中的ChannelHandler执行用的是责任链模式,而Dubbo中的ChannelHandler执行用的是装饰者模式。而Dubbo只所以要重新定义一个ChannelHandler接口,主要是为了不和具体的通信层框架耦合,毕竟网路通信框架不只有Netty

所以真正的请求执行会经过如下几个ChannelHandler,其中只有NettyServerHandler实现的是Netty框架中的ChannelHandler接口,其余实现的都是Dubbo中的ChannelHandler接口
在这里插入图片描述

先大概说一下这些ChannelHandler的作用

ChannelHandler 作用
NettyServerHandler 处理Netty服务端事件,如连接,断开,读取,写入,发生异常
MultiMessageHandler 多消息报文批处理
HeartbeatHandler 心跳处理
AllChannelHandler 将Netty的所有请求放到业务线程池中去处理
DecodeHandler 对消息进行解码
HeaderExchangeHandler 封装处理 Request/Reponse,和 telnet请求
ExchangeHandlerAdapter 查找服务方法并调用

接收请求

Dubbo服务导出过程中会启动NettyServer,即执行NettyServer#doOpen方法

protected void doOpen() throws Throwable {
    
    
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
    
    
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
    
    
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder()) // 解码器handler
                            .addLast("encoder", adapter.getEncoder()) // 编码器handler
                            // 心跳检查handler
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

从中可以看到NettyServerHandler为处理业务逻辑的Handler,当接收到消息时会激活NettyServerHandler#channelRead方法

// NettyServerHandler.java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
    
    
        handler.received(channel, msg);
    } finally {
    
    
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

然后依次执行如下Handler的received方法
在这里插入图片描述
MultiMessageHandler和HeartbeatHandler和主流程关系不大,就不仔细分析了,直接到AllChannelHandler

接着到了AllChannelHandler,将请求放到业务线程池中去执行(通过Dubbo Spi可以配置多种不同的实现,后面一篇文章会详细分析线程模型和线程池策略)

// AllChannelHandler.java
public void received(Channel channel, Object message) throws RemotingException {
    
    
    ExecutorService cexecutor = getExecutorService();
    try {
    
    
        // 将请求和响应消息派发到线程池中处理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
    
    
    }
}

接着到了DecodeHandler,将消息解码,因为服务提供方和服务消费方默认都用到了AllChannelHandler,所以消息类型可能为Request,也可能为Response

// DecodeHandler
public void received(Channel channel, Object message) throws RemotingException {
    
    
    if (message instanceof Decodeable) {
    
    
        // 对 Decodeable 接口实现类对象进行解码
        decode(message);
    }

    if (message instanceof Request) {
    
    
        // 对 Request 的 data 字段进行解码
        decode(((Request) message).getData());
    }

    if (message instanceof Response) {
    
    
        // 对 Response 的 result 字段进行解码
        decode(((Response) message).getResult());
    }

    // 解码完毕后的下一站为 HeaderExchangeHandler
    handler.received(channel, message);
}

下一站是HeaderExchangeHandler
这一站代码有点多,主要就是封装处理Request/Reponse,请求响应就是在这个Handler中实现的

如果请求不需要响应会调用ExchangeHandlerAdapter#received(DubboProtocol的匿名内部类)方法
如果请求需要响应调会调用ExchangeHandlerAdapter#reply

终于到了终点站ExchangeHandlerAdapter了

// 在DubboProtocol.java中
// ExchangeHandlerAdapter.java
 @Override
 public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    
    

     if (!(message instanceof Invocation)) {
    
    
         throw new RemotingException(channel, "Unsupported request: "
                 + (message == null ? null : (message.getClass().getName() + ": " + message))
                 + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
     }

     Invocation inv = (Invocation) message;
     // 获取 Invoker 实例
     // 服务导出的时候在 exporterMap 中保存了 serviceKey -> Exporter 的映射关系
     // 这里根据inv得到serviceKey得到 Exporter,再得到 Invoker
     Invoker<?> invoker = getInvoker(channel, inv);
     // need to consider backward-compatibility if it's a callback

	 // 省略回调相关的代码
	
     RpcContext rpcContext = RpcContext.getContext();
     // 用ThreadLocal来保存上下文信息
     rpcContext.setRemoteAddress(channel.getRemoteAddress());
     // 通过 Invoker 调用具体的服务
     // 这里是 AbstractProxyInvoker
     Result result = invoker.invoke(inv);

     // 异步执行
     if (result instanceof AsyncRpcResult) {
    
    
         // thenApply相当于Stream中的map,对元素进行转换
         return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);

     } else {
    
    
         // 同步执行,直接设置结果返回
         return CompletableFuture.completedFuture(result);
     }
 }

主要就是根据Invocation对象(封装了请求的方法名,参数类型,参数)来找到对应的Invoker,然后调用调用Invoker#invoke方法

服务导出的时候已经把这种映射关系存在如下Map中了哈

public abstract class AbstractProtocol implements Protocol {
    
    
    protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
}

从服务导出的时候知道这个Invoker最原始为AbstractProxyInvoker,然后被各种装饰类装饰,典型的装饰者模式

当调用如下方法(调用本地方法获取结果)

// DubboProtocol
Result result = invoker.invoke(inv);

在这里插入图片描述
ProtocolFilterWrapper$1是匿名内部类哈,然后每经过一次匿名内部类调用一次Filter接口的实现,最终调用到AbstractProxyInvoker#doInvoker方法,服务导出的时候已经说了这个Invoker是JavassistProxyFactory创建的。拦截器这一部分我后面会详细讲讲

public class JavassistProxyFactory extends AbstractProxyFactory {
    
    

    /**
     * 针对provider端,将服务对象包装成一个Invoker对象
     */
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    
    
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 重写类AbstractProxyInvoker类的doInvoke方法
        return new AbstractProxyInvoker<T>(proxy, type, url) {
    
    
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
    
    
                // 这个方法里面调用执行本地方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

Wrapper根据方法名,参数类型,参数,找到对应的服务的实现方法执行即可。

这个Wrapper其实是对Service接口实现类的封装,避免通过反射调用Service接口的实现类,提高性能。

当执行AbstractProxyInvoker#doInvoke方法时,会调用请求的方法,并返回结果

最后总结一下,这个请求的处理过程为

  1. NettyServerHandler#channelRead
  2. NettyServer(AbstractPeer#received)
  3. MultiMessageHandler#received
  4. HeartbeatHandler#received
  5. AllChannelHandler#received(这里默认是AllChannelHandler,可以通过SPI来确定线程模型和线程池策略)
  6. DecodeHandler#received
  7. HeaderExchangeHandler#received
  8. ExchangeHandlerAdapter#reply

当HeaderExchangeHandler收到返回值后会调用channel.send(res)方法

所以返回结果的时候会先调用NettyServerHandler#write方法

返回响应

整个调用链路如下,我就不追了,你们追一下就能看懂了

  1. NettyServerHandler#write
  2. NettyServer(AbstractPeer#sent)
  3. MultiMessageHandler#sent
  4. HeartbeatHandler#sent
  5. AllChannelHandler#sent
  6. DecodeHandler#sent
  7. HeaderExchangeHandler#sent
  8. ExchangeHandlerAdapter#sent(DubboProtocol中的匿名内部类,是个空实现)

欢迎关注

在这里插入图片描述

参考博客

客户端超时或者服务端超时
[1]https://juejin.im/post/6844903857416323079

猜你喜欢

转载自blog.csdn.net/zzti_erlie/article/details/108189766