引言
最近要写一个根据版本分配权重的负载策略,因为是基于springcloud的策略,先参考了项目中的轮询策略即com.netflix.loadbalancer.RoundRobinRules,使用的是加权随机算法,较为原始。
后面参考dubbo中的加权轮询算法,使用特(不)殊(懂)算法使轮询更为合理,下面就要对这种算法进行分析吧。
逻辑
- 使用本地权重表,根据调用情况动态调整。
- 每次调用根据算法更新权重表,设置本地权重为本地所有权重加上配置权重,选出本地权重最大的服务,并设置它的本地权重减去本本轮总权重。
- 权重表回收,删除1分内未被调用的实例
- 预热期权重算法,预热期默认10分钟,warmWeight = uptime/(warmup/weight),如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10
源码
主要涉及RoundRobinLoadBalance和AbstractLoadBalance两个类。
名词:
- 原始权重:服务设置中的weight
- 动态权重:每次选取操作调整后的权重
- 动态权重总和:每次调整完后的所有服务动态权重总和
- 本地动态权重表:记录本地服务选取时的动态权重信息,每次调用选取算法都会更新。
-
RoundRobinLoadBalance
package org.apache.dubbo.rpc.cluster.loadbalance;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* 轮询负载均衡策略
* Round robin load balance.
*/
public class RoundRobinLoadBalance extends AbstractLoadBalance {
//策略名称
public static final String NAME = "roundrobin";
//动态权重更新时间
private static final int RECYCLE_PERIOD = 60000;
//路由权重
protected static class WeightedRoundRobin {
private int weight;
//动态权重
private AtomicLong current = new AtomicLong(0);
//最后选取时间
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
//每次选取操作增加原始权重
public long increaseCurrent() {
return current.addAndGet(weight);
}
//每次选中减去动态总权重
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
//更新锁
private AtomicBoolean updateLock = new AtomicBoolean();
/**
* get invoker addr list cached for specified invocation
* <p>
* <b>for unit test only</b>
* 获取url对应的权重路由
* 结构如下:
* {
* "bike.get":{
* "url1": WeightedRoundRobin,
* "url2": WeightedRoundRobin,
* ...
* },
* "bike.update:{
* "url1": WeightedRoundRobin,
* "url2": WeightedRoundRobin,
* ...
* }
* }
* @param invokers
* @param invocation
* @return
*/
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
//获取url对应的权重路由
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}
/**
* 根据动态权重表选取服务
* @param invokers 实例列表
* @param url 请求url 在这没啥用
* @param invocation 请求调用信息
* @param <T>
* @return 选出的实例调度器
*/
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
//获取url对应的动态权重表
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
//如果权重表为空,则新建
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
//动态权重总和,用于计算更新动态权重
int totalWeight = 0;
//计算时动态权重最小值
long maxCurrent = Long.MIN_VALUE;
//当前时间,设置为动态权重表最后选取时间
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
//循环所有注册服务
for (Invoker<T> invoker : invokers) {
//获取服务id
String identifyString = invoker.getUrl().toIdentityString();
//获取服务对应的本地动态权信息
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
//获取权重,预热期返回预热权重,否则为原始权重
int weight = getWeight(invoker, invocation);
//新建本地动态权重信息
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
//是否为预热权重,预热情况更新权重
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
//每次选取调整对应的动态选择
long cur = weightedRoundRobin.increaseCurrent();
//更新最后选取时间,为什么不在increaseCurrent方法里面更新?
//入long cur = weightedRoundRobin.increaseCurrent(now);
weightedRoundRobin.setLastUpdate(now);
//获取最大权重服务
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
//相加计算总的权重
totalWeight += weight;
}
//移除过期的实例,默认60秒没访问移除
//调度器数和权重集合数不一致是,更新权重集合
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
//减少选中服务的动态权重值
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
// 没有选出调度器的时候返回第一个服务。
return invokers.get(0);
}
}
-
AbstractLoadBalance
package org.apache.dubbo.rpc.cluster.loadbalance;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import java.util.List;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
/**
* AbstractLoadBalance
*/
public abstract class AbstractLoadBalance implements LoadBalance {
/**
* Calculate the weight according to the uptime proportion of warmup time
* the new weight will be within 1(inclusive) to weight(inclusive)
* 计算预热期权重,最小为1
* warmWeight = uptime/(warmup/weight),
* 如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10
* @param uptime the uptime in milliseconds 上线时间
* @param warmup the warmup time in milliseconds 预热时间
* @param weight the weight of an invoker 原值权重
* @return weight which takes warmup into account
*/
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
/**
* Get the weight of the invoker's invocation which takes warmup time into account
* if the uptime is within the warmup time, the weight will be reduce proportionally
* 获取调用程序的调用权重,其中考虑了预热时间如果正常运行时间在预热时间内,则权重将按比例减少
* @param invoker the invoker
* @param invocation the invocation of this invoker
* @return weight
*/
int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
//请求时间
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
//处理时间,当前时间-invoker上线时间
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}
//预热时间10分钟
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
//上线时间小于预热时间,返回预热中的权重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
//正常情况返回invoker权重
return Math.max(weight, 0);
}
}
效果
举例
a、b、c权重分别为2、7、1如下:
结果执行为,a2次,b7次,c1次,结果喜人。