图解+源码讲解 Ribbon 如何获取注册中心的实例

一起养成写作习惯!这是我参与「掘金日新计划 · 4 月更文挑战」的第12天,点击查看活动详情

图解+源码讲解 Ribbon 如何获取注册中心的实例

行动是成功的阶梯,行动越多,登得越高

想像一下在哪里开始

    其实我们之前都说过就是负载均衡器从获取的注册表中选取一个可以使用的服务,那我用了Eureka 注册中心的话,应该也是通过注册中心的客户端获取的注册表才对,那我们就看看在进行负载均衡器初始化的地方有没有线索

获取配置流程图

image.png

Ribbon 客户端配置类

    RibbonClientConfiguration 客户端配置类,看看这里面都初始化什么东西了,看看主要的逻辑部分,里面创建了路由规则、ping规则、服务列表、服务列表更新器、负载均衡器创建、过滤器以及上下文等等

public class RibbonClientConfiguration {
    // 连接超时时间
	public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
	// 读取超时时间
	public static final int DEFAULT_READ_TIMEOUT = 1000;
	@RibbonClientName
	private String name = "client";
    // ribbon 客户端配置
	@Bean
	@ConditionalOnMissingBean
	public IClientConfig ribbonClientConfig() {
	    ....
	}
    // ribbon 路由规则
	@Bean
	@ConditionalOnMissingBean
	public IRule ribbonRule(IClientConfig config) {
	    ....
	}
    // 路由 ping 操作
	@Bean
	@ConditionalOnMissingBean
	public IPing ribbonPing(IClientConfig config) {
	     ....
	}
    // ribbon 服务列表
	@Bean
	@ConditionalOnMissingBean
	@SuppressWarnings("unchecked")
	public ServerList<Server> ribbonServerList(IClientConfig config) {
		 ....
	}
    // ribbon 服务列表更新器
	@Bean
	@ConditionalOnMissingBean
	public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
		....
	}
    // 负载均衡器
	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}
    // 服务列表过滤器
	@Bean
	@ConditionalOnMissingBean
	@SuppressWarnings("unchecked")
	public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
		....
	}
    // ribbon 负载均衡上下文
	@Bean
	@ConditionalOnMissingBean
	public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
			IClientConfig config, RetryHandler retryHandler) {
		 ....
	}
    // 重试处理器
	@Bean
	@ConditionalOnMissingBean
	public RetryHandler retryHandler(IClientConfig config) {
		....
	}

}
复制代码

创建负载均衡器 ribbonLoadBalancer

    根据其他方法初始化好的参数进行 ribbonLoadBalancer 构造,这里面有Rule 规则、Ping机制、以及你要访问的服务地址等参数,实际创建的负载均衡器默认创建的是 ZoneAwareLoadBalancer 负载均衡器

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}
复制代码

默认负载均衡器 ZoneAwareLoadBalancer

    这个负载均衡器继承了 DynamicServerListLoadBalancer动态服务列表负载均衡器,一看就是我们需要的负载均衡器

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                             IPing ping, ServerList<T> serverList,
                             ServerListFilter<T> filter,
                             ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
复制代码

    这里面的 super 方法就是 **DynamicServerListLoadBalancer **这里面的方法

动态服务列表负载均衡器 DynamicServerListLoadBalancer

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule,
    IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
    ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).
            setLoadBalancerStats(getLoadBalancerStats());
    }
    // 核心方法,其他的方法先不细看
    restOfInit(clientConfig);
}
复制代码

restOfInit 方法

    这个方法里面是真正的获取服务实例列表操作,和开启后续如注册中心推送新的实例变更信息或者拉取实例变更操作

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // 开启并且初始化加载新的服务实例,比如注册中心推送或者拉取操作,后面讲解
    enableAndInitLearnNewServersFeature();
    // 获取服务列表操作
    updateListOfServers();
    ......
}
复制代码

更新服务列表 updateListOfServers()

@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        //获取服务列表信息
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);
    }
    // 更新LoadBalancer中的所有服务列表
    updateAllServerList(servers);
}
复制代码

获取更新服务列表 getUpdatedListOfServers()

    走的是 DiscoveryEnabledNIWSServerList 这个类下面的 getUpdatedListOfServers() 操作,走的是注册中心的服务列表
image.png

@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    // 获得服务方法
    return obtainServersViaDiscovery();
}
复制代码

获取服务发现列表 obtainServersViaDiscovery

private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
    // 如果客户端为提供者为空的话那么就返回一个空的列表
    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet,
                    returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }
    // 获取 eureka 客户端操作
    EurekaClient eurekaClient = eurekaClientProvider.get();
    // 服务地址不为空的情况下
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // 通过 eureka 客户端获取服务实例操作
        List<InstanceInfo> listOfInstanceInfo = 
            eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
        for (InstanceInfo ii : listOfInstanceInfo) {
                // 服务实例必须是启动状态的或者正常状态的
            if (ii.getStatus().equals(InstanceStatus.UP)) {
                // 创建服务操作,加入到服务列表中
                DiscoveryEnabledServer des = 
                    createServer(ii, isSecure, shouldUseIpAddr);
                serverList.add(des);
                }
            }
        }
    }
    // 返回服务列表
    return serverList;
}
复制代码

image.png

更新LoadBalancer中的所有服务列表 updateAllServerList(servers)

protected void updateAllServerList(List<T> ls) {
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); 
            }
            setServersList(ls);
            // ping 服务后期讲解
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}
复制代码

设置服务列表 setServersList

    服务客户端获取的服务列表实例放入到 BaseLoadBalancer 的 allServerList 中,将客户端获取的服务列表实例放入到 serversInZones 这个haspMap中

@Override
public void setServersList(List lsrv) {
    // 将服务客户端获取的服务列表实例放入到 BaseLoadBalancer 的 allServerList 中
    super.setServersList(lsrv);
    // 下面是将客户端获取的服务列表实例放入到 serversInZones 这个haspMap中
    List<T> serverList = (List<T>) lsrv;
    Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>();
    for (Server server : serverList) {
        getLoadBalancerStats().getSingleServerStat(server);
        String zone = server.getZone();
        if (zone != null) {
            zone = zone.toLowerCase();
            List<Server> servers = serversInZones.get(zone);
            if (servers == null) {
                servers = new ArrayList<Server>();
                serversInZones.put(zone, servers);
            }
            servers.add(server);
        }
    }
    setServerListForZones(serversInZones);
}
复制代码

小结

  1. 创建一些必要的前提组件,比如 ZoneAwareLoadBalancer、PollingServerListUpdater 等操作
  2. 通过eureka client 客户端组件获取服务列表
  3. 根据获取的服务列表信息进行负载均衡器内部内部的服务列表进行更新

猜你喜欢

转载自juejin.im/post/7085520967547486221