Dubbo服务目录

什么是服务目录?

引用官网的介绍:

服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。

在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。或者,如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。

如果这样说,服务目录和注册中心的功能不就雷同了吗?确实如此,这里这么说是为了方便大家理解。实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。

Invoker 有什么用呢?看名字就知道了,这是一个具有远程调用功能的对象。讲到这大家应该知道了什么是服务目录了,它可以看做是 Invoker 集合,且这个集合中的元素会随注册中心的变化而进行动态调整。

继承体系 

 服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory,它们均是 AbstractDirectory 的子类。AbstractDirectory 实现了 Directory 接口,这个接口包含了一个重要的方法定义,即 list(Invocation),用于列举 Invoker。

Directory 继承自 Node 接口,Node 这个接口继承者比较多,像 Registry、Monitor、Invoker 等均继承了这个接口。这个接口包含了一个获取配置信息的方法 getUrl,实现该接口的类可以向外提供配置信息。另外,大家注意看 RegistryDirectory 实现了 NotifyListener 接口,当注册中心节点信息发生变化后,RegistryDirectory 可以通过此接口方法得到变更信息,并根据变更信息动态调整内部 Invoker 列表。

AbstractDirectory

AbstractDirectory 封装了 Invoker 列举流程,具体的列举逻辑则由子类实现。

public abstract class AbstractDirectory<T> implements Directory<T> {
    ...

    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }

        return doList(invocation);
    }
    // 由子类实现
    protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

}

StaticDirectory

StaticDirectory 即静态服务目录,顾名思义,它内部存放的 Invoker 是不会变动的。所以,理论上它和不可变 List 的功能很相似。

public class StaticDirectory<T> extends AbstractDirectory<T> {
    private static final Logger logger = LoggerFactory.getLogger(StaticDirectory.class);

    private final List<Invoker<T>> invokers;

    public StaticDirectory(List<Invoker<T>> invokers) {
        this(null, invokers, null);
    }

    public StaticDirectory(List<Invoker<T>> invokers, RouterChain<T> routerChain) {
        this(null, invokers, routerChain);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers) {
        this(url, invokers, null);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers, RouterChain<T> routerChain) {
        super(url == null && CollectionUtils.isNotEmpty(invokers) ? invokers.get(0).getUrl() : url, routerChain);
        if (CollectionUtils.isEmpty(invokers)) {
            throw new IllegalArgumentException("invokers == null");
        }
        this.invokers = invokers;
    }

    @Override
    public Class<T> getInterface() {
        // 获取接口类
        return invokers.get(0).getInterface();
    }

    // 检测服务目录是否可用
    @Override
    public boolean isAvailable() {
        if (isDestroyed()) {
            return false;
        }
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return true;
            }
        }
        return false;
    }

    @Override
    public void destroy() {
        if (isDestroyed()) {
            return;
        }
        // 调用父类销毁
        super.destroy();
        for (Invoker<T> invoker : invokers) {
            invoker.destroy();
        }
        invokers.clear();
    }

    public void buildRouterChain() {
        RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
        routerChain.setInvokers(invokers);
        this.setRouterChain(routerChain);
    }

    // 列举invokers
    @Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
        List<Invoker<T>> finalInvokers = invokers;
        if (routerChain != null) {
            try {
                // 服务路由
                finalInvokers = routerChain.route(getConsumerUrl(), invocation);
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
        return finalInvokers == null ? Collections.emptyList() : finalInvokers;
    }

}

RegistryDirectory

RegistryDirectory 是一种动态服务目录,实现了 NotifyListener 接口。当注册中心服务配置发生变化后,RegistryDirectory 可收到与当前服务相关的变化。收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表。

RegistryDirectory 中有几个比较重要的逻辑:

第一是 Invoker 的列举逻辑:doList;

第二是接收服务配置变更的逻辑:notify;

第三是 Invoker 列表的刷新逻辑。

doList

    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            // 服务提供者关闭或禁用了服务,此时抛出 No provider 异常
            // 1. No service provider 2. Service providers are disabled
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
                    getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
                    NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
                    ", please check status of providers(disabled, not registered or in blacklist).");
        }

        if (multiGroup) {
            return this.invokers == null ? Collections.emptyList() : this.invokers;
        }

        List<Invoker<T>> invokers = null;
        try {
            // Get invokers from cache, only runtime routers will be executed.
            invokers = routerChain.route(getConsumerUrl(), invocation);
        } catch (Throwable t) {
            logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        }
        
        return invokers == null ? Collections.emptyList() : invokers;
    }

notify

RegistryDirectory实现了NotifyListener接口的notify方法,通过这个接口获取注册中心变更通知,进而动态动态调整服务目录。

    @Override
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(url -> {
                    if (UrlUtils.isConfigurator(url)) {
                        return CONFIGURATORS_CATEGORY;
                    } else if (UrlUtils.isRoute(url)) {
                        return ROUTERS_CATEGORY;
                    } else if (UrlUtils.isProvider(url)) {
                        return PROVIDERS_CATEGORY;
                    }
                    return "";
                }));

        // 配置器urls
        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        // 路由urls
        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        toRouters(routerURLs).ifPresent(this::addRouters);

        // 服务提供者urls
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        refreshOverrideAndInvoker(providerURLs);
    }

    private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }

refreshInvoker

refreshInvoker 方法是保证 RegistryDirectory 随注册中心变化而变化的关键所在。

主要逻辑:

1、根据入参 invokerUrls 的数量和协议头判断是否禁用所有的服务,如果禁用,则将 forbidden 设为 true,并销毁所有的 Invoker。

2、若不禁用,则将 url 转成 Invoker,得到 <url, Invoker> 的映射关系。

3、合并多个组的invoker

4、销毁无用的 Invoker,避免服务消费者调用已下线的服务的服务。

    private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null");

        // invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
        if (invokerUrls.size() == 1
                && invokerUrls.get(0) != null
                && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // Forbid to access
            this.invokers = Collections.emptyList();
            routerChain.setInvokers(this.invokers);
            destroyAllInvokers(); // Close all invokers
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls == Collections.<URL>emptyList()) {
                invokerUrls = new ArrayList<>();
            }
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                // 添加缓存 url 到 invokerUrls 中
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<>();
                // 缓存 invokerUrls
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            // 将 url 转成 Invoker
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

            /**
             * If the calculation is wrong, it is not processed.
             *
             * 1. The protocol configured by the client is inconsistent with the protocol of the server.
             *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
             * 2. The registration center is not robust and pushes illegal specification data.
             *
             */
            // 转换出错,直接打印异常,并返回
            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                        .toString()));
                return;
            }

            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
            // pre-route and build cache, notice that route cache should build on original Invoker list.
            // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
            routerChain.setInvokers(newInvokers);
            // 合并多个组的 Invoker
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;

            try {
                // 销毁无用 Invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

toInvokers

toInvokers 方法一开始会对服务提供者 url 进行检测,若服务消费端的配置不支持服务端的协议,或服务端 url 协议头为 empty 时,toInvokers 均会忽略服务提供方 url。必要的检测做完后,紧接着是合并 url,然后访问缓存,尝试获取与 url 对应的 invoker。如果缓存命中,直接将 Invoker 存入 newUrlInvokerMap 中即可。如果未命中,则需新建 Invoker。

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<>();
        // 获取服务消费端配置的协议
        String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                // 检测服务提供者协议是否被服务消费者所支持
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    // 若服务消费者协议头不被消费者所支持,则忽略当前 providerUrl
                    continue;
                }
            }
            // 忽略 empty 协议
            if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            // 通过 SPI 检测服务端协议是否被消费端支持,不支持则抛出异常
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            // 合并 url
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                // 忽略重复 url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            // 将本地 Invoker 缓存赋值给 localUrlInvokerMap
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            // 获取与 url 对应的 Invoker
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            // 缓存未命中
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(DISABLED_KEY)) {
                        // 获取 disable 配置,取反,然后赋值给 enable 变量
                        enabled = !url.getParameter(DISABLED_KEY, false);
                    } else {
                        // 获取 enable 配置,并赋值给 enable 变量
                        enabled = url.getParameter(ENABLED_KEY, true);
                    }
                    if (enabled) {
                        // 调用 refer 获取 Invoker
                        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                // 缓存 Invoker 实例
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

toMergeInvokerList

首先是生成 group 到 Invoker 类比的映射关系表,若关系表中的映射关系数量大于1,表示有多组服务。此时通过集群类合并每组 Invoker,并将合并结果存储到 mergedInvokers 中,最后返回mergedInvokers。

    private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
        List<Invoker<T>> mergedInvokers = new ArrayList<>();
        Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
        for (Invoker<T> invoker : invokers) {
            String group = invoker.getUrl().getParameter(GROUP_KEY, "");
            groupMap.computeIfAbsent(group, k -> new ArrayList<>());
            groupMap.get(group).add(invoker);
        }

        if (groupMap.size() == 1) {
            // 如果 groupMap 中仅包含一组键值对,此时直接取出该键值对的值即可
            mergedInvokers.addAll(groupMap.values().iterator().next());
        } else if (groupMap.size() > 1) {
            // groupMap.size() > 1 成立,表示 groupMap 中包含多组键值对,比如:
            // {
            //     "dubbo": [invoker1, invoker2, invoker3, ...],
            //     "hello": [invoker4, invoker5, invoker6, ...]
            // }
            for (List<Invoker<T>> groupList : groupMap.values()) {
                StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
                staticDirectory.buildRouterChain();
                // 通过集群类合并每个分组对应的 Invoker 列表
                mergedInvokers.add(CLUSTER.join(staticDirectory));
            }
        } else {
            mergedInvokers = invokers;
        }
        return mergedInvokers;
    }

destroyUnusedInvokers

主要逻辑:

通过 newUrlInvokerMap 找出待删除 Invoker 对应的 url,并将 url 存入到 deleted 列表中。然后再遍历 deleted 列表,并从 oldUrlInvokerMap 中移除相应的 Invoker,销毁之。

    private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            destroyAllInvokers();
            return;
        }
        // check deleted invoker
        List<String> deleted = null;
        if (oldUrlInvokerMap != null) {
            // 获取新生成的 Invoker 列表
            Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
            // 遍历老的 <url, Invoker> 映射表
            for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
                // 检测 newInvokers 中是否包含老的 Invoker
                if (!newInvokers.contains(entry.getValue())) {
                    if (deleted == null) {
                        deleted = new ArrayList<>();
                    }
                    // 若不包含,则将老的 Invoker 对应的 url 存入 deleted 列表中
                    deleted.add(entry.getKey());
                }
            }
        }

        if (deleted != null) {
            // 遍历 deleted 集合,并到老的 <url, Invoker> 映射关系表查出 Invoker,销毁之
            for (String url : deleted) {
                if (url != null) {
                    // 从 oldUrlInvokerMap 中移除 url 对应的 Invoker
                    Invoker<T> invoker = oldUrlInvokerMap.remove(url);
                    if (invoker != null) {
                        try {
                            // 从 oldUrlInvokerMap 中移除 url 对应的 Invoker
                            invoker.destroy();
                            if (logger.isDebugEnabled()) {
                                logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");
                            }
                        } catch (Exception e) {
                            logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);
                        }
                    }
                }
            }
        }
    }

以上参考自官网:http://dubbo.apache.org/zh-cn/docs/source_code_guide/directory.html

发布了149 篇原创文章 · 获赞 100 · 访问量 18万+

猜你喜欢

转载自blog.csdn.net/u011212394/article/details/103407752