1、PowerJob简单介绍
PowerJob(原OhMyScheduler)是全新一代分布式任务调度与计算框架。
上面的介绍是来自PowerJob官方文档,我的理解PowerJob是一个中间服务,用以管理多个其它应用中的定时任务、延迟任务等。官网介绍还可以使用Map/Reduce处理器进行分布式任务处理,这个暂时不了解。
2、PowerJob通信
PowerJob提供了一个独立部署的Server端,用以统一管理不同客户端(PowerJob中叫Worker)中的任务。这个就涉及到了Server端和Worker端的通信问题。在一个分布式系统中,Server端和Client端之间的通信引申出一个服务发现的概念。
在一般的服务发现模式中,有3个角色:
- 服务提供者:提供服务的一端,往往是提供出API供其它服务使用。
- 服务消费者:使用服务提供者服务的一端,也就是使用服务提供者API的一方。
- 注册中心:服务提供者到注册中心进行注册,注册信息一般包括服务名称和服务地址。然后服务消费者到注册中心,使用服务名称获取服务地址,这样服务消费者才能和服务提供者进行通讯。
使用服务发现可以保证分布式系统的高可用,而PowerJob中没有注册中心这个服务,那它是怎么实现上述功能的呢?
2.1 Worker获取Server地址
在一般的分布式系统中,服务提供者会在一开始就向注册中心进行注册,这样服务消费者才可以通过注册中心获取服务提供者的信息,使用服务提供者的服务。
在PowerJob中也是类似的,在Worker初始化的时候,也会执行类似的操作。
public void init() throws Exception {
//初始化Akka
...
// 服务发现
currentServer = ServerDiscoveryService.discovery();
if (StringUtils.isEmpty(currentServer) && !config.isEnableTestMode()) {
throw new RuntimeException("can't find any available server, this worker has been quarantined.");
}
log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer);
...
}
复制代码
上面是Worker的初始化源码,可以看到在Worker进行初始化的时候,也会执行一个服务发现的操作,下面继续查看源码。
public static String discovery() {
if (IP2ADDRESS.isEmpty()) {
OhMyWorker.getConfig().getServerAddress().forEach(x -> IP2ADDRESS.put(x.split(":")[0], x));
}
String result = null;
// 先对当前机器发起请求
//1、判断之前是否已确定一个可用的Server,如果之前已经确定了一个Server,就再验证这个Server是否还是可用的
String currentServer = OhMyWorker.getCurrentServer();
if (!StringUtils.isEmpty(currentServer)) {
String ip = currentServer.split(":")[0];
// 直接请求当前Server的HTTP服务,可以少一次网络开销,减轻Server负担
String firstServerAddress = IP2ADDRESS.get(ip);
if (firstServerAddress != null) {
result = acquire(firstServerAddress);
}
}
//2、如果之前没有可用的Server,依次判断Server数组中的Server,查找出可用的Server地址
for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) {
if (StringUtils.isEmpty(result)) {
result = acquire(httpServerAddress);
}else {
break;
}
}
//3、如果没有找到可用的Server,说明当前Worker与外界失联,进行错误处理
if (StringUtils.isEmpty(result)) {
log.warn("[OmsServerDiscovery] can't find any available server, this worker has been quarantined.");
// 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务
//错误处理
return null;
}else {
// 重置失败次数
FAILED_COUNT = 0;
log.debug("[OmsServerDiscovery] current server is {}.", result);
return result;
}
}
复制代码
OhMyWorker.getConfig().getServerAddress()
返回的是在Worker进行初始化的时候,配置的Server地址。
从上面源码看出,整个服务发现基本有3步:
- 1、之前已经找到一个可用的Server的话,判断这个Server是否还可用,可用的话返回这个Server
- 2、之前没有找到可用的Server(可能是第一次进行服务发现),依次判断Server数组(初始化配置的服务列表)中的Server,返回第一个可用的Server
- 3、没有找到可用的Server,说明当前Worker失联,进行错误处理
2.1.1 怎么判断Server是否可用
在上面代码中使用private static String acquire(String httpServerAddress)
判断Server是否可用,下面我们看下具体是如果验证的。
private static String acquire(String httpServerAddress) {
String result = null;
String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer());
try {
result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
}catch (Exception ignore) {
}
if (!StringUtils.isEmpty(result)) {
try {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
return resultDTO.getData().toString();
}
}catch (Exception ignore) {
}
}
return null;
}
复制代码
上面代码很简单,就是向需要验证的Server地址发起一个Http请求,请求参数有这个Worker对应的AppId。判断这个请求的返回数据,如果这个Server返回的数据是符合预期的,就说明这个Server可用。
resultDTO.getData()
的值是Server的地址和Server中Akka的端口号。
接下来看Server端是如何处理这个请求的。
public String getServer(Long appId) {
Set<String> downServerCache = Sets.newHashSet();
for (int i = 0; i < RETRY_TIMES; i++) {
// 无锁获取当前数据库中的Server
//1、从数据库中获取这个应用的信息,信息中会有这个应用绑定的Server地址
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
throw new OmsException(appId + " is not registered!");
}
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();
//判断当前应用绑定的Server是否存活,如果存活就返回这个Server信息
if (isActive(originServer, downServerCache)) {
return originServer;
}
//2、如果Server不可用,就尝试将应用绑定的Server换成本机
// 无可用Server,重新进行Server选举,需要加锁
String lockName = String.format(SERVER_ELECT_LOCK, appId);
boolean lockStatus = lockService.lock(lockName, 30000);
if (!lockStatus) {
try {
Thread.sleep(500);
}catch (Exception ignore) {
}
continue;
}
try {
// 可能上一台机器已经完成了Server选举,需要再次判断
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
if (isActive(appInfo.getCurrentServer(), downServerCache)) {
return appInfo.getCurrentServer();
}
// 篡位,本机作为Server
appInfo.setCurrentServer(OhMyServer.getActorSystemAddress());
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerSelectService] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return appInfo.getCurrentServer();
}catch (Exception e) {
log.warn("[ServerSelectService] write new server to db failed for app {}.", appName);
}finally {
lockService.unlock(lockName);
}
}
throw new RuntimeException("server elect failed for app " + appId);
}
复制代码
处理操作主要有下面几个步骤:
- 1、使用请求参数appId,获取应用信息,验证应用信息中的Server是否可用,可用的话直接返回这个Server信息
- 2、这个Server不可用的话,执行Server选举。其实就是在分布式锁的控制下,每个Server将自身设置为这个应用的绑定Server。最后返回选出的Server。
回到这个小节的标题,如果判断某个Server可用。
private boolean isActive(String serverAddress, Set<String> downServerCache) {
...
ActorSelection serverActor = OhMyServer.getFriendActor(serverAddress);
try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
downServerCache.remove(serverAddress);
return response.isSuccess();
}catch (Exception e) {
log.warn("[ServerSelectService] server({}) was down.", serverAddress);
}
downServerCache.add(serverAddress);
return false;
}
复制代码
其实也很简单,就是向这个Server中的Akka服务发送Ping信号,能够得到正确的响应,就说明这个Server是可用的。
至此,我们梳理完了Worker获取对应App绑定的Server地址信息。这样Worker就可以向这个Server报告自身的一些情况了,比如使用心跳与Server保持连接。
3、Server拿到Woker地址
上面梳理了Worker拿到对应的Server地址,下面梳理下Server是如果获取到自身管理的Worker信息的。
在Worker获取了Server地址后,会使用Akka定时向Server发送心跳信息。心跳信息中包含本机的地址(IP:port),这个端口是本机Akka服务的端口;本机Worker对应的appName和appId;本机系统的资源使用情况等。
我们下面看下Server在接收到心跳信息后做了哪些工作。
/**
* 更新状态
* @param heartbeat Worker的心跳包
*/
public static void updateStatus(WorkerHeartbeat heartbeat) {
Long appId = heartbeat.getAppId();
String appName = heartbeat.getAppName();
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
clusterStatusHolder.updateStatus(heartbeat);
}
复制代码
主要是更新Worker的状态,其中类ClusterStatusHolder
是管理同一个Worker集群的类,内部维护了一个Map,存储集群中每个机器的状态。
接下来到clusterStatusHolder.updateStatus(heartbeat)
内部看下。
public void updateStatus(WorkerHeartbeat heartbeat) {
String workerAddress = heartbeat.getWorkerAddress();
long heartbeatTime = heartbeat.getHeartbeatTime();
Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L);
if (heartbeatTime < oldTime) {
log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
return;
}
address2ActiveTime.put(workerAddress, heartbeatTime);
address2Metrics.put(workerAddress, heartbeat.getSystemMetrics());
List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
if (!CollectionUtils.isEmpty(containerInfos)) {
containerInfos.forEach(containerInfo -> {
Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
infos.put(workerAddress, containerInfo);
});
}
}
复制代码
里面就是根据心跳时间和Worker机器的资源使用情况。
虽然这个代码比较简单,但是通过这个心跳Server拿到了Worker的地址,后面Server就可以使用这个地址向Worker发送执行任务的信息了。
4、总结
从上面的分析来看,Worker就是分布式系统中的服务提供者了,Server就是服务消费者,消费的内容就是Server管理的Worker的定时任务等。
我们一开始还说了一个注册中心,那PowerJob中的注册中心是哪个?也是Server,Worker通过发送心跳信息,将appName和app地址注册到Server中。
上面说到一个概念就是Worker绑定的Server,也就是一个Worker的任务管理都是由一个Server完成的,不管在这个分布式系统中有多少个Server实例。 这样做的好处是任务的分组和隔离,这个任务的分组和隔离到底有啥好处,我现在也有点糊涂,后面会出解析PowerJob任务调度相关的博客,希望到时候能理解。