# master 临时节点 值比如: 192.16.67.21@-@227032
/${JOB_NAME}/leader/electron/instance
# 具体的服务ip 多个实例同个ip,servers下只有1个ip 持久节点
# /${JOB_NAME}/servers/192.16.67.21
/${JOB_NAME}/servers/${ip}
# 具体的服务实例,1个服务1个实例 临时节点
# /${jobName}/instances/192.16.67.21-@1234
/${jobName}/instances/${instanceId}
LeaderService
设置主节点
@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (!hasLeader()) { // 当前无主节点 创建临时节点/${JOB_NAME}/leader/electron/instance 为当前节点 值为${jobInstanceId}
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}
判断是否已经有主节点
/** 判断是否已经有主节点
* Judge has leader or not in current time.
*
* @return has leader or not in current time
*/
public boolean hasLeader() {
return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE); // /${JOB_NAME}/leader/electron/instance
}
ServerService
/**
* Persist online status of job server.
*
* @param enabled enable server or not
*/
public void persistOnline(final boolean enabled) {
if (!JobRegistry.getInstance().isShutdown(jobName)) { //如果当前jobName已经shutdown了 则 ${jobName}/servers/${ip} 的 值设置ENABLED或DISABLED
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name());
}
}
/** 是否有可用的服务器
* Judge has available servers or not.
*
* @return has available servers or not
*/
public boolean hasAvailableServers() {
List<String> servers = jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT); // /${jobName}/servers
for (String each : servers) {
if (isAvailableServer(each)) {
return true;
}
}
return false;
}
/** 当前ip是否是可用的
* Judge is available server or not.
*
* @param ip job server IP address
* @return is available server or not
*/
public boolean isAvailableServer(final String ip) {
return isEnableServer(ip) && hasOnlineInstances(ip); // 条件1: /${jobName}/servers/${ip} 的 值:ENABLED
}
private boolean hasOnlineInstances(final String ip) { //该ip是否有在线的服务实例
for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) { // /${jobName}/instances
if (each.startsWith(ip)) { // each 比如 192.16.67.21-@1234
return true;
}
}
return false;
}
/** /${jobName}/servers/${ip} 这个路径一定要存在,否则一直等候,一直取
* Judge is server enabled or not.
*
* @param ip job server IP address
* @return is server enabled or not true:enabled
*/
public boolean isEnableServer(final String ip) {
String serverStatus = jobNodeStorage.getJobNodeData(serverNode.getServerNode(ip)); // /${jobName}/servers/${ip} 值:ENABLED
while (Strings.isNullOrEmpty(serverStatus)) { //返回是空值
BlockUtils.waitingShortTime(); //等候100ms
serverStatus = jobNodeStorage.getJobNodeData(serverNode.getServerNode(ip)); // 重新读取/${jobName}/servers/${ip}的值
}
return !ServerStatus.DISABLED.name().equals(serverStatus);
}
InstanceService
/** 临时节点 /${jobName}/instances/${instanceId} 值是空字符串
* Persist job online status.
*/
public void persistOnline() {
jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
}
/**
* Persist job instance.
*/
public void removeInstance() {
jobNodeStorage.removeJobNodeIfExisted(instanceNode.getLocalInstanceNode());
}
/**
* Clear trigger flag.
*/
public void clearTriggerFlag() {
jobNodeStorage.updateJobNode(instanceNode.getLocalInstanceNode(), "");
}
/** 获取当前jobname下所有可用的instance
* Get available job instances.
*
* @return available job instances
*/
public List<JobInstance> getAvailableJobInstances() {
List<JobInstance> result = new LinkedList<>();
for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) { //$jobName/instances
JobInstance jobInstance = new JobInstance(each);
if (serverService.isEnableServer(jobInstance.getIp())) {
result.add(new JobInstance(each));
}
}
return result;
}
/** 判断/$jobName/instances/${instanceId} 是否存在
* Judge is job instance existed or not in localhost.
*
* @return is job instance existed or not in localhost
*/
public boolean isLocalJobInstanceExisted() {
return jobNodeStorage.isJobNodeExisted(instanceNode.getLocalInstanceNode());
}