前言
- 最近一直在研究Nacos开源框架的源码和执行流程原理
- 本次简单聊下AP集群架构下服务状态变动的同步原理
Nacos集群下服务状态变动同步方式
- 客户端服务实例信息在集群节点间同步任务是通过ServiceManager.init()方法,创建一个定时任务线程ServiceReporter执行。
@Component
public class ServiceManager implements RecordListener<Service> {
/**
* Map(namespace, Map(group::serviceName, Service))
* 这个map是nacos的注册表结构
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
......
/**
* Init service maneger.
*/
@PostConstruct
public void init() {
// 客户端服务实例信息在集群节点间同步
GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
......
}
......
}
复制代码
- ServiceReporter类首先是获取所有客户端实例信息,再获取所有节点信息,然后依次把客户端服务信息发送到其他nacos节点。
注:发起同步的nacos节点是做心跳健康检查的节点
private class ServiceReporter implements Runnable {
@Override
public void run() {
try {
// 获取客户端实例信息
Map<String, Set<String>> allServiceNames = getAllServiceNames();
if (allServiceNames.size() <= 0) {
//ignore
return;
}
for (String namespaceId : allServiceNames.keySet()) {
ServiceChecksum checksum = new ServiceChecksum(namespaceId);
for (String serviceName : allServiceNames.get(namespaceId)) {
// 发起同步的节点就是做心跳健康检查的节点
if (!distroMapper.responsible(serviceName)) {
continue;
}
Service service = getService(namespaceId, serviceName);
if (service == null || service.isEmpty()) {
continue;
}
service.recalculateChecksum();
checksum.addItem(serviceName, service.getChecksum());
}
Message msg = new Message();
msg.setData(JacksonUtils.toJson(checksum));
// 获取所有的集群节点信息
Collection<Member> sameSiteServers = memberManager.allMembers();
if (sameSiteServers == null || sameSiteServers.size() <= 0) {
return;
}
for (Member server : sameSiteServers) {
if (server.getAddress().equals(NetUtils.localServer())) {
continue;
}
// 把客户端服务信息发送到其他nacos节点
synchronizer.send(server.getAddress(), msg);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
} finally {
GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
TimeUnit.MILLISECONDS);
}
}
}
复制代码
- 这里的发送http请求用的是ServiceStatusSynchronizer.send()方法
public class ServiceStatusSynchronizer implements Synchronizer {
@Override
public void send(final String serverIP, Message msg) {
if (serverIP == null) {
return;
}
// 拼接参数和URL
Map<String, String> params = new HashMap<String, String>(10);
params.put("statuses", msg.getData());
params.put("clientIP", NetUtils.localServer());
String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
if (IPUtil.containsPort(serverIP)) {
url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ "/service/status";
}
try {
// 发送同步请求
HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}",
serverIP);
}
}
@Override
public void onError(Throwable throwable) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, throwable);
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
}
}
}
复制代码
最后
- 虚心学习,共同进步-_-