发烧终于好了,有精力继续学习了。
今天学习Ribbon。因为zuul源码调试篇章学习过程中,学到route类型的filter时,发现当我们配置文件中使用serviceid作为route的配置时候,会使用RibbonRouteFilter,其中服务选择用的就是Ribbon。而且在学习feign的时候,注入的是LoadBalancerFeignClient,最终使用的也是Ribbion进行服务选择。
在第四章的时候,当时遗留了一个问题,服务间怎么传递http的header,当时发现构造request的时候,只构造了url、nethod等基本对象,没有传递header。在stack overflow上最终找到了答案,链接忘记保存了,具体的实现方法是实现自己的RequestInterceptor类即可。
@Component
public class MyRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
template.headers(getHeadersFromWherever());
}}
我们看下源码中的描述:
Zero or more {@code RequestInterceptors} may be configured for purposes such as adding headers to
* all requests. No guarantees are give with regards to the order that interceptors are applied.
* Once interceptors are applied, {@link Target#apply(RequestTemplate)} is called to create the
* immutable http request sent via {@link Client#execute(Request, feign.Request.Options)}. <
其实在我们学习feign时,已经学习到了Target,当时就没有深入去研究apply方法的作用。以后学习还是要仔细和深入。
向回答的大神致敬。
debug一下,我们发现在SynchronousMethodHandler执行executeAndDecode()的第一行就是构造header,
Request targetRequest(RequestTemplate template) {
for (RequestInterceptor interceptor : requestInterceptors) {
interceptor.apply(template);
}
return target.apply(new RequestTemplate(template));
}
最终执行的就是如文档描述的Target对象的apply方法。
我们可以看到经过我们自己写的Interceptor:
@Component
public class TemplateInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
Map<String,Collection<String>> headers = new HashMap<>();
Collection<String> head = new ArrayList<>();
head.add("liuyan");
headers.put("token",head);
template.headers(headers);
}
}
之后,到HardCodeTarget的apply方法后,request对象会有header:
题外话说完,继续学习Ribbon
我们从第四章学习到的LoadBalanceFeignClient入手,可以一直跟踪到LoadBalancerCommand中的selectServer()方法中,当时只学习到这里,我们继续往后跟踪,LoadBalancerContext的getServerFromLoadBalancer()方法中可以找到最终最终的根据负载均衡策略决定用哪个服务的接口是:
ILoadBalancer lb = getLoadBalancer();
我们可以看接口ILoadBalancer:
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
有这些方法
addServer就是初始化server列表,该方法也可以在后续的时间继续添加新的服务,包括同一个的服务,这样会改变均衡的权重。
chooseServer顾名思义就是选择一个要执行的服务。
markServerDown标记一个服务宕掉,这里看一下方法的描述
/**
* To be called by the clients of the load balancer to notify that a Server is down
* else, the LB will think its still Alive until the next Ping cycle - potentially
* (assuming that the LB Impl does a ping)
*
* @param server Server to mark as down
*/
这里意思是当一个服务不可用时,调用这个方法,但是只有当下一次ping周期结束后LoadBalancer才会真正的认为服务不可用,这里引入了一个新概念ping,后续我们会看到这个ping也是负载均衡重要的一部分。
getReachableServers获取所有可用的服务
getAllServers获取所有的服务,包括可用和不可用的,这也说明另一件事,服务不可用的时候LB是不会把宕掉的服务剔除,只是会标记一下,可见服务恢复后还是可以复活的。
该接口的一个最基础实现类是AbstractLoadBalancer,这个抽象类主要是新增了两个抽象方法:
getServerList(ServerGroup serverGroup):根据状态获取服务列表
getLoadBalancerStats():获取LB的相关统计信息
其中ServerGroup:
public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}
前者在初始化时判断服务是否可用,类似于防火墙之类的。后者初始化一些配置,例如负载均衡规则和ping的规则。
BaseLoadBalance的一个实现类是DynamicServerListLoadBalancer,它主要是可以动态的获取服务列表。
具体看这三个主要类的源码他们有三个重要的属性:
protected IRule rule = DEFAULT_RULE;
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;
我们挨个方法看:
首先是初始化server列表,我们可以看到正如前面描述,可以随时并且重复添加服务。
@Override
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Exception while adding Servers", name, e);
}
}
}
获取可用服务和所有服务,比较简单,分别维护了一个服务列表
@Override
public List<Server> getReachableServers() {
return Collections.unmodifiableList(upServerList);
}
@Override
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
根据状态获取服务,获取不可用服务时,就是从所有的服务列表中去除掉可用服务即可:
@Override
public List<Server> getServerList(ServerGroup serverGroup) {
switch (serverGroup) {
case ALL:
return allServerList;
case STATUS_UP:
return upServerList;
case STATUS_NOT_UP:
ArrayList<Server> notAvailableServers = new ArrayList<Server>(
allServerList);
ArrayList<Server> upServers = new ArrayList<Server>(upServerList);
notAvailableServers.removeAll(upServers);
return notAvailableServers;
}
return new ArrayList<Server>();
}
看到实现接口PrimeConnectionListener的方法只是简单返回了true:
@Override
public void primeCompleted(Server s, Throwable lastException) {
s.setReadyToServe(true);
}
选择服务可以看到是利用的IRule:
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
标记服务不可用:
public void markServerDown(Server server) {
if (server == null || !server.isAlive()) {
return;
}
logger.error("LoadBalancer [{}]: markServerDown called on [{}]", name, server.getId());
server.setAlive(false);
// forceQuickPing();
notifyServerStatusChangeListener(singleton(server));
}
调用的是:
private void notifyServerStatusChangeListener(final Collection<Server> changedServers) {
if (changedServers != null && !changedServers.isEmpty() && !serverStatusListeners.isEmpty()) {
for (ServerStatusChangeListener listener : serverStatusListeners) {
try {
listener.serverStatusChanged(changedServers);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server status change listener", name, e);
}
}
}
}
更新服务状态。
实时感知服务是否可用,是在创建LB时会启动一个定时任务,获取服务的状态,比较常规的heartbeat检测
class PingTask extends TimerTask {
public void run() {
try {
new Pinger(pingStrategy).runPinger();
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error pinging", name, e);
}
}
}
这几个方法里比较关键的就是chooseServer,其中涉及到了负载均衡策略;PingTask涉及到了服务可用性的维护;还有没有看的initWithNiwsConfig这个方法,初始化了负载均衡策略IRule和ping的策略IPing。
这些比较关键的明天深入研究。