前言
集群容错源码包含四个部分,分别是服务目录 Directory、服务路由 Router、集群 Cluster和 负载均衡 LoadBalance。
快速上手
根据集群容源码,编写了例子:
public class ClusterTest {
public static void main(String[] args) throws Exception {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(GreetingsService.class);
MethodDescriptor methodDescriptor = serviceDescriptor.getMethod("sayHi", new Class[]{
String.class});
Method method = methodDescriptor.getMethod();
String serviceName = serviceDescriptor.getServiceName();
URL url = URL.valueOf("dubbo://127.0.0.1:28092/"+serviceName+"?timeout=12000&monitor=mm");
Protocol protocol = new ProtocolFilterWrapper(new DubboProtocol());
protocol.export(new Invoker<GreetingsService>() {
// ...
@Override
public Result invoke(Invocation invocation) throws RpcException {
System.out.println("yoyoy111111111");
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}
});
Invoker invoker = protocol.refer(GreetingsService.class, url);
// 把远程对象invoker加入StaticDirectory
// 创建集群invoker代码Cluster
// Cluster调用的invoke,会从StaticDirectory加载可能的远程对象执行
invoker = Cluster.getCluster(null, false).join(new StaticDirectory(Lists.newArrayList(invoker)));
invoker.invoke(new RpcInvocation(method, serviceName, new String[]{
"yoyo"}));
}
}
分析invoke源码得出:
集群工作过程可分为两个阶段:
- 在服务消费者初始化期间,集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
- 第二个阶段是在服务消费者进行远程调用时。以 FailoverClusterInvoker 为例
- 该类型 Cluster Invoker 首先会调用 Directory 的 list 方法列举 Invoker 列表(可将 Invoker 简单理解为服务提供者)。
- Directory 的用途是保存 Invoker,可简单类比为 List。
- 当 FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表后,它会通过 LoadBalance 从 Invoker 列表中选择一个 Invoker。
- FailoverClusterInvoker 会将参数传给 LoadBalance 选择出的 Invoker 实例的 invoke 方法,进行真正的远程调用。
服务目录
StaticDirectory即为服务目录,它的作用维护远程调用Invoker集合的。分析其源码
public abstract class AbstractDirectory<T> implements Directory<T> {
// 注册中心url 例:
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=test2&dubbo=2.0.2&pid=7861&release=2.7.8×tamp=1616255181499
private final URL url;
// 注册中心url + refer中参数
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=test2&dubbo=2.0.2&interface=dubbo.GreetingsService&metadata-type=remote&methods=sayHi&pid=7861&protocol=dubbo®ister.ip=192.168.3.5&release=2.7.8&side=consumer&sticky=false×tamp=1616255078292
private volatile URL consumerUrl;
// 路由规则,用于过滤不符合要求的远程Invoker
private volatile List<Router> routers;
}
// 静态目录实现类
public class StaticDirectory<T> extends AbstractDirectory<T> {
// consumerUrl 对应的远程对象集合
private final List<Invoker<T>> invokers;
// 目录对应的接口
public Class<T> getInterface() {
return invokers.get(0).getInterface();
}
// 根据url利用spi提取router对象并创建路由链对象
public void buildRouterChain() {
RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
routerChain.setInvokers(invokers);
this.setRouterChain(routerChain);
}
// 根据传入运行时会话invocation通过Router规则对invokers进行过滤
// 返回符合要求的
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
List<Invoker<T>> finalInvokers = invokers;
if (routerChain != null) {
finalInvokers = routerChain.route(getConsumerUrl(), invocation);
}
return finalInvokers == null ? Collections.emptyList() : finalInvokers;
}
}
服务路由
上述的routerChain对象就是由服务路由Router组成的一条链路。其作用是
在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由,筛选出符合路由规则的服务提供者。Dubbo 目前提供了三种服务路由实现。
条件路由 ConditionRouter
条件路由规则由两个条件组成,分别用于对服务消费者和提供者进行匹配。 比如有这样一条规则:
- host = 10.20.153.10 => host = 10.20.153.11
- 该条规则表示 IP 为 10.20.153.10 的服务消费者只可调用 IP 为 10.20.153.11 机器上的服务
如果服务消费者匹配条件为空,表示不对服务消费者进行限制。如果服务提供者匹配条件为空,表示对某些服务消费者禁用服务。
脚本路由 ScriptRouter
支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过 type=javascript 参数设置脚本类型,缺省为 javascript。如:
- 所有包含get的方法全部调用192.168.0.23这台提供者
(function(invokers,invocation,context){
var methodName = invocation.methodName;
var resultInvokers=[];
for(var i=0;i<invokers.length;i++){
var invoker = invokers[i];
var currentConsumerUrl = invoker.url;
var providerUrl = invoker.providerUrl;
var consumerHost=currentConsumerUrl.host;
var providerHost = providerUrl.host;
var providerProtocol = providerUrl.protocol;
var currentService = providerUrl.getParameter("interface");
if(methodName.indexOf('get')>=0&&providerHost=='192.168.0.23'){
resultInvokers.push(invoker);
}
}
//该方法返回值必须是一个Invoker的集合或者数组
return resultInvokers;})(invokers,invocation,context);
其实就是用脚本的方式实现写个方法执行
标签路由 TagRouter
标签路由通过将某一个或多个服务的提供者划分到同一个分组,约束流量只在指定分组中流转,从而实现流量隔离的目的。
- 对于provider端,可以通过静态或动态的方式对服务进行打标签,静态打标可以在JVM的启动参数上增加-Ddubbo.provider.tag={env_tag}来实现,动态打标则是直接修改provider在注册中心上的地址实现
- 对于consumer端,请求标签的作用域为每一次 invocation,使用attachment来传递请求标签.目前仅仅支持硬编码的方式设置dubbo.tag
它与dubbo的version、group机制有什么区别?
- dubbo的version与group是静态的路由,如果URL中带了不匹配的version,在上图代码中的invokers就根本不存在该服务提供者;
- tag路由是动态的,就算带了不匹配的tag,也是会放在invokers中,每次调用时都执行一次路由逻辑。
标签路由降级约定
- consumer dubbo.tag=tag1 时优先选择标记了tag=tag1的provider。若集群中不存在与请求标记对应的服务,默认将降级请求 tag为空的provider;如果要改变这种默认行为,即找不到匹配tag1的provider返回异常,需设置request.tag.force=true;
- consumer dubbo.tag未设置时,只会匹配tag为空的provider。即使集群中存在可用的服务,若tag不匹配也就无法调用,这与约定1不同,携带标签的请求可以降级访问到无标签的服务,但不携带标签/携带其他种类标签的请求永远无法访问到其他标签的服务。
负载均衡
FailoverClusterInvoker中的select()方法最近会使用到负载均衡,何为负载均衡?它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。Dubbo 提供了4种负载均衡实现
基于权重随机算法的 RandomLoadBalance
- 假设我们有一组服务器 servers = [A, B, C] 对应的权重为 weights = [5, 3, 2],权重总和为10。
- 现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。
- 接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。
权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次,服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。
基于最少活跃调用数算法的 LeastActiveLoadBalance
- 每个服务提供者对应一个活跃数 active。
- 初始情况下,所有服务提供者活跃数均为0。每收到一个请求,活跃数加1,完成请求后则将活跃数减1。
- 某一时刻它们的活跃数相同,此时 Dubbo 会根据它们的权重去分配请求,权重越大,获取到新请求的概率就越大。
在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求
基于 hash 一致性的 ConsistentHashLoadBalance
一致性 hash 算法由麻省理工学院的 Karger 及其合作者于1997年提出的,算法提出之初是用于大规模缓存系统的负载均衡。它的工作过程是这样的
- 首先根据 ip 或者其他的信息为缓存节点生成一个 hash,并将这个 hash 投射到 [0, 232 - 1] 的圆环上。
- 当有查询或写入请求时,则为缓存项的 key 生成一个 hash 值。然后查找第一个大于或等于该 hash 值的缓存节点,并到这个节点中查询或写入缓存项。
- 如果当前节点挂了,则在下一次查询或写入缓存时,为缓存项查找另一个大于其 hash 值的缓存节点即可。
大致效果如上图所示,每个缓存节点在圆环上占据一个位置。如果缓存项的 key 的 hash 值小于缓存节点 hash 值,则到该缓存节点中存储或读取缓存项。比如下面绿色点对应的缓存项将会被存储到 cache-2 节点中。由于 cache-3 挂了,原本应该存到该节点中的缓存项最终会存储到 cache-4 节点中。
我们把上图的缓存节点替换成 Dubbo 的服务提供者
这里相同颜色的节点均属于同一个服务提供者,比如 Invoker1-1,Invoker1-2,……, Invoker1-160。这样做的目的是通过引入虚拟节点,让 Invoker 在圆环上分散开来,避免数据倾斜问题。所谓数据倾斜是指,由于节点不够分散,导致大量请求落到了同一个节点上,而其他节点只会接收到了少量请求的情况
基于加权轮询算法的 RoundRobinLoadBalance
- 假设我们有三台服务器 servers = [A, B, C],对应的权重为 weights = [2, 5, 1]
- 三次请求分别返回A,B,C,weight=[1,4,0]
- 二次请求分别反回A,B,weight=[0,3,0]
- 三次请求全返回B
- 恢复新的一轮,weights = [2, 5, 1]
- 我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。
- 但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。
以默认的RandomLoadBalance为例子源码分析
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
int totalWeight = 0;
boolean sameWeight = true;
// 下面这个循环有两个作用,第一是计算总权重 totalWeight,
// 第二是检测每个服务提供者的权重是否相同
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// 累加权重
totalWeight += weight;
// 检测当前服务提供者的权重与上一个服务提供者的权重是否相同,
// 不相同的话,则将 sameWeight 置为 false。
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 下面的 if 分支主要用于获取随机数,并计算随机数落在哪个区间上
if (totalWeight > 0 && !sameWeight) {
// 随机获取一个 [0, totalWeight) 区间内的数字
int offset = random.nextInt(totalWeight);
// 循环让 offset 数减去服务提供者权重值,当 offset 小于0时,返回相应的 Invoker。
// 举例说明一下,我们有 servers = [A, B, C],weights = [5, 3, 2],offset = 7。
// 第一次循环,offset - 5 = 2 > 0,即 offset > 5,
// 表明其不会落在服务器 A 对应的区间上。
// 第二次循环,offset - 3 = -1 < 0,即 5 < offset < 8,
// 表明其会落在服务器 B 对应的区间上
for (int i = 0; i < length; i++) {
// 让随机值 offset 减去权重值
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
// 返回相应的 Invoker
return invokers.get(i);
}
}
}
// 如果所有服务提供者权重值相同,此时直接随机返回一个即可
return invokers.get(random.nextInt(length));
}
}
集群
为了避免单点故障,现在的应用通常至少会部署在两台服务器上。Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker。集群 Cluster 用途是将多个服务提供者合并为一个 Cluster Invoker,并将这个 Invoker 暴露给服务消费者。Dubbo 主要提供了这样几种容错方式:
- FailoverClusterInvoker - 失败自动切换,在调用失败时,会自动切换 Invoker 进行重试。默认配置。
- FailfastClusterInvoker - 快速失败,只会进行一次调用,失败后立即抛出异常。适用于幂等操作,比如新增记录。
- FailsafeClusterInvoker - 失败安全,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。适用于写入审计日志等操作。
- FailbackClusterInvoke - 失败自动恢复,会在调用失败后,返回一个空结果给服务消费者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。
- ForkingClusterInvoker - 并行调用多个服务提供者,会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。
以默认的FailoverClusterInvoker为例子进行源码分析
public class<T> extends AbstractClusterInvoker<T> {
// 省略部分代码
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
// 获取重试次数
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
Set<String> providers = new HashSet<String>(len);
// 循环调用,失败重试
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了
// 通过调用list可得到最新可用符合路由规则的Invoker列表
copyinvokers = list(invocation);
// 对 copyinvokers 进行判空检查
checkInvokers(copyinvokers, invocation);
}
// 1. 如果粘滞连接开启,则使用粘滞连接
// 2. 用均衡算法选取一个
// 3. 发现2选取的是已用过的排除
// 4. 如果3步聚全部排序了,则在已用过的选取一个
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 若重试失败,则抛出异常
throw new RpcException(..., "Failed to invoke the method ...");
}
}
注册中心
如果一直使用静态服务目录,dubbo集群将变的很死板。因此dubbo提供了动态RegistryDirectory动态服务目录。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
// 集群策略,默认为failover。
private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
// 路由工厂,如果注册中心那边发过来路由策略变化,则根据此工厂类创建新路由
private static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getAdaptiveExtension();
// 服务key唯一标识,{group}/{interfaceName}:{version}
private final String serviceKey;
// 服务提供者接口类interfaceName
private final Class<T> serviceType;
// refer中所有的参数
private final Map<String, String> queryMap;
// 与父亲consumerUrl相同, 注册中心url + refer参数
private final URL directoryUrl;
// 是否引用多个服务组, group = aa,bb 或者 *
// @see toMergeInvokerList
private final boolean multiGroup;
// 与服务端通讯所使用的协议,将注册中新提交的url转变成Invoker
private Protocol protocol;
// 注册中心,上面的url发生变动都会通知本类
private Registry registry;
// 是否要把自己的registeredConsumerUrl信息注册到注册中心去
private boolean shouldRegister;
// registeredConsumerUrl包括必要的服务治理数据
private boolean shouldSimplified;
// 最终url = 注册中心url + exportUrl参数 + referUrl参数 + configurators 配置
private volatile URL overrideDirectoryUrl;
// refer Url
private volatile URL registeredConsumerUrl;
// 注册中心,手动配置的参数信息信息
private volatile List<Configurator> configurators;
// 服务提供者, exportUrl - Invoker
private volatile Map<String, Invoker<T>> urlInvokerMap;
// 服务提供者, Invoker集合
private volatile List<Invoker<T>> invokers;
// 服务提供者, 所有的exportUrl
private volatile Set<URL> cachedInvokerUrls;
}
由属性可知其完成依赖于注册中心的服务发现.Demo如下
public class RegistryTest {
public static void main(String[] args) throws Exception {
ApplicationModel.setApplication("app");
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(GreetingsService.class);
MethodDescriptor methodDescriptor = serviceDescriptor.getMethod("sayHi", new Class[]{
String.class});
Method method = methodDescriptor.getMethod();
String serviceName = serviceDescriptor.getServiceName();
repository.registerProvider(serviceName, new GreetingsServiceImpl(),serviceDescriptor, null, null );
// 用于创建ZookeeperRegistry
RegistryFactory registryFactory = new ZookeeperRegistryFactory();
String exportUrl = URL.decode("dubbo://127.0.0.1:28092/dubbo.GreetingsService?monitor=mm&timeout=12000");
String referUrl = URL.decode("register.ip=127.0.0.1:28092");
URL url = URL.valueOf("zookeeper://127.0.0.1:2181/dubbo.GreetingsService?registry=multicast&refer="+referUrl+"&export=" + exportUrl);
// 通过RegistryProtocol可以把注册中心与服务目录关联起来
RegistryProtocol protocol = new RegistryProtocol();
protocol.setProtocol(new DubboProtocol());
protocol.setRegistryFactory(registryFactory);
protocol.export(new Invoker<GreetingsService>() {
// ...
@Override
public Result invoke(Invocation invocation) throws RpcException {
System.out.println("22yoyoyoy");
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}
});
RegistryProtocol protocol2 = new RegistryProtocol();
protocol2.setProtocol(new DubboProtocol());
protocol2.setRegistryFactory(registryFactory);
Invoker invoker = protocol2.refer(GreetingsService.class, url);
invoker.invoke(new RpcInvocation(method, serviceName, new String[]{
"yoyo"}));
}
}
- RegistryProtocol.export方法会把url注册到注册中心上去
- RegistryProtocol.doRefer会根据url订阅注册中心变更事件,通过NotifyListener反通知RegistryDirectory
- 注册中心通知事件不仅仅总共有三种:
- configuratorURL ,在管理人员,人工的配置信息
- routerURLs, 管理人员,设置的路由信息
- providerURLs 提供者,由于代码修改,启停所导致的变动
关于ZookeeperRegistry注册中心的细节可参考《 Dubbo之ZookeeperRegistry源码分析》
主要参考
《集群》
《负载均衡》
《服务路由》
《服务目录》
《源码分析Dubbo服务注册与发现机制RegistryDirectory》
《 Dubbo之ZookeeperRegistry源码分析》