public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); if(lb instanceof BaseLoadBalancer) { this.name = ((BaseLoadBalancer)lb).getName(); } this.initialize(lb); }构造方法无非调用setLoadBalancer,这里进行了loadBalancer的初始化。
void initialize(ILoadBalancer lb) { if(this.serverWeightTimer != null) { this.serverWeightTimer.cancel(); } this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true); this.serverWeightTimer.schedule(new WeightedResponseTimeRule.DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval); WeightedResponseTimeRule.ServerWeight sw = new WeightedResponseTimeRule.ServerWeight(); sw.maintainWeights(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { WeightedResponseTimeRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name); WeightedResponseTimeRule.this.serverWeightTimer.cancel(); } })); }
这个方法很有意思,起了一个循环定时任务,默认时间是30s,调度的任务是DynamicServerWeightTask,它的run方法无非调用了serverWeight.maintainWeights(),也就是说循环构造一个Task来执行这个方法。maintainWeights无非计算出每个服务器的响应时间的权重,好给choose()提供选择服务器的依据。
class DynamicServerWeightTask extends TimerTask { DynamicServerWeightTask() { } public void run() { WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight(); try { serverWeight.maintainWeights(); } catch (Exception var3) { WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3); } } }
然后,在设置定时任务之后,构造一个ServerWeight,调用maintainWeights(),我们来看下maintainWeights的具体实现吧
public void maintainWeights() { ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer(); if(lb != null) { if(WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) { try { WeightedResponseTimeRule.logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if(stats != null) { double totalResponseTime = 0.0D; ServerStats ss; for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) { Server server = (Server)var6.next(); ss = stats.getSingleServerStat(server); } Double weightSoFar = Double.valueOf(0.0D); List<Double> finalWeights = new ArrayList(); Iterator var20 = nlb.getAllServers().iterator(); while(var20.hasNext()) { Server serverx = (Server)var20.next(); ServerStats ssx = stats.getSingleServerStat(serverx); double weight = totalResponseTime - ssx.getResponseTimeAvg(); weightSoFar = Double.valueOf(weightSoFar.doubleValue() + weight); finalWeights.add(weightSoFar); } WeightedResponseTimeRule.this.setWeights(finalWeights); return; } } catch (Exception var16) { WeightedResponseTimeRule.logger.error("Error calculating server weights", var16); return; } finally { WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false); } } } }
这儿有一点很优雅的实现,由于计算权重需要保证线程安全,并且这里线程竞争不激烈,那么这里使用一个AtomicBoolean的变量,通过cas方式保证当前时刻该任务执行的唯一性。
先遍历服务器列表,并得到每个服务器的平均响应时间,遍历过程中对其求和,遍历结束后得到总响应时间totalResponseTime。
再一次遍历服务器列表,并将总响应时间totalResponseTime减去每个服务器的平均响应时间作为权重weight,再将这之前的所以权重求和,存到权重列表的对应该服务器的下标的位置上。
for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) { Server server = (Server)var6.next(); ss = stats.getSingleServerStat(server); } Double weightSoFar = Double.valueOf(0.0D); List<Double> finalWeights = new ArrayList(); Iterator var20 = nlb.getAllServers().iterator(); while(var20.hasNext()) { Server serverx = (Server)var20.next(); ServerStats ssx = stats.getSingleServerStat(serverx); double weight = totalResponseTime - ssx.getResponseTimeAvg(); weightSoFar = Double.valueOf(weightSoFar.doubleValue() + weight); finalWeights.add(weightSoFar); }然后将权重列表存入accumulatedWeights成员,便于choose()使用。我们来看下choose()
public Server choose(ILoadBalancer lb, Object key) { if(lb == null) { return null; } else { Server server = null; while(server == null) { List<Double> currentWeights = this.accumulatedWeights; if(Thread.interrupted()) { return null; } List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if(serverCount == 0) { return null; } int serverIndex = 0; double maxTotalWeight = currentWeights.size() == 0?0.0D: ((Double)currentWeights.get(currentWeights.size() - 1)).doubleValue(); if(maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) { double randomWeight = this.random.nextDouble() * maxTotalWeight; int n = 0; for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) { Double d = (Double)var13.next(); if(d.doubleValue() >= randomWeight) { serverIndex = n; break; } } server = (Server)allList.get(serverIndex); } else { server = super.choose(this.getLoadBalancer(), key); if(server == null) { return server; } } if(server == null) { Thread.yield(); } else { if(server.isAlive()) { return server; } server = null; } } return server; } }我们重点分析一次每次选择服务器部分的逻辑
int serverIndex = 0; double maxTotalWeight = currentWeights.size() == 0?0.0D:((Double)currentWeights.get(currentWeights.size() - 1)).doubleValue(); if(maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) { double randomWeight = this.random.nextDouble() * maxTotalWeight; int n = 0; for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) { Double d = (Double)var13.next(); if(d.doubleValue() >= randomWeight) { serverIndex = n; break; } } server = (Server)allList.get(serverIndex);
先得到accumulatedWeights的最后一个元素,即权重列表中的最后一个元素(最大的值)maxTotalWeight,再通过乘以一个随机数,得到一个0到maxTotalWeight之间的一个randomWeight,然后遍历权重列表,当第一次遇到的权重列表的大于randomWeight的元素,其下标对应的服务器就是被选中的服务器。(仔细思考下就会发现,当响应时间越短,则当前权重跟之前的权重差越大,也就是说增量大,越容易超过随机值randomWeight,即被选中的概率也就越大)。
当然,如果当前策略,不成立,默认会选择父类的轮询策略。还会判断下服务器是否可用,否则重选。