我们知道nacos是支持CP与AP
模型的,如果有小伙伴不知道AP CP代表啥含义,可以百度搜下CAP理论学习下。像我们常见的注册中心Eureka是AP模型,保证服务可用性的,Zookeeper 是CP模型,保证数据的一致性。我们之前介绍过nacos基于raft协议来保证数据的一致性,是CP模型,但是并不是说nacos存储的所有内容完全保证一致性也就是所谓的CP模型,我们还知道nacos注册中心存储节点信息支持临时数据与持久数据的,持久数据集群间的同步就是使用的CP模型,使用raft协议保证数据的一致性,但是临时数据使用的是AP模型,保证服务的可用性,临时数据集群间的同步就没有某种协议来保证一致性了,接下来我们就来分析下临时数据集群间是怎样同步的。
1.回顾服务注册原理
建议看下Nacos注册中心8-Server端(处理注册请求)这篇服务注册的文章,介绍的很清楚了,其实就是nacos客户端将服务的一些信息发送到nacos注册中心服务端,服务端收到消息后进行异步注册,所谓的异步注册就是先写到一个map(dataStore)中,写一个事件到内存队列中,到这nacos服务端就告诉客户端服务注册成功了(其实还需要将数据同步给集群的别的机器,不过也是异步的,往内存队列中塞个延时任务),但是这个时候,你进行服务发现并不会拿到刚才注册的服务,因为它并没有将服务注册信息放在对应的数据结构下面,它只是放到一个map+ 内存队列,后台一个线程专门从队列中取数据,然后放到对应的数据结构下面,这个时候服务发现才会拿到它,也就是服务发现是找到对应数据结构,找某个namespace,group的一些实例列表,而不是去map中查找,map中只能通过key来定位。这就是nacos处理临时数据服务注册的核心原理,当然还有一些通知机制,集群间同步。
2.临时数据集群间同步原理
服务注册的时候,将数据写到map+ 内存队列
后,nacos还干了一件事情,就是集群间的同步,说实话,它这个临时数据集群间同步有点麻烦了。
-
它有两个执行引擎,一个是延时任务的,一个是普通任务的。
- 延时任务引擎中有个map集合,专门用来存放任务(task),有个延时任务,每100ms扫描一下map中的任务,只要达到分发间隔时间的任务,就会获取对应任务的processor来执行任务。
- 普通任务执行引擎 有很多的worker,也就是个worker数组,每个worker中有个一个任务队列,每个worker中还有一个内部线程,专门从任务队列中获取任务,执行任务。 如图:
-
当我们服务注册的时候,会将服务信息存储到一个map中+ 内存队列事件,同时会进行集群同步
- 集群同步的时候,根据集群成员信息,封装任务,每个成员一个同步任务(除去自己)。
- 先将任务交给延时任务引擎,这个时候分发任务时间是1s(总共2s),它首先会被加到任务map中,然后定时任务每100ms扫描一下,看看任务有没有超过分发任务时间(1s),就这样定时任务走了10遍,这个任务才符合要求,获取对应的processor并执行,这个processor将任务封装,交给了任务执行引擎,这个时候,任务执行引擎会选择一个worker,将任务添加到worker对应的任务队列中。worker内部的线程不断从队列中获取任务,执行任务。 如图:
-
这个时候从队列中获取任务并执行,执行的时候,将任务封装一层,又交给了延时任务执行引擎,设置的任务分发时间为1s,这个时候任务分发时间才会达到默认的2s。
- 接着又会走一遍流程,不过在获取processor的时候,获取到的processor并不是之前的,而是延时任务执行引擎默认的一个processor,它又会封装一个task,交给任务引擎,任务引擎还是会交给同一个worker里面的队列,内部线程不停的从队列中获取任务并执行,这个时候任务执行就会找到发送网络请求的组件,组装请求信息,发送同步请求。如图:
3.源码分析
临时服务注册先调用DistroConsistencyServiceImpl 的put方法,将服务信息存储到map中 + 内存队列中,然后就是集群之间的同步了。
// put 往存储器中放入数据
@Override
public void put(String key, Record value) throws NacosException {
// 1.往本地里面存储 + 内存队列
onPut(key, value);
// 2.集群间同步
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
/// 延迟1s ,默认是2000 ms ,然后/2
globalConfig.getTaskDispatchPeriod() / 2);
}
复制代码
我们可以看到put方法干了2件事情,1是往本地map中存储+ 内存队列中塞事件,2是 集群间同步。
这里需要注意的是,它的延时时间间隔默认是2s ,然后它取了一半。 接下来我们看下DistroProtocol 的sync
方法
public void sync(DistroKey distroKey, DataOperation action, long delay) {
/// 遍历所有的成员,抛去自己
for (Member each : memberManager.allMembersWithoutSelf()) {
// 重新封装DistroKey ,然后将对方的地址封装进去
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 封装延时task
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
/// key是distroKeyWithTarget , value是distroDelayTask
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
复制代码
可以看到,遍历除了自己以外集群成员,封装延时任务,然后交给延时任务执行引擎来处理,给集群每个成员封装延时任务,将任务交给延时任务引擎来处理。
3.1 延时任务执行引擎
3.1.1 添加任务
// 添加任务
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
//先获取
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {// 如果存在的话,就合并任务
newTask.merge(existTask);
}
// 放入map中去
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
复制代码
可以看到延时任务引擎添加任务,先是从task中获取一下(这个task就是个ConcurrentHashMap
)有没有存在这个key,如果有的话,进行任务的合并,最后将合并好的任务添加到map中。 其实到这服务注册的主流程就结束了,可以将响应写会给服务注册客户端了。
3.1.2 执行任务
延时任务执行引擎在实例化的时候,创建了一个定时任务,每100ms执行一次。我们看下构造里面的这段代码。
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
/// 初始大小32
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
/// 100ms 执行一次
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
复制代码
可以看到延时任务执行引擎 构造方法中,先是创建了一个map,也就是task,存储任务的map。接着就是创建延时任务执行,也就是每100ms执行一次。
// 处理任务
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
// 处理任务
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
复制代码
看看它是怎样执行任务的。
protected void processTasks() {
// 获取所有的task
Collection<Object> keys = getAllTaskKeys();
// 遍历
for (Object taskKey : keys) {
//获取任务
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
// 获取processor
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// 如果处理失败的话,就进行重试
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
// 重试
retryFailedTask(taskKey, task);
}
}
}
复制代码
其实很简单,就是遍历map中的kv对,然后根据key获取对应的processor,如果processor不是null的话,就调用process方法进行处理,如果失败的话,就进行重试。其实这里它匹配到的processor是DistroHttpDelayTaskProcessor
。我们看下DistroHttpDelayTaskProcessor 的process
方法
public boolean process(NacosTask task) {
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig,
distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction());
// 交给任务执行引擎
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask);
return true;
}
复制代码
封装一个DistroHttpCombinedKeyExecuteTask
,然后将这个task交给了任务执行引擎处理。
3.2 任务执行引擎
3.2.1 添加任务
任务执行引擎是DistroExecuteTaskExecuteEngine
类,它自个是啥也没有的,它的一些方法都是继承父类NacosExecuteTaskExecuteEngine
的, 我们看下父类的addTask方法。
@Override
public void addTask(Object tag, AbstractExecuteTask task) {
//获取processor
NacosTaskProcessor processor = getProcessor(tag);
if (null != processor) {
processor.process(task);
return;
}
// 通过hash 取模方式获取worker
TaskExecuteWorker worker = getWorker(tag);
// 使用worker执行
worker.process(task);
}
复制代码
根据任务获取processor,这里其实是没有的,连默认的都没有,所以走到了getWorker
方法获取worker,调用worker的process方法执行任务task。
// hash %的方式选择哪个worker执行
private TaskExecuteWorker getWorker(Object tag) {
int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
return executeWorkers[idx];
}
复制代码
这里就是根据tag的hashcode值从Worker 数组中获取Worker,这个Worker数组其实是在任务执行引擎实例化的时候创建的。
我们这里看下worker(TaskExecuteWorker
类)的process方法。
//处理任务
@Override
public boolean process(NacosTask task) {
if (task instanceof AbstractExecuteTask) {
//put任务
putTask((Runnable) task);
}
return true;
}
private void putTask(Runnable task) {
try {
// 将任务放到 队列中
queue.put(task);
} catch (InterruptedException ire) {
log.error(ire.toString(), ire);
}
}
复制代码
这里就是将任务task放到worker对象里面的任务队列中。
3.2.2 执行任务
其实在创建worker实例的时候,每个worker实例会有一个内存队列存储任务,然后还有一个内部线程从队列中获取任务,执行任务,我们看下这个内部线程是怎样执行任务的。
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
// 执行任务
@Override
public void run() {
while (!closed.get()) {
try {
// 1.从队列中取出任务
Runnable task = queue.take();
long begin = System.currentTimeMillis();
//2.执行任务
task.run();
// 任务耗时
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
log.warn("distro task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[DISTRO-FAILED] " + e.toString(), e);
}
}
}
}
复制代码
先是从队列中获取任务,接着就是调用任务的run方法来执行任务。
在3.1.2小节中,任务被封装成了DistroHttpCombinedKeyExecuteTask
对象,它是现实runnable接口的,我们直接看下它的run方法
@Override
public void run() {
try {
DistroKey newKey = new DistroKey(DistroHttpCombinedKey.getSequenceKey(),
DistroHttpCombinedKeyDelayTask.class.getSimpleName(), singleDistroKey.getTargetServer());
DistroHttpCombinedKeyDelayTask combinedTask = new DistroHttpCombinedKeyDelayTask(newKey, taskAction,
globalConfig.getTaskDispatchPeriod() / 2, globalConfig.getBatchSyncKeyCount());
combinedTask.getActualResourceKeys().add(singleDistroKey.getResourceKey());
distroDelayTaskExecuteEngine.addTask(newKey, combinedTask);
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Combined key for http failed. ", e);
}
}
复制代码
它这里又封装了一个DistroHttpCombinedKeyDelayTask
任务对象,然后设置了globalConfig.getTaskDispatchPeriod() / 2
任务分发间隔,又是一个/2操作,我们在最开始介绍同步的时候封装的那个task的分发间隔就是/2。这样正好能够组成一个完整的TaskDispatchPeriod
。
最后它又将任务交给了延时任务执行引擎,这个时候又会走3.1 延时任务执行引擎 小节的执行流程,延时任务执行引擎又会合并任务,将任务放到task 这个map中,然后有个延时任务,每100ms执行一次,正好到了第10次的时候,会将这个task取出来(因为这个分发周期是1s),会根据这个任务获取processor,这一次是匹配不到的,这个时候就会拿到默认的processor,也就是DistroDelayTaskProcessor
类对象,我们看下它的process方法。
@Override
public boolean process(NacosTask task) {
// 如果不是DistroDelayTask 直接返回
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
// 如果是change操作的话
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
// 封装同步改变任务
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
// 交给 worker执行
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
复制代码
它这里封装了一个DistroSyncChangeTask
任务对象,然后将这个任务对象交给了执行引擎,这个时候又走执行引擎那一套,获取worker,将任务添加到worker的队列中,worker有个内部线程,专门从队列中获取任务,执行任务,这个时候,就会执行到DistroSyncChangeTask 的run
方法。
3.3 发送同步请求
我们上一小节介绍 执行到了DistroSyncChangeTask 的run
方法 ,其实这里面就是封装参数,发送http请求的部分了。我们看下
//执行任务
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
// 资源类型
String type = getDistroKey().getResourceType();
// 这个要去DistroHttpRegistry#doRegister 方法去看,因为是在这个方法里面向compont注册的
DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
// 获取到真实数据,不过这个数据是被序列化了的
DistroData distroData = dataStorage.getDistroData(getDistroKey());
// type是change
distroData.setType(DataOperation.CHANGE);
// 获取通信组件
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
//进行发送数据
boolean result =transportAgent.syncData(distroData, getDistroKey().getTargetServer());
if (!result) {// 如果失败
// 处理失败的task
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
// 处理失败的task
handleFailedTask();
}
}
复制代码
这里就是获取到真实的数据,我们在那些任务中,流转的都是它的key,并没有将真实数据放进去,这个时候,就会去之前咱们说的那个map中根据key获取真实数据了。获取到真实数据之后,就会调用DistroTransportAgent 的syncData 方法进行同步数据。
@Override
public boolean syncData(DistroData data, String targetServer) {
// 发送数据
byte[] dataContent = data.getContent();
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
复制代码
调用了NamingProxy
的syncData
方法向集群其他节点发送数据,同步数据。
public static boolean syncData(byte[] data, String curServer) {
// 封装请求头
Map<String, String> headers = new HashMap<>(128);
headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
headers.put(HttpHeaderConsts.ACCEPT_ENCODING, "gzip,deflate,sdch");
headers.put(HttpHeaderConsts.CONNECTION, "Keep-Alive");
headers.put(HttpHeaderConsts.CONTENT_ENCODING, "gzip");
try {
//发送请求
RestResult<String> result = HttpClient.httpPutLarge(
/// /nacos/v1/ns/distro/datum
"http://" + curServer + ApplicationUtils.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ DATA_ON_SYNC_URL, headers, data);
if (result.ok()) {
return true;
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.getCode()) {
return true;
}
throw new IOException("failed to req API:" + "http://" + curServer + ApplicationUtils.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.getCode() + " msg: "
+ result.getData());
} catch (Exception e) {
Loggers.SRV_LOG.warn("NamingProxy", e);
}
return false;
}
复制代码
可以很明显的看到,这里就是封装请求信息,设置请求头,调用 /nacos/v1/ns/distro/datum
这个url发送http请求进行数据同步。
4. 集群节点处理临时数据请求
介绍了服务注册临时数据在集群间是怎样同步的,临时数据节点之间的关系是平等的,也就是peer to peer,就像eureka同步机制差不多,某一台nacos实例收到注册请求,首先会写入本地map中,然后向内存队列中塞入一个事件,异步更新结构化存储数据,接着就是异步同步,默认延时是2s,分别发送同步请求给集群中每个实例。本文主要看看集群中的节点收到临时数据同步请求是怎样处理
。
我们知道同步请求的uri是/nacos/v1/ns/distro/datum
,我们直接找到DistroController
的onSyncDatum
方法即可(都是SpringMVC的@RequestMapping注解,很好找到):
@PutMapping("/datum")
public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {
for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
String serviceName = KeyBuilder.getServiceName(entry.getKey());
if (!serviceManager.containService(namespaceId, serviceName) && switchDomain
.isDefaultInstanceEphemeral()) {
serviceManager.createEmptyService(namespaceId, serviceName, true);
}
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), entry.getValue());
distroProtocol.onReceive(distroHttpData);
}
}
return ResponseEntity.ok("ok");
}
复制代码
遍历这个map,判断这个key是不是临时数据的key(就是看看这个key是不是com.alibaba.nacos.naming.iplist.ephemeral.
开头的),如果是的话,通过key解析出来namespace,serviceName
这些东西。
接着就是调用serviceManager
看看有没有存储结构中有没有这个namespace+ serviceName
对应的数据结构。
如果没有的话,就创建一个空的(为啥是空的, 因为一会异步更新进去,就跟服务注册一样),结构如图所示。
这个结构化数据存储的作用就是查找方便,供服务发现等功能使用,这里我们就深入展开了。
最后就是DistroProtocol 的onReceive
方法,我们看下
public boolean onReceive(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
return dataProcessor.processData(distroData);
}
复制代码
通过resourceType 获取到了一个DistroDataProcessor
,调用processor的 processData 方法处理数据。这里我们也不绕弯子,实现类就是DistroConsistencyServiceImpl
,是唯一一个实现类。 我们看下DistroConsistencyServiceImpl的processData
方法。
@Override
public boolean processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
onPut(datum.key, datum.value);
return true;
}
复制代码
可以看到,这里反序列化数据,调用了onPut 方法,这个onPut 方法看过服务注册流程的小伙伴很熟悉,干了2件事
- 将数据存入
DataStore
(也就是我们常说的那个map)中。 - 将事件变更放入内存队列中(会有一个线程专门从内存队列中获取事件,处理事件)
public void onPut(String key, Record value) {
// 判断是否是临时
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {// 如果是临时
//封装datum
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
// 自增
datum.timestamp.incrementAndGet();
// 放到datastore中进行存储
dataStore.put(key, datum);
}
// 如果listener里面没有这个key的话 直接返回 ,这个玩意是发布订阅的东西,进行通知,这个玩意service在初始化的会添加进去
if (!listeners.containsKey(key)) {
return;
}
// 添加通知任务
notifier.addTask(key, DataOperation.CHANGE);
}
复制代码
可以对照着代码再来看看,1是将数据存入dataStore (map)中,2是将变更事件封装成一个任务,放到内存队列中。
在往后就是有个线程从内存队列中将事件取出来,然后处理事件,将变更key的数据从dataStore 中取出来,更新到对应的结构化数据结构中。
4.1 小节
本文主要介绍集群节点收到临时数据同步请求是怎样处理的,原理其实跟服务注册一样,还是先存入Datastore
(map)中,接着就是创建事件变更任务,放入内存队列中,这个时候就完事了,会有一个线程不断的从内存队列中将事件取到,然后进行处理,这里是会将变更事件取到,到datastore中取到对应key的数据,将数据刷到结构化数据结构中。