本文主要介绍consumer和provider在具体服务调用的时候是如何进行交互的。首先我们要清楚在consumer启动后对于服务接口生成了怎样的接口代理类和provider端对于服务的实现类有生成了怎么样代理类供Invoker使用。
下面以Dubbo提供的demo来讲解。
先来回顾一下
1.provider端
1.1.provider为各个服务实现类(如 DemoServiceImpl)都怎么生成装饰类的?
首先我们知道provider端发布服务前会生成基于代理的Invoker,请看ServiceConfig.doExportUrlsFor1Protocol()方法处的
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
此处会调用
proxyFactory(JavassistProxyFactory) 来生成Invoker,请看 JavassistProxyFacotory是如何生成Invoker的:
public class JavassistProxyFactory extends AbstractProxyFactory { public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper类不能正确处理带$的类名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
我们再来分析下关键的 Wrapper.getWrapper(class)到底做了什么?
public static Wrapper getWrapper(Class<?> c) { while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class. c = c.getSuperclass(); if (c == Object.class) return OBJECT_WRAPPER; Wrapper ret = WRAPPER_MAP.get(c); if (ret == null) { ret = makeWrapper(c); WRAPPER_MAP.put(c, ret); } return ret; } private static Wrapper makeWrapper(Class<?> c) { //此处省略,主要就是根据class内的特性生成一个Wrapper类(即包装类) }
其实Wrapper.
makeWrapper()就是为具体的接口生成一个代理类,如DemoServiceImpl,最后生成的Wrapper类为:
com.alibaba.dubbo.common.bytecode.Wrapper1 extends Wrapper implements com.alibaba.dubbo.common.bytecode.ClassGenerator.DC { public static String[] pns; // property name array. public static java.util.Map pts; // property type map. public static String[] mns; // all method name array. 这里等于 [sayHello] public static String[] dmns; // declared method name array. 这里等于 [sayHello] public static Class[] mts0;// 这里是[class java.lang.String] public Wrapper1(){} public String[] getPropertyNames(){ return pns; } public boolean hasProperty(String n){ return pts.containsKey($1); } public Class getPropertyType(String n){ return (Class)pts.get($1); } public String[] getMethodNames(){ return mns; } public String[] getDeclaredMethodNames(){ return dmns; } public void setPropertyValue(Object o, String n, Object v){ com.alibaba.dubbo.demo.provider.DemoServiceImpl w; try{ w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl)$1); }catch(Throwable e){ throw new IllegalArgumentException(e); } throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \""+$2+"\" filed or setter method in class com.alibaba.dubbo.demo.provider.DemoServiceImpl."); } /** * @param o 服务实例 * @param n 要调用的方法名字符串 * @param p 方法调用入参的参数类型列表 * @param v 调用的方法的入参值列表 * */ public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException{ com.alibaba.dubbo.demo.provider.DemoServiceImpl w; try{ w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl)$1); }catch(Throwable e){ throw new IllegalArgumentException(e); } try{ if( "sayHello".equals( $2 ) && $3.length == 1 ) { return ($w)w.sayHello((java.lang.String)$4[0]); } } catch(Throwable e) { throw new java.lang.reflect.InvocationTargetException(e); } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl."); } public Object getPropertyValue(Object o, String n){ com.alibaba.dubbo.demo.provider.DemoServiceImpl w; try{ w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl)$1); }catch(Throwable e){ throw new IllegalArgumentException(e); } throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \""+$2+"\" filed or setter method in class com.alibaba.dubbo.demo.provider.DemoServiceImpl."); } }
重点请看 Wrapper1的 invokeMethod方法,他接受一个关键入参Object o,这个也就是具体服务接口的实现类实例,因此,invokerMethod最后调用的时候还是使用的接口实现类来执行。
那为什么要这么大费周折构建这样一个类?原因很简单,因为起初我们需要构建的Invoker,
doInvoke(T proxy, String methodName,Class<?>[] parameterTypes, Object[] arguments)方法接受参数决定我们必须生成这样的一个wrapper类来中转一次。
1.2.provider接收消费端数据处理过程
首先provider端netty会捕获消费端请求,并根据时间类型进行处理,对请求的数据进行解码,处理过程如下(自下而上的过程):
这些步骤只是捕获请求并做解码工作,我们可以主要看一下DubboCodec.decodeBody方法,这里面会重新构建请求的Request (包含消费端传来的reqId,data数据)
构建完成后一次返回,在将请求数据返回到NettyCodecAdapter$InternalDecoder.messageReceived的时候,将会开始调用NettyHandler来处理此次调用,
可见方法内代码:
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
这里有一个操作:设置上行sendUpstream(),然后会使用nettyHandler处理这个上行流:nettyHandler.handleUpstream();通过事件类型判断(本次调用为 MessageEvent)
调用nettyHandler.messageReceived();这里开始就使用起初创建NettyHandler时传入的handler链来依次执行:
(nettyServer.received() -> MultiMessageHandler.received() -> HeartbeatHandler.received() -> AllChannelHandler.received() -> DecodeHandler.received() -> headerExchangeHandler.received() -> DubboProtocol.requestHandler)
2.consumer端
2.1consumer为每个服务接口生成了怎样的proxy类?
我们知道consumer端为接口生成代理类在ReferenceConfig.createProxy(Map<String, String> map) 处最后一句
proxyFactory.getProxy(invoker);
现在来看看proxyFactory是如何生成具体对象的代理类的:(注:这里使用的是默认的 JavassistProxyFactory),现在来看看
public class JavassistProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } abstract public Object newInstance(InvocationHandler handler); }
初步分析这里首先会使用
com.alibaba.dubbo.common.bytecode.Proxy.getProxy(interface )
先来生成获取到当前Proxy类的一个实现类(因为newInstance 为抽象方法,因此首先需要构建一个Proxy的实现类),接着通过调用Proxy实现类的
newInstance(InvokerInvocationHandler h) 来获取具体服务接口的代理类,那么
Proxy.getProxy(class)
又是怎么构建他它自身的实现子类的呢?
看下面的代码:
public static Proxy getProxy(Class<?>... ics) { return getProxy(ClassHelper.getClassLoader(Proxy.class), ics); } public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { //根据需要首先生成ics的代理类Class,接着生成本类Proxy的一个实现子类(实现了抽象方法 newInstance()), //此处代码逻辑省略 }
经过分析(请自行阅读源代码)得知,getProxy方法里会生成两个类:
一个是抽象类 Proxy的实现类:(newInstance 在Proxy类中是一个抽象方法)
public com.alibaba.dubbo.common.bytecode.Proxy0 extends com.alibaba.dubbo.common.bytecode.Proxy { public Proxy0(){} public Object newInstance(InvocationHandler h){ return new com.alibaba.dubbo.common.bytecode.proxy0 ($1); //$1就是方法第一个入参 h } }
另一个是DemoService接口的代理类:
package com.alibaba.dubbo.common.bytecode; import com.alibaba.dubbo.demo.DemoService; import com.alibaba.dubbo.rpc.service.EchoService; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; // 见com.alibaba.dubbo.common.bytecode.Proxy.getProxy public class proxy0 implements ClassGenerator.DC, EchoService, DemoService { // methods包含proxy0实现的所有接口方法(去重) public static Method[] methods; private InvocationHandler handler; public String sayHello(String arg0) { Object[] args = new Object[1]; args[0] = arg0; Object localObject = this.handler.invoke(this, methods[0], args); return (String)localObject; } public Object $echo(Object arg0) { Object[] args = new Object[1]; args[0] = arg0; Object localObject = this.handler.invoke(this, methods[1], args); return (Object)localObject; } public proxy0() { } public proxy0(InvocationHandler paramInvocationHandler) { this.handler = paramInvocationHandler; } }
有上面生成的代理类代码可知,
Proxy.getProxy(interfaces)首先返回了 Proxy0实例,接着调用 Proxy0的 newInstance() 方法,该方法会新建一个 接口服务的实现代理类
new
com.alibaba.dubbo.common.bytecode.proxy0
(InvocationHandler h);所以,
JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces) 实际上返回的是
com.alibaba.dubbo.common.bytecode.proxy0
的实例,即DemoServie接口的代理类。
2.2 consumer端在生成服务接口代理类时传入的
InvocationHandler
是怎样的结构?
先来回顾一下consumer端在创建服务的Invoker的时候最后生成的结构是怎样的:根据文章
6
Consumer如何通过引用代理调用具体服务方法
可知
Invoker调用流程:
MockClusterInvoker
(directory
)
.
invoke
(
Invocation invocation
)
【AvailableCluster内】
判断是否配置了mock,有则根据mock策略进行处理
-> AbstractClusterInvoker
(StaticDirectory
)
.
invoke
(
Invocation invocation
)
[
AvailableCluster内
]
>>> StaticDirectory(URL url, List<Invoker<T>> invokers)
获取可用的Invoker列表 Invokers(StaticDirectory
内获取,类型为MockClusterInvoker),负载策略LoadBalance
-> AbstractClusterInvoker
(StaticDirectory
)
.
doInvoke
(
invocation, invokers, loadbalance
)
[
AvailableCluster内
]
迭代Invokers,获取轮询获取一个可用的(available
= ture)的invoker来执行
-> MockClusterInvoker
(directory
)
.
invoke
(
Invocation invocation
)
【FailoverClusterInvoker内】
-> AbstractClusterInvoker
(RegistryDirectory)
.
invoke
(
Invocation invocation
)
【FailoverClusterInvoker内】
>>> RegistryDirectory 的invokers类型为RegistryDirectory$InvokerDelegete
-> FailoverClusterInvoker()
.
doInvoke
(
invocation, invokers, loadbalance
)
【FailoverClusterInvoker内】
->RegistryDirectory$InvokerDelegete
(Invoker<T> invoker, URL url, URL providerUrl
)
.
invoke
(
Invocation invocation
)
【RegistryDirectory$
InvokerDelegete
内,实际是 在
InvokerDelegete
的父类 InvokerWrapper内】 >>> InvokerDelegete
内的Invoker类型为为 ListenerInvokerWrapper,是protocolListenerWrapper创建的
->
ListenerInvokerWrapper
()
.
invoke
(
Invocation invocation
)
-> ProtocolFilterWrapper$1().invoke(Invocation invocation)
[这里会依次调用consumer的Filter链进行各步校验后才进入到真正的Invoker执行(ConsumerCOntextFilter -> MonitorFilter -> FUtureFilter)]
->DobboInvoker().invoke(Invocation inv)
[父类
AbstractInvoker
内]
->DobboInvoker.doInvoke() 这里会设置单双工传输,同步异步,获取一个ExchangeClient,开始进入client发送数据流环节
->
ReferenceCountExchangeClient
.request(Object request, int timeout
) request 是要发送的数据【RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, timeout=600000, version=0.0.0}]
】
-> HeaderExchangeClient.request(Object request, int timeout) 这里开始从ExchangeClient 转到channel
-> HeaderExchangeChannel.request(Object request, int timeout) 这里开始构建请求的 Request对象,设置双工模式,设置请求数据【Request [id=112, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, timeout=600000, version=0.0.0}]]】,构建DefaultFuture(保存了Request对象信息,以备后续根据reqId 匹配调用返回的结果) ,同时这里将开始转到 NettyClient来处理
-> NettyClient(AbstractPeer).send(Request)
-> NettyClient(AbstractClient).send(Request,boolean send) 这里开始使用NettyChannel来处理请求数据
-> NettyChannel().send(Object message, boolean sent)
-> NioClientSocketChannel.write(message) 这里会返回一个 org.jboss.netty.channel.ChannelFuture
-> org.jboss.netty.channel.Channels.write(message)
-> ChannelPipeline(使用DefaultChannelPipeline).sendDownstream(DownstreamMessageEvent) 这里会获取NettyHandler来处理请求数据
-> NettyHandler.handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) 这里的服务调用 ChannelEvent 为 MessageEvent
-> NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e) 这里首先会调用super.writeRequested super为SimpleChannelHandler;接着使用NettyHandler内的Handler链处理
-> NettyCodecAdapter$InternalEncoder(OneToOneEncoder).handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
->NettyCodecAdapter$InternalEncoder.encode()
-> DubboCountCodec.encode()
->DubboCodec(ExchangeCodec).encode(Channel channel, ChannelBuffer buffer, Object msg)
->DubboCodec(ExchangeCodec).encodeRequest(Channel channel, ChannelBuffer buffer, Object msg)
->DubboCodec.encodeRequestData() 到这一步 NettyHandler super.writeRequested 完成
->NettyClient(AbstractPeer).sent(Channel ch, Object msg) //这里开始使用Handler链处理
-> MultiMessageHandler(AbstractChannelHandlerDelegate)
-> HeartbeatHandler.sent()
-> AllChannelHandler(WrappedChannelHandler).sent()
-> DecodeHandler(AbstractChannelHandlerDelegate).sent()
->HeaderExchangeHandler().sent()
-> DubboProtocol$1 这里就最后就转到了 DubboProtocol起初定义的 requestHandler.send 方法,这里是个空处理
-> DefaultFuture.doSent() 发送请求时间