简介
ElasticJob-Lite 是面向互联网生态和海量任务的分布式调度解决方案。
区别于传统定时任务,它设计的初衷是为了 面对高并发以及复杂的业务 ,即使是业务量大,服务器多的时候也能做好任务调度,尽可能的利用服务器的资源。
初识架构图
从下图可以看到,ElasticJob-Lite 是依赖ZooKeeper
的,如果主服务器挂了,会自动通过 ZooKeeper 的选举机制选举出新的主服务器,因此 ElasticJob-Lite 具有良好的扩展性和可用性。
这张架构图是 ElasticJob-Lite 的运行全貌。看上去比较复杂,我们将此图分解成几个小块,通过 简化步骤 来方便大家认识。首先是注册:
注册
定时任务 APP 启动时,向 ZooKeeper 发送信息,在 ZooKeeper 上建立一个包含App信息的实例节点
,这就是注册。
这里还有一个分片
的概念需要了解一下,分片数是一个配置。简单来说就是同一时间定时任务总共需要执行的次数,假设下图的 App1 和 App2 都已注册,定时任务每小时跑一次:
分片数是1
:根据定义同一时间定时任务总共需要执行1次,那么下午3点时,App1 可以运行定时任务,而 App2 无定时任务可运行。分片数是2
:App1 可以运行定时任务,App2 也可以运行定时任务。分片数是3
:App1 运行定时任务2次,App2 运行定时任务1次。
一般来说,App注册数=分片数+1
。假如想要定时任务只有一个 App 在跑,分片数设置为 1,那么建议注册2个 App。即定时任务只有 App1 在跑,App2 当做是 App1 的备份。当 App1 因为意外掉线,App2 无缝衔接运行定时任务。 那么分片
和注册
有什么关系,为什么要在注册的时候讲分片呢?
因为在 App 注册的时候,ZooKeeper 生成的节点里包含了分片信息。每当有一个新的 App 注册的时候,所有的节点中的分片信息都会更新,比如想要两台机器跑定时任务,那么分片数设置为 2:
- App1 启动,注册并生成节点,节点信息里写入0、1,即 App1 跑 0 号分片和 1 号分片,实现跑两次逻辑。
- App2 启动,注册并生成节点,两个节点重新分配分片。
- App1 节点写入 0,代表跑0号分片;App2 节点写入 1,跑 1 号分片,实现各跑一次的逻辑。
- 更多节点注册,分片按策略不断重新分配。
至此,我们已经认识了注册,再来看看第二块内容,ElasticJob-Lite 的核心:
监听
上面的注册讲的是 App/服务的注册,是比较粗的粒度,这里的监听是指由注册启动,针对生成节点的内容变化作出相应改变的行为
。刚才说的分片也有对应的分片监听,它监听到 App/服务数量的改变,开始根据既定策略开始给每个 App/服务分片。
监听是针对 ZooKeeper 节点的监听
。改变 ZooKeeper 节点的方式有多种,比如服务上线,比如在控制台修改配置等,都可以触发监听。但无论哪种方式,都会修改 ZooKeeper 节点信息,因此只要注册 ZooKeeper 监控其节点变化即可。
简单说明 ElasticJob-Lite 虚线框中,右边的执行流程。它是服务从上线到失效的过程,是上一节登录的详细过程:
- 在线注册:把服务信息写到 ZooKeeper 节点中
- 计划触发:定时任务放入 quartz 框架的 schedule 中,由它掌控定时任务的启停
- zk选主:调用 ZooKeeper 选主节点,主节点负责判断是否需要分片
- 是否分片:可以看到图中分了两块,左边为主节点判断需要分片,走分片流程,右边是主节点判断不需要分片,继续执行
- 执行定时任务
- 失效:服务下线,退出,zk 中的对应节点失效
当App/服务失效后,主节点触发分片任务,zk上还存活的某个节点,会接过下线服务的接力棒,跑失效节点应该执行的定时任务。若节点原先是备份节点(没有得到分片),那么此时得到一个分片;已有一个分片,那么它跑两个分片。就是注册时讲的分片内容。
ElasticJob-Lite虚线框中左边,listener包含了许多监听器,他们在注册时被添加,下面列举部分监听器:
- 选举监听 - ElectionListener
- 分片监听 - ShardingListener
- 服务器失效监听 - FailoverListener
- 下线监听 - ShutdownListener
- cron修改监听 - RescheduleListener,等等。
可以看出,ElasticJob 启动会把所有信息都注册到 ZooKeeper 节点上保存,节点的一举一动,代表着各服务发生的变化,都会触发在线的App/服务发生改变。
日志
服务在运行的时候会生成日志,日志有两个存放的地方:一个是日志文件、一个是数据库。对应下图,左边是事件保存到数据库(按需)、右边是执行日志放入日志文件。 我在理解这部分的时候,是有疑惑的,整理了相关资料后,可以看到我对这张图做了修改:
- 增加 Events 块图中 DB 说明
- 删除 ELK 虚线块
解释一下,Events 其实也是日志的一种,如果不写 DB,我会疑惑 Events 为什么和 Logs 并列,难以理解它的真实含义。用默认图标表示DB,这种描述风格跟全文完全不搭,其他都是在方块内写的清清楚楚的,为什么到这里就画风突变。不同的表述风格加上隐藏的含义让人很难注意到这个图标,无法准确理解分块的含义。
另一个叉掉 ELK,是我觉得ELK读取日志和 ElasticJob 完全没有关系,ElasticJob 运行完全不依赖 ELK,因此没有必要在架构图里显示出来。
控制台
控制台展示的信息可以分为两块,一块是读取 ZooKeeper 节点信息展示,一块是读取数据库中的 Events 展示。对应 Status 和 Events Statistics 指向的绿色箭头。
Operation 表示,通过 Console 控制台修改 ZooKeeper 中的节点信息,后台服务监听到修改后,作出相应的改变。
下图的修改作业界面就是 Operation 的一种,属于配置信息修改。是 Console 读取 ZooKeeper 信息之后展示出来的,修改页面信息将同步修改 ZooKeeper 节点信息,同时,后台定时任务也会作出相应改变。
代码分析
代码结构图
ElasticJob-Lite 分为前端代码和后台代码。前端负责响应页面点击事件,之后发送请求至后台,后台负责各种功能的实现。
而 ElaticJob-Lite 后台代码主要逻辑是在 ElasticJob-Lite 中的 ecosystem 包和 core 包中。ecosystem 有 ElasticJob Executor 执行器。而本次要介绍的启动的实现和监听的实现,都在 core 包中实现。如果要深入了解,可以从 core 包的 setUpFacade 入手。
启动DEMO
启动前需要确认环境:
- Java8 或 更高版本
- Maven 3.5.0 或 更高版本
- ZooKeeper 3.6.0 或 更高版本
下面是DEMO代码
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere. ElasticJob .api.JobConfiguration;
import org.apache.shardingsphere. ElasticJob .lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere. ElasticJob .reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere. ElasticJob .reg. ZooKeeper . ZooKeeper Configuration;
import org.apache.shardingsphere. ElasticJob .reg. ZooKeeper . ZooKeeper RegistryCenter;
import org.apache.shardingsphere. ElasticJob .tracing.api.TracingConfiguration;
public class MyJobDemo {
public static void main(String[] args) {
// 入口
new ScheduleJobBootstrap(createRegistryCenter(),new MyJob(),createJobConfiguration()).schedule();
}
public static CoordinatorRegistryCenter createRegistryCenter(){
CoordinatorRegistryCenter registryCenter = new ZooKeeper RegistryCenter(
new ZooKeeper Configuration("localhost:2181","my-job"));
// 初始化
registryCenter.init();
return registryCenter;
}
public static JobConfiguration createJobConfiguration() {
// 配置基本信息
JobConfiguration jobConfiguration = JobConfiguration.newBuilder("Myjob",3)
.cron("0 0/30 * * * ?").shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
.jobErrorHandlerType("LOG").jobShardingStrategyType("ROUND_ROBIN").overwrite(true).failover(true)
.monitorExecution(true).build();
// 配置数据库日志 - 选配
TracingConfiguration tc = new TracingConfiguration("RDB",getDataSource());
jobConfiguration.getExtraConfigurations().add(tc);
return jobConfiguration;
}
public static BasicDataSource getDataSource(){
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName(com.mysql.cj.jdbc.Driver.class.getName());
dataSource.setUrl("jdbc:mysql://localhost:3306/batch_log?useUnicode=true&useSSL=false&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=GMT%2B8");
dataSource.setUsername("root");
dataSource.setPassword(你的密码);
return dataSource;
}
}
Maven依赖如下:
<properties>
<java.version>1.8</java.version>
<latest.release.version>3.0.0</latest.release.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere. ElasticJob </groupId>
<artifactId> ElasticJob-Lite-core</artifactId>
<version>${latest.release.version}</version>
</dependency>
</dependencies>
ZooKeeper 中的节点
ElasticJob-Lite 的运行时注册和监听 ZooKeeper 节点的一个过程,那么,节点都有哪些呢?下面是一些主要节点的介绍:
-
/config:配置节点,保存分片数等信息。
-
/instances:实例节点,一个服务注册一个。/{jobInstanceId} 是服务自己生成的,如 /192.128.0.1@-@123 节点。节点保存实例 ID 和服务的 IP。
-
/leader:选举相关的节点,有选举、主持分片、主持失效转移的作用。比如需要分片了,就新增 /sharding/necessary 节点,分片完毕后,会删除此节点。/failover 失效转移同理。
-
/servers:服务节点,按服务 IP 生成。可以控制此 IP 下定时任务的开启和关闭。节点内写入 enabled,则此 IP 下的定时任务失效。
-
/sharding:分片节点,服务注册时会生成。比如分片数为 3,就生成 0、1、2 三个节点,分片数 4 就生成 4 个节点。假设启动 1 个服务,配置4个分片,则三个节点的 /instance 下 instanceId 都为同一个;2 个服务 4 个分片,则 /0、/1 节点下同一个实例 ID,/2、/3 节点下同一个实例 ID。即每次注册后,谁执行多少个分片就已经定好了。
-
job_instance_id:实例 ID,启动时生成,由
IP+@-@+随机数
组成,作用是唯一标识一个实例。
下图是我按层级列出来的 ZooKeeper 部分节点,/xxx 代表节点,黑色方块表示节点中的内容: /namespace/jobname:服务器启动时注册同一个 jobname 表示他们执行同一个定时任务。
监听代码
通过主要的启动代码,可以让我们更加了解 ElasticJob ,下面是注册启动信息
public void registerStartUpInfo(final boolean enabled) {
// 启动监听器
listenerManager.startAllListeners();
// 使用curator选举Leader:/leader/election/latch
leaderService.electLeader();
// 服务持久化:初始化-创建父节点 /servers,已存在-事务性更新
serverService.persistOnline(enabled);
// 实例持久化:/instances
instanceService.persistOnline();
if (!reconcileService.isRunning()) {
// 调解冲突
reconcileService.startAsync();
}
}
我们看startAllListeners
一共启动了多少监听器:
public void startAllListeners() {
// 选举Listener
electionListenerManager.start();
// 分片Listener
shardingListenerManager.start();
// 故障转移Listener
failoverListenerManager.start();
// 监控执行
monitorExecutionListenerManager.start();
// 下线监听
shutdownListenerManager.start();
// 页面立即执行功能监听
triggerListenerManager.start();
// cron表达式修改监听
rescheduleListenerManager.start();
// 处理启动和竞争节点信息
guaranteeListenerManager.start();
// 处理连接信息:连接、重连
jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
分片实现
分片启动注册了两个监听器:
- 分片数变更
- 服务变更
@Override
public void start() {
// 分片数变更
addDataListener(new ShardingTotalCountChangedJobListener());
addDataListener(new ListenServersChangedJobListener());
}
分片数变更和服务变更触发的实现基本一致,我们来看一下分片数变更的实现逻辑:
class ShardingTotalCountChangedJobListener implements DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
// 修改/config下配置 && 分片数不为0
if (configNode.isConfigPath(event.getKey()) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
int newShardingTotalCount = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class).toJobConfiguration().getShardingTotalCount();
// 新分片数!=老分片数
if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
// 判断是否主节点,主节点才能执行
if (!leaderService.isLeaderUntilBlock()) {
return;
}
// 在/leader/sharding节点下新增/necessary节点
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
// 本地缓存分片数
JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
}
}
}
}
通过上述代码我们知道,分片不是实时的,而是通过新增节点来异步触发分片:
- 判断 ZooKeeper 触发的事件是否为配置修改、判断分片数 >0
- 判断是否主节点,只有主节点才能新增节点并执行分片
- 非主节点,返回;主节点,执行 4
- 在/leader/sharding 节点下新增 /necessary 节点,并在本地缓存分片数
监听器监听到修改后,只负责新增necessary
节点。定时任务启动前,会检查是否存在 /necessary 节点。如果存在,先分片、后执行。下面是分片的实现:
public void shardingIfNecessary() {
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
// 需要分片
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
// 主节点执行
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
// 如果存在正在执行作业中的分片,等待所有作业执行完毕
waitingOtherShardingItemCompleted();
// 获取配置
JobConfiguration jobConfig = configService.load(false);
int shardingTotalCount = jobConfig.getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
// 分片状态设置为执行中
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
// reset sharding节点
resetShardingInfo(shardingTotalCount);
/**
* 分片策略:AverageAllocationJobShardingStrategy default 平均分配
* OdevitySortByNameJobShardingStrategy 根据作业名的哈希值奇偶数决定IP升降序
* RoundRobinByNameJobShardingStrategy 根据作业名的哈希值对服务器列表进行轮转分片
*/
// add 根据负载粘性分配
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
/**
* 1.Check "/"
* 2.Create "/Myjob/Sharding/0/instances"
* "/Myjob/Sharding/2/instances"
* "/Myjob/Sharding/1/instances"
* 3.Delete "/Myjob/leader/sharding/necessary"
* 4.Delete "/Myjob/leader/sharding/processing"
*/
jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
- 主节点检查 /leader/.../necessary 节点,判断是否需要分片:不需要,返回;需要,执行下一步骤
- 等待其他定时任务执行完毕,再进行分片
- 获取配置,修改zk分片状态为执行中
- 封装对节点的操作。先删除原有 /sharding 下的所有节点,并新增新分片数节点。如分片数 1 修改为 2,则删除 /sharding/0,新增 /sharding/0 和 /sharding/1。
- 主节点选择分配策略,计算出哪些分片被哪些服务实例执行,将服务实例ID写入分片节点中,比如,在 /sharding/0/instance 节点写入服务实例的 ID。有三种策 - 略平均分配、IP 排序、RoundRobin,默认为平均分配。
- 事务执行:
- 删除原分片节点
- 新增分片节点
- 每个分片写入对应的执行服务的实例的 ID
- 删除 /Myjob/leader/sharding/necessary和/Myjob/leader/sharding/processing 节点
至此,整个分片流程执行完毕。
通过分析了分片功能的代码实现,我们初步了解了 ElasticJob-Lite 3.x 的监听、变更的流程。还有很多的功能,实现逻辑和分片大同小异,理解了分片,就能大致了解其实现。
ElasticJob-Lite-UI控制台
shardingsphere-ElasticJob 从 3.0.0-alpha 版本开始,将 Console 管理界面单独拆分出来,现有网上的很多教程都是基于 Console 未拆分出来的版本,下面教程是基于最新版 3.0.1 搭建的 ElasticJob 管理界面。
新的 Console 部署起来并不是非常简便,建议参考上述连接后进行部署。
推荐阅读
招贤纳士
政采云技术团队(Zero),一个富有激情、创造力和执行力的团队,Base 在风景如画的杭州。团队现有300多名研发小伙伴,既有来自阿里、华为、网易的“老”兵,也有来自浙大、中科大、杭电等校的新人。团队在日常业务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料体系、工程平台、性能体验、可视化等领域进行技术探索和实践,推动并落地了一系列的内部技术产品,持续探索技术的新边界。此外,团队还纷纷投身社区建设,目前已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等众多优秀开源社区的贡献者。如果你想改变一直被事折腾,希望开始折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望参与到随着业务腾飞的过程,亲手推动一个有着深入的业务理解、完善的技术体系、技术创造价值、影响力外溢的技术团队的成长过程,我觉得我们该聊聊。任何时间,等着你写点什么,发给 [email protected]
微信公众号
文章同步发布,政采云技术团队公众号,欢迎关注