Dubbo源码解析之加权轮询策略RoundRobinLoadBalance

引言

最近要写一个根据版本分配权重的负载策略,因为是基于springcloud的策略,先参考了项目中的轮询策略即com.netflix.loadbalancer.RoundRobinRules,使用的是加权随机算法,较为原始。

后面参考dubbo中的加权轮询算法,使用特(不)殊(懂)算法使轮询更为合理,下面就要对这种算法进行分析吧。

逻辑

  • 使用本地权重表,根据调用情况动态调整。
  • 每次调用根据算法更新权重表,设置本地权重为本地所有权重加上配置权重,选出本地权重最大的服务,并设置它的本地权重减去本本轮总权重。
  • 权重表回收,删除1分内未被调用的实例
  • 预热期权重算法,预热期默认10分钟,warmWeight = uptime/(warmup/weight),如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10

源码

主要涉及RoundRobinLoadBalance和AbstractLoadBalance两个类。

名词:

  1. 原始权重:服务设置中的weight
  2. 动态权重:每次选取操作调整后的权重
  3. 动态权重总和:每次调整完后的所有服务动态权重总和
  4. 本地动态权重表:记录本地服务选取时的动态权重信息,每次调用选取算法都会更新。
  •  RoundRobinLoadBalance

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 轮询负载均衡策略
 * Round robin load balance.
 */
public class RoundRobinLoadBalance extends AbstractLoadBalance {
    //策略名称
    public static final String NAME = "roundrobin";
    //动态权重更新时间
    private static final int RECYCLE_PERIOD = 60000;

    //路由权重
    protected static class WeightedRoundRobin {
        private int weight;
        //动态权重
        private AtomicLong current = new AtomicLong(0);
        //最后选取时间
        private long lastUpdate;
        public int getWeight() {
            return weight;
        }
        public void setWeight(int weight) {
            this.weight = weight;
            current.set(0);
        }
        //每次选取操作增加原始权重
        public long increaseCurrent() {
            return current.addAndGet(weight);
        }
        //每次选中减去动态总权重
        public void sel(int total) {
            current.addAndGet(-1 * total);
        }
        public long getLastUpdate() {
            return lastUpdate;
        }
        public void setLastUpdate(long lastUpdate) {
            this.lastUpdate = lastUpdate;
        }
    }

    private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
    //更新锁
    private AtomicBoolean updateLock = new AtomicBoolean();
    
    /**
     * get invoker addr list cached for specified invocation
     * <p>
     * <b>for unit test only</b>
     * 获取url对应的权重路由
     * 结构如下:
     * {
     *     "bike.get":{
     *         "url1": WeightedRoundRobin,
     *         "url2": WeightedRoundRobin,
     *         ...
     *     },
     *     "bike.update:{
     *          "url1": WeightedRoundRobin,
     *          "url2": WeightedRoundRobin,
     *          ...
     *     }
     * }
     * @param invokers
     * @param invocation
     * @return
     */
    protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        //获取url对应的权重路由
        Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        if (map != null) {
            return map.keySet();
        }
        return null;
    }

    /**
     * 根据动态权重表选取服务
     * @param invokers 实例列表
     * @param url 请求url 在这没啥用
     * @param invocation  请求调用信息
     * @param <T>
     * @return 选出的实例调度器
     */
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        //获取url对应的动态权重表
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
        //如果权重表为空,则新建
        if (map == null) {
            methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
            map = methodWeightMap.get(key);
        }
        //动态权重总和,用于计算更新动态权重
        int totalWeight = 0;
        //计算时动态权重最小值
        long maxCurrent = Long.MIN_VALUE;
        //当前时间,设置为动态权重表最后选取时间
        long now = System.currentTimeMillis();
        Invoker<T> selectedInvoker = null;
        WeightedRoundRobin selectedWRR = null;
        //循环所有注册服务
        for (Invoker<T> invoker : invokers) {
            //获取服务id
            String identifyString = invoker.getUrl().toIdentityString();
            //获取服务对应的本地动态权信息
            WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
            //获取权重,预热期返回预热权重,否则为原始权重
            int weight = getWeight(invoker, invocation);
            //新建本地动态权重信息
            if (weightedRoundRobin == null) {
                weightedRoundRobin = new WeightedRoundRobin();
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(identifyString, weightedRoundRobin);
            }
            //是否为预热权重,预热情况更新权重
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            //每次选取调整对应的动态选择
            long cur = weightedRoundRobin.increaseCurrent();
            //更新最后选取时间,为什么不在increaseCurrent方法里面更新?
            //入long cur = weightedRoundRobin.increaseCurrent(now);
            weightedRoundRobin.setLastUpdate(now);
            //获取最大权重服务
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            //相加计算总的权重
            totalWeight += weight;
        }
        //移除过期的实例,默认60秒没访问移除
        //调度器数和权重集合数不一致是,更新权重集合
        if (!updateLock.get() && invokers.size() != map.size()) {
            if (updateLock.compareAndSet(false, true)) {
                try {
                    // copy -> modify -> update reference
                    ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                    newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                    methodWeightMap.put(key, newMap);
                } finally {
                    updateLock.set(false);
                }
            }
        }
        //减少选中服务的动态权重值
        if (selectedInvoker != null) {
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        // 没有选出调度器的时候返回第一个服务。
        return invokers.get(0);
    }

}
  • AbstractLoadBalance

package org.apache.dubbo.rpc.cluster.loadbalance;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.cluster.LoadBalance;

import java.util.List;

import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WARMUP;
import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_WEIGHT;
import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;

/**
 * AbstractLoadBalance
 */
public abstract class AbstractLoadBalance implements LoadBalance {
    /**
     * Calculate the weight according to the uptime proportion of warmup time
     * the new weight will be within 1(inclusive) to weight(inclusive)
     * 计算预热期权重,最小为1
     * warmWeight = uptime/(warmup/weight),
     * 如20权重服务,在启动5分钟时的预热权重 = 5/(10/20) = 5/0.5=10
     * @param uptime the uptime in milliseconds  上线时间
     * @param warmup the warmup time in milliseconds 预热时间
     * @param weight the weight of an invoker 原值权重
     * @return weight which takes warmup into account
     */
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ( uptime / ((float) warmup / weight));
        return ww < 1 ? 1 : (Math.min(ww, weight));
    }

    @Override
    public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        return doSelect(invokers, url, invocation);
    }

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);


    /**
     * Get the weight of the invoker's invocation which takes warmup time into account
     * if the uptime is within the warmup time, the weight will be reduce proportionally
     * 获取调用程序的调用权重,其中考虑了预热时间如果正常运行时间在预热时间内,则权重将按比例减少
     * @param invoker    the invoker
     * @param invocation the invocation of this invoker
     * @return weight
     */
    int getWeight(Invoker<?> invoker, Invocation invocation) {
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
        if (weight > 0) {
            //请求时间
            long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                //处理时间,当前时间-invoker上线时间
                long uptime = System.currentTimeMillis() - timestamp;
                if (uptime < 0) {
                    return 1;
                }
                //预热时间10分钟
                int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
                //上线时间小于预热时间,返回预热中的权重
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight((int)uptime, warmup, weight);
                }
            }
        }
        //正常情况返回invoker权重
        return Math.max(weight, 0);
    }
}

效果

举例

a、b、c权重分别为2、7、1如下:

结果执行为,a2次,b7次,c1次,结果喜人。

猜你喜欢

转载自blog.csdn.net/lizz861109/article/details/109517749