首先,看看dubbo消费端的主要调用流程(这里就不画图了),以后补充:
第一步:创建消费端代理:ReferenceConfig.createProxy,这里没什么特别,典型的C/S调用设计,都是通过JAVA动态代理或是Javassist的代理实现,比如mybatis mapper。
思考:假如我想实现API网关的dubbo协议路由怎么实现呢:毫无疑问必须使用ReferenceConfig的get.set以及init方法,下一篇将展开讨论实现细节..
第二步:直接跳到动态代理代码里的InvokerInvocationHandler:
这里忽略invoker的初始化(基本就是提供者注册+消费端订阅后初始化)
思考:假如让你实现一个RPC的客户端同步和异步调用你会怎么实现呢?
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(this.invoker, args); } else if ("toString".equals(methodName) && parameterTypes.length == 0) { return this.invoker.toString(); } else if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return this.invoker.hashCode(); } else { return "equals".equals(methodName) && parameterTypes.length == 1 ? this.invoker.equals(args[0]) : this.invoker.invoke(new RpcInvocation(method, args)).recreate(); } } }
第三步:AbstractClusterInvoker.invoke;
这里主要是加载一个列表:invoker list
public Result invoke(Invocation invocation) throws RpcException { this.checkWheatherDestoried(); //选择invoker List<Invoker<T>> invokers = this.list(invocation); LoadBalance loadbalance; if (invokers != null && invokers.size() > 0) { loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(invocation.getMethodName(), "loadbalance", "random")); } else { loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random"); } RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation); return this.doInvoke(invocation, invokers, loadbalance); } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = this.directory.list(invocation); return invokers; }
第四步:真正加载invoker的实现:AbstractDirectory
注意这里有个路由选择的逻辑,也就是我们今天关注的重点:
router.route:具体的路由实现逻辑:比如ConditionRouter
思考:假如你的程序中需要实现插件式功能,你如何实现呢:基本思路:配置中心配置+ 配置通知更新+插件根据优先级生效。
public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (this.destroyed) { throw new RpcException("Directory already destroyed .url: " + this.getUrl()); } else { List<Invoker<T>> invokers = this.doList(invocation); List<Router> localRouters = this.routers; if (localRouters != null && localRouters.size() > 0) { Iterator var5 = localRouters.iterator(); while(var5.hasNext()) { Router router = (Router)var5.next(); try { if (router.getUrl() == null || router.getUrl().getParameter("runtime", true)) { invokers = router.route(invokers, this.getConsumerUrl(), invocation); } } catch (Throwable var7) { logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var7.getMessage(), var7); } } } return invokers; } }
第五步:FailoverClusterInvoker.doInvoke:
这里通过负责均衡策略选择一个invoker,然后通过各种过滤器过滤。
最后:通过DubboInvoker发送数据给服务提供端。
了解完调用的主流程,我们基本就梳理清楚路由逻辑的实现在那个环节,以及需要注意设置那些属性,比如runtime.
但是经过我本人测试dubbo的路由功能并不稳定,所有用它做灰度发布不靠谱(但是通过权重设置实现rolling update还是比较靠谱的),不稳定因素有:
1.消费端没有及时订阅新增的服务提供者信息,而我们的需求是需要路由到增的服务提供者,导致ConditionRouter.matchCondition无法匹配成功,然后就不会使用路由规则。
2.路由规则没有及时通知更新到消费端,导致AbstractDirectory.setRouters一直没有添加ConditionRouter。(经过测试这个经常出现,这个问题比较严重,也有可能是我环境的问题)
学习感悟:总的来说通过今天的研究还是学到了一些常用的设计思路,而不仅仅是研究代码和原理。