dubbo源码分析-消费端启动初始化过程-笔记

消费端的代码解析是从下面这段代码开始的

  • <dubbo:reference id="xxxService" interface="xxx.xxx.Service"/>
  • ReferenceBean(afterPropertiesSet) ->getObject() ->get()->init()->createProxy  最终会获得一个代理对象。

createProxy第375行

  • 前面很多代码都是初始化的动作,需要仔细分析的代码代码从createProxy第375行开始
List<URL> us = loadRegistries(false); //从注册中心上获得相应的协议url地址
if (us != null && us.size() > 0) {
       for (URL u : us) {
           URL monitorUrl = loadMonitor(u); 
           if (monitorUrl != null) {
               map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
           }
           urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
       }
}
if (urls == null || urls.size() == 0) {
       throw new IllegalStateException("No such any registry to reference " + interfaceName  + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
   }
if (urls.size() == 1) {
    invoker = refprotocol.refer(interfaceClass, urls.get(0)); //获得invoker代理对象
} else {
    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    URL registryURL = null;
    for (URL url : urls) {
        invokers.add(refprotocol.refer(interfaceClass, url));
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            registryURL = url; // 用了最后一个registry url
        }
    }
    if (registryURL != null) { // 有 注册中心协议的URL
        // 对有注册中心的Cluster 只用 AvailableCluster
        URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 
        invoker = cluster.join(new StaticDirectory(u, invokers));
    }  else { // 不是 注册中心的URL
        invoker = cluster.join(new StaticDirectory(invokers));
    }
}

refprotocol.refer

  • refprotocol这个对象,定义的代码如下,是一个自适应扩展点,得到的是Protocol$Adaptive。
  • Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
  • 直接找到Protocol$Adaptive代码中的refer代码块如下
  • 这段代码中,根据当前的协议url,得到一个指定的扩展点,传递进来的参数中,协议地址为registry://,
  • 所以,我们可以直接定位到RegistryProtocol.refer代码
 public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

RegistryProtocol.refer

  • 这个方法里面的代码,基本上都能看懂
  1. 根据根据url获得注册中心,这个registry是zookeeperRegistry
  2. 调用doRefer,按方法,传递了几个参数, 其中有一个culster参数,这个需要注意下
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
       url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
       Registry registry = registryFactory.getRegistry(url);
       if (RegistryService.class.equals(type)) {
           return proxyFactory.getInvoker((T) registry, type, url);
       }
       // group="a,b" or group="*"
       Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
       String group = qs.get(Constants.GROUP_KEY);
       if (group != null && group.length() > 0 ) {
           if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                   || "*".equals( group ) ) {
               return doRefer( getMergeableCluster(), registry, type, url );
           }
       }
       return doRefer(cluster, registry, type, url);
   }

cluster

  • doRefer方法中有一个参数是cluster,我们找到它的定义代码如下,
  • 又是一个自动注入的扩展点。
  • setter 方法在injectExtention 中注入
private Cluster cluster;

public void setCluster(Cluster cluster) {
    this.cluster = cluster;
}
  • 从下面的代码可以看出,这个不仅仅是一个扩展点,而且方法层面上,还有一个@Adaptive,表示会动态生成一个自适应适配器

Cluster$Adaptive

@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     * 
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}

Cluster$Adaptive

  • refprotocol这个对象,定义的代码如下,是一个自适应扩展点,得到的是Protocol$Adaptive。
  • Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
  • 直接找到Protocol$Adaptive代码中的refer代码块如下
  • 这段代码中,根据当前的协议url,得到一个指定的扩展点,传递进来的参数中,协议地址为registry://,
  • 所以,我们可以直接定位到RegistryProtocol.refer代码
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}

RegistryProtocol.doRefer

这段代码中,有一个RegistryDirectory,可能看不懂,我们暂时先忽略,等会单独讲.(基于注册中心动态发现服务提供者)

  1. 将consumer://协议地址注册到注册中心
  2. 订阅zookeeper地址的变化
  3. 调用cluster.join()方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
            + "," + Constants.CONFIGURATORS_CATEGORY 
            + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}

cluster.join

  • 由前面的Cluster$Adaptive这个类中的join方法的分析,得知cluster.join会调用MockClusterWrapper.join方法,
  • 然后再调用FailoverCluster.join方法。
  • 这个意思很明显了。也就是我们上节课讲过的mock容错机制,如果出现异常情况,会调用MockClusterInvoker,否则,调用FailoverClusterInvoker.

public class MockClusterWrapper implements Cluster {

   private Cluster cluster;

   public MockClusterWrapper(Cluster cluster) {
      this.cluster = cluster;
   }

   public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
      return new MockClusterInvoker<T>(directory,
            this.cluster.join(directory));
   }
}

小结

  • refprotocol.ref,这个方法,会返回一个MockClusterInvoker(FailoverClusterInvoker)。
  • 这里面一定还有疑问,我们先把主线走完,再回过头看看什么是cluster、什么是directory

proxyFactory.getProxy(invoker);

  • 再回到ReferenceConfig这个类,在createProxy方法的最后一行,调用proxyFactory.getProxy(invoker). 把前面生成的invoker对象作为参数,再通过proxyFactory工厂去获得一个代理对象。接下来我们分析下这段代码做了什么。
  • 其实前面在分析服务发布的时候,基本分析过了,所以再看这段代码,应该会很熟悉
  • ProxyFactory, 会生成一个动态的自适应适配器。ProxyFactory$Adaptive,然后调用这个适配器中的getProxy方法,代码如下
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
  • 很显然,又是通过javassist实现的一个动态代理,我们来看看JavassistProxyFactory.getProxy

JavassistProxyFactory.getProxy

  • 通过javasssist动态字节码生成动态代理类,

proxy.getProxy(interfaces)

  • 在Proxy.getProxy这个类的如下代码中添加断点,在debug下可以看到动态字节码如下

public java.lang.String sayHello(java.lang.String arg0){
  Object[] args = new Object[1]; 
  args[0] = ($w)$1; 
  Object ret = handler.invoke(this, methods[0], args); 
return (java.lang.String)ret;
}
  • 上面红色部分代码的handler,就是在JavassistProxyFactory.getProxy中。
  • 传递的new InvokerInvocationHandler(invoker)看到这里,

什么时候建立和服务端的连接

  • 前面我们通过代码分析到了,消费端的初始化过程,但是似乎没有看到客户端和服务端建立NIO连接。
  • 实际上,建立连接的过程在消费端初始化的时候就建立好的,只是前面我们没有分析,
  • 代码在RegistryProtocol.doRefer方法内的directory.subscribe方法中。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
            + "," + Constants.CONFIGURATORS_CATEGORY 
            + "," + Constants.ROUTERS_CATEGORY));
    return cluster.join(directory);
}

directory.subscribe

调用链为: RegistryDirectory.subscribe ->FailbackRegistry. subscribe->- AbstractRegistry.subscribe>zookeeperRegistry.doSubscribe

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}

FailbackRegistry. subscribe

  • 调用FailbackRegistry.subscribe 进行订阅,这里有一个特殊处理,如果订阅失败,则会添加到定时任务中进行重试
@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // 向服务器端发送订阅请求
        doSubscribe(url, listener);

zookeeperRegistry. doSubscribe

调用zookeeperRegistry执行真正的订阅操作,这段代码太长,我就不贴出来了,这里面主要做两个操作

  1. 对providers/routers/configurator三个节点进行创建和监听
  2. 调用notify(url,listener,urls) 将已经可用的列表进行通知

AbstractRegistry.notify

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((urls == null || urls.size() == 0) 
            && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
           String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
           List<URL> categoryList = result.get(category);
           if (categoryList == null) {
              categoryList = new ArrayList<URL>();
              result.put(category, categoryList);
           }
           categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        listener.notify(categoryList);
    }
}

总结

消费端初始化这块就完了,

猜你喜欢

转载自my.oschina.net/u/3847203/blog/2966724