前言
不管是使用spring boot的方式集成nacos还是使用spring cloud alibaba方式集成或者是单独使用nacos。最终具体实现注册或者获取服务实例的方法都是使用的 nacos-client包,nacos-api只是定义了接口,具体的操作还是 nacos-client完成的。我们的项目中集成nacos在整个nacos的接入中是客户端的角色,而所有数据的存储以及维护是有单独的nacos server来实现的,nacos的客户端与服务端是通过http的形式来通讯的。本文只介绍nacos的注册整个流程的源码分析。
一、客户端实现
1、客户端注册入口对象
在nacos-client包中的naming目录下有个类NacosNamingService,它是实现了nacos-api中NamingService接口。其中包含的主要方法:
- public NacosNamingService(String serverList) ##构造函数,传入nacos server地址
- public void registerInstance(String serviceName, String ip, int port) ## 注册服务
- public void deregisterInstance(String serviceName, String ip, int port) ## 取消注册
- …其它的后面的章节进行讲解
2、构造函数做了什么
/** 构造函数 */
public NacosNamingService(String serverList) {
// nacos server地址
this.serverList = serverList;
// 初始化主要是读取系统的环境变量,读取namespace、日志名称、日志等级、缓存地址等信息
init();
// 订阅服务,当服务实例有变化时会触发通知
eventDispatcher = new EventDispatcher();
// 具体的与远程nacos server通讯的类
serverProxy = new NamingProxy(namespace, endpoint, serverList);
// 心跳服务,客户端要定时给nacos server发送心跳包
beatReactor = new BeatReactor(serverProxy);
// 更新服务的,获取服务的实例后,可以订阅服务,会定时拉取最新的实例列表
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir);
}
3、注册服务
注册的方法有三个,主要是进行了重载,且不同的版本可能略有差异,但是大体上还是相同的。
/** 注册实例 */
/** 当前看的这个版本可能相对旧一点,最新的版本是还有一个参数,是否添加心跳的参数,可以选择是否对当前的实例一直与nacos服务端进行心跳检测 */
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
// 创建添加心跳的对象
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(serviceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
// 添加心跳
beatReactor.addBeatInfo(serviceName, beatInfo);
// 具体实现注册
serverProxy.registerService(serviceName, instance);
}
4、心跳实现
下面是实现心跳的对象,它的主要原理还是开启一个线程,定时去通过初始化时的NamingProxy对象请求nacos server接口,发送http请求
class BeatProcessor implements Runnable {
@Override
public void run() {
try {
for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
BeatInfo beatInfo = entry.getValue();
// 如果是健康的则不进行发送心跳
if (beatInfo.isScheduled()) {
continue;
}
// 如果不是健康的则会触发发送心跳
beatInfo.setScheduled(true);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
LogUtils.LOG.error("CLIENT-BEAT", "Exception while scheduling beat.", e);
}
}
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
// serverProxy就是NamingProxy
long result = serverProxy.sendBeat(beatInfo);
// 发送心跳后,当前的服务服务实例则置为不健康状态,这样才能触发下一次心跳发送
beatInfo.setScheduled(false);
if (result > 0) {
clientBeatInterval = result;
}
}
}
5、注册服务实例
NamingProxy中包含了注册的逻辑,这里的注册其实比较简单,主要是对当前的服务实例的数据封装,然后请求到nacos server
public void registerService(String serviceName, Instance instance) throws NacosException {
LogUtils.LOG.info("REGISTER-SERVICE", "{} registering service {} with instance: {}",
namespaceId, serviceName, instance);
final Map<String, String> params = new HashMap<String, String>(8);
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
params.put("serviceName", serviceName);
params.put("clusterName", instance.getClusterName());
//通用的发送请求
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
6、通用发送http请求
上面说的心跳,实际上也在NamingProxy对象中实现
public long sendBeat(BeatInfo beatInfo) {
try {
LogUtils.LOG.info("BEAT", "{} sending beat to server: {}", namespaceId, beatInfo.toString());
// 封装请求参数
Map<String, String> params = new HashMap<String, String>(4);
params.put("beat", JSON.toJSONString(beatInfo));
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, namespaceId);
params.put("serviceName", beatInfo.getServiceName());
// 请求远程接口
String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
JSONObject jsonObject = JSON.parseObject(result);
if (jsonObject != null) {
return jsonObject.getLong("clientBeatInterval");
}
} catch (Exception e) {
LogUtils.LOG.error("CLIENT-BEAT", "failed to send beat: " + JSON.toJSONString(beatInfo), e);
}
return 0L;
}
所有的接口请求都在reqAPI方法中完成
/** 请求远程接口,传入请求接口方法、请求参数、nacos server服务地址,请求方式(post/get) */
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(Constants.REQUEST_PARAM_NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new IllegalArgumentException("no server available");
}
// 获取nacos服务列表,随机获取地址进行请求
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
return callServer(api, params, server, method);
} catch (Exception e) {
LogUtils.LOG.error("NA", "req api:" + api + " failed, server(" + server, e);
}
// 如果随机获取的服务地址没有请求成功,则尝试下一个地址
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried");
}
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, nacosDomain);
} catch (Exception e) {
LogUtils.LOG.error("NA", "req api:" + api + " failed, server(" + nacosDomain, e);
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried");
}
二、服务端实现
1、接收客户端注册请求
/**
* Register new instance.
*
* @param request http request
* @return 'ok' if success
* @throws Exception any error during register
*/
@CanDistro
@PostMapping("/instance")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String registerInstance(@RequestParam(defaultValue = "v2", required = false) String ver,
HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
// 选择版本,选择V1还是v2,做了兼容
getInstanceOperator(ver).registerInstance(namespaceId, serviceName, instance);
return "ok";
}
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 检测service是否存在,不存在则进行创建
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 获取service
Service service = getService(namespaceId, serviceName);
// 校验service是否存在,不存在则抛出异常
checkServiceIsNull(service, namespaceId, serviceName);
// 将传入的实例对象进行添加操作
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 获取service
Service service = getService(namespaceId, serviceName);
// 防止并发处理
synchronized (service) {
// 将实例添加到目标service的实例列表中,返回添加后的实例列表
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
//处理一致性的实例数据,保证nacos server集群的数据一致性
consistencyService.put(key, instances);
}
}
这里比较熟悉的就是raft算法
三、总结
版本跨度比较大,实现的方式也有所不同,但是大致实现的方向是这个,建议自己追一下源码