我们知道,一般Dubbo服务端会存在多个实例,Dubbo消费端内置了策略,可以进行负载均衡调度。我们接下来大致看下。
官方给出的路由、负载均衡整体架构如下:
在开始之前,先说下Dubbo中个几个概念:
Cluster
俗称的集群,为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样,在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环境下出现了多个服务提供者。这时会出现一个问题,服务消费者需要决定选择哪个服务提供者进行调用。另外服务调用失败时的处理措施也是需要考虑的,是重试呢,还是抛出异常,亦或是只打印异常等。为了处理这些问题,Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。这样一来,服务消费者只需通过这个 Invoker 进行远程调用即可,至于具体调用哪个服务提供者,以及调用失败后如何处理等问题,现在都交给集群模块去处理。集群模块是服务提供者和服务消费者的中间层,为服务消费者屏蔽了服务提供者的情况,这样服务消费者就可以专心处理远程调用相关事宜。比如发请求,接受服务提供者返回的数据等。这就是集群的作用。集群工作过程可分为两个阶段,第一个阶段是在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例,该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。Directory 的用途是保存 Invoker,可简单类比为 List。其实现类 RegistryDirectory 是一个动态服务目录,可感知注册中心配置的变化,它所持有的 Invoker 列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory 会动态增删 Invoker,并调用 Router 的 route 方法进行路由,过滤掉不符合路由规则的 Invoker。当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。最后 FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。Directory
服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过 Netty 等客户端进行远程调用。在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。或者,如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。实际上服务目录在获取注册中心的服务配置信息后,会为每条配置信息生成一个 Invoker 对象,并把这个 Invoker 对象存储起来,这个 Invoker 才是服务目录最终持有的对象。而Invoker就是最终持有的远程方法调用LoadBalance
负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。负载均衡可分为软件负载均衡和硬件负载均衡。在我们日常开发中,一般很难接触到硬件负载均衡。但软件负载均衡还是可以接触到的,比如 Nginx。在 Dubbo 中,也有负载均衡的概念和相应的实现。Dubbo 需要对服务消费者的调用请求进行分配,避免少数服务提供者负载过大。服务提供者负载过大,会导致部分请求超时。因此将负载均衡到每个服务提供者上,是非常必要的。Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。Router
服务路由,。服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者。服务路由包含一条路由规则,路由规则决定了服务消费者的调用目标,即规定了服务消费者可调用哪些服务提供者。Dubbo 目前提供了三种服务路由实现,分别为条件路由 ConditionRouter、脚本路由 ScriptRouter 和标签路由 TagRouter。
上述解释来自Dubbo逛网。
可以看到,Dubbo的负载均衡就是借助这几个模块来实现。而其
基于之前的分析Dubbo消费端启动流程、处理逻辑,方法调用实现(基于Dubbo3)
我们知道在ReferenceConfig
进行了Invoker的初始化:
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
URL url = new ServiceConfigURL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
if (url != null && url.length() > 0) {
// user specified URL, could be peer-to-peer address, or register center's address.
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
urls.add(url.putAttribute(REFER_KEY, map));
} else {
URL peerURL = ClusterUtils.mergeUrl(url, map);
peerURL = peerURL.putAttribute(PEER_KEY, true);
urls.add(peerURL);
}
}
}
} else {
// assemble URL from register center's configuration
// if protocols not injvm checkRegistry
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
u = u.putAttribute(MONITOR_KEY, monitorUrl);
}
urls.add(u.putAttribute(REFER_KEY, map));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException(
"No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
" use dubbo version " + Version.getVersion() +
", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) {
// registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else {
// not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
?
(invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Referred dubbo service " + interfaceClass.getName());
}
URL consumerURL = new ServiceConfigURL(CONSUMER_PROTOCOL, map.get(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
MetadataUtils.publishServiceDefinition(consumerURL);
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
在这里我们看到,如果只有一个地址且非注册地址的话,那么就直连,不进行任何负载均衡处理。
如果有多个地址,且地址是注册地址的话,那么这里对应registry
对应的protocol,
这里REF_PROTOCOL
是一个AdaptiveExtension
,
private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
在Dubhbo中对应的是Protocol$Adaptive
,其refer实现如下:
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
这里我们就获取到了一个RegistryProtocol
,其对应的refer
实现如下:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url, qs);
}
}
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url, qs);
}
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
consumerAttribute.remove(REFER_KEY);
URL consumerUrl = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl, url);
}
如果是RegistryService
服务,直接注册。其他走doRefer
.
而这里可以看到,如果我们配置了group
属性,那么会在这里进行合并,获取到的是一个MergeableCluster
.
而接下来的doRefer
中则会进行实际的服务引用处理:
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
Map<String, Object> consumerAttribute = new HashMap<>(url.getAttributes());
consumerAttribute.remove(REFER_KEY);
URL consumerUrl = new ServiceConfigURL(parameters.get(PROTOCOL_KEY) == null ? DUBBO : parameters.get(PROTOCOL_KEY),
null,
null,
parameters.get(REGISTER_IP_KEY),
0, getPath(parameters, type),
parameters,
consumerAttribute);
url = url.putAttribute(CONSUMER_URL_KEY, consumerUrl);
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
return interceptInvoker(migrationInvoker, url, consumerUrl, url);
}
ClusterInvoker migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
这里返回的是一个ServiceDiscoveryMigrationInvoker
.
另外需要注意看下这个方法interceptInvoker
:
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl, URL registryURL) {
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, invoker, consumerUrl, registryURL);
}
return invoker;
}
这个方法相当于是调用了对应监听器,而Dubbo中默认的实现是MigrationRuleListener
,当触发refer事件之后,会调用MigrationRuleHandler.doMigrate
,最后会触发:
serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url);
最后是返回了一个InvocationInterceptorInvoker
.
回到ReferenceConfig
的createProxy
处
if (registryURL != null) { // registry url is available
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else {
String cluster = CollectionUtils.isNotEmpty(invokers)
?
(invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
这里如果是通过服务注册来获取的话,那么返回的是一个:ZoneAwareClusterInvoker
(用来处理不同机房zone)
如果我们没有特殊指定,ZoneAwareClusterInvoker会持有一个FailoverClusterInvoker,同样的FailoverClusterInvoker里面会在持有一个Invoker
即ZoneAwareClusterInvoker => FailoverClusterInvoker => Invoker
大概是这样一个封装流程。
zone,多注册中心机房处理
对于在配置中使用了机房配置的话,比如:
dubbo.registries.shanghai.id=shanghai
dubbo.registries.shanghai.address=xxxx
dubbo.registries.shanghai.zone=shanghai
dubbo.registries.beijing.id=beijing
dubbo.registries.beijing.address=xxxx
dubbo.registries.beijing.zone=beijing
为了进行流量的分离,需要北京机房的只能消费北京,上海机房只能消费上海的。
这时候ZoneAwareClusterInvoker
首先会根据zone
在多个机房里面选择一个:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
ClusterInvoker<T> clusterInvoker = (ClusterInvoker<T>) invoker;
if (clusterInvoker.isAvailable() && clusterInvoker.getRegistryUrl()
.getParameter(PREFERRED_KEY, false)) {
return clusterInvoker.invoke(invocation);
}
}
String zone = invocation.getAttachment(REGISTRY_ZONE);
if (StringUtils.isNotEmpty(zone)) {
for (Invoker<T> invoker : invokers) {
ClusterInvoker<T> clusterInvoker = (ClusterInvoker<T>) invoker;
if (clusterInvoker.isAvailable() && zone.equals(clusterInvoker.getRegistryUrl().getParameter(ZONE_KEY))) {
return clusterInvoker.invoke(invocation);
}
}
String force = invocation.getAttachment(REGISTRY_ZONE_FORCE);
if (StringUtils.isNotEmpty(force) && "true".equalsIgnoreCase(force)) {
throw new IllegalStateException("No registry instance in zone or no available providers in the registry, zone: "
+ zone
+ ", registries: " + invokers.stream().map(invoker -> ((MockClusterInvoker<T>) invoker).getRegistryUrl().toString()).collect(Collectors.joining(",")));
}
}
Invoker<T> balancedInvoker = select(loadbalance, invocation, invokers, null);
if (balancedInvoker.isAvailable()) {
return balancedInvoker.invoke(invocation);
}
for (Invoker<T> invoker : invokers) {
ClusterInvoker<T> clusterInvoker = (ClusterInvoker<T>) invoker;
if (clusterInvoker.isAvailable()) {
return clusterInvoker.invoke(invocation);
}
}
return invokers.get(0).invoke(invocation);
}
可以看到
- 如果设置了:
dubbo.registries.shanghai.preferred=true
那么不管其他规则,优先选用该注册中心。
2. 如果没有设置preferred
,那么同一个zone优先级高
3. 如果没有设置zone,那么根据权重
dubbo.registries.shanghai.weight=100
- 如果没有其他选项,任选一个
Cluster集群
这样我们就选择了一个注册中心,接下来就是通过Cluster
从注册中心的多个服务选择一个。
dubbo.registries.shanghai.cluster=failover
可选的策略如下:
failover
默认策略,FailoverCluster
,对应FailoverClusterInvoker
,FailoverClusterInvoker 在调用失败时,会自动切换 Invoker 进行重试。默认配置下,Dubbo 会使用这个类作为缺省 Cluster Invoker。failback
策略,FailbackCluster
对应FailbackClusterInvoker
,FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。failfast
策略,FailfastCluster
对应FailfastClusterInvoker
,FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。failsafe
策略,FailsafeCluster
对应FailsafeClusterInvoker
,FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。forking
测录,ForkingCluster
对应ForkingClusterInvoker
,ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。brocadcast
,BroadcastCluster
对应BroadcastClusterInvoker
。BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。
服务目录
我们从Cluster选择Invoker的时候,Cluster并不持有Invoker的集合,而是在内部维护了一个服务目录,通过服务目录来获取Invoker的集合列表。
在Dubbo中,服务目录抽象为AbstractDirectory
,类结构层次如下:
我们挑选两个说明一下:
StaticDirectory
:这是一个静态固定的服务目录,这个目录里面的Invoker列表是不变的RegistryDirectory
,继承自DynamicDirectory
,从名字上能够看出这是一个动态服务目录,里面的Invoker列表是会变化的。对于RegistryDirectory
其会根据注册中心中的注册服务上下线动态变化。
另外在服务目录获取Invoker
列表的时候,会根据对应的路由Router
在此进行筛选。
Router路由
通过服务目录获取Invoker列表的时候,会通过路由设置对Invoker列表进行过滤,常见的有ConditionRouter
、TagRouter
、ScriptRouter
等。
LoadBalance 负载均衡
当我们通过Cluster
获取到Invoker
集合之后,接下来会通过LoadBalance
从待选的集合中选择一个Invoker去最终执行请求。Dubbo中提供了如下几种LoadBalance
负载均衡策略:
consistenthash
,ConsistentHashLoadBalance
一致性hash的实现,random
,RandomLoadBalance
是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2],权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次,服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。roundrobin
,RoundRobinLoadBalance
加权轮询负载均衡,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。leastactive
,LeastActiveLoadBalance
,LeastActiveLoadBalance 是基于加权最小活跃数算法实现的。活跃调用数越小,表明该服务提供者效率越高,单位时间内可处理更多的请求。此时应优先将请求分配给该服务提供者。在具体实现中,每个服务提供者对应一个活跃数 active。初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。除了最小活跃数,LeastActiveLoadBalance 在实现上还引入了权重值。
可以看到,整个Dubbo中集群负载均衡大致流程如下:
如果是注册中心方式,则首先会判断是否有zone配置,如果有zone配置,根据zone的方式从多个服务注册地址里面选择一个服务注册中心,选择好后从选择的服务注册中心里面根据配置的Cluster方式选择一个Invoker集合(这时候选择是通过服务目录选择的,服务目录中会根据配置的路由方式再次筛选),然后在对选择好的Invoker列表进行负载均衡选择,根据配置好的负载均衡策略,最终选择一个Invoker
如果是非注册中心方式(直连方式)且有多个地址,那么和上面的方式基本一样,只不过没有zone这一层
如果是非注册中心且只有一个地址,那么直连,不会有任何其他处理