【引言】
之前接触过用XXL-JOB来做分布式定时任务,本篇文章将介绍一个新的分布式定时任务解决方案——Elastic-Job。
【整体概述】
Elastic-Job是由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite使用jar包的形式提供分布式任务的协调服务,外部依赖仅Zookeeper。
整体架构图如下:
【核心理念】
1. 分布式调度
Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
2. 作业高可用
Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
3. 最大限度利用资源
Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
【快速入门】
- 构建SpringBoot项目,添加elastic-job依赖,完整依赖如下:、
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- elastic job -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
</dependencies>
- 添加配置文件,配置项目名称及zk地址
spring.application.name=springboot-elasticjob
zookeeper.serverList=localhost:2181
zookeeper.namespace=springboot-elasticjob
- 添加zookeeper注册中心配置,代码如下:
@Configuration
@ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0")
public class JobRegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${zookeeper.serverList}") final String serverList,
@Value("${zookeeper.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
- 添加任务抽象类AbstractJob,实现Elastic-Job中提供的接口SimpleJob,主要方法包括:启动任务、执行任务及获取任务配置,代码如下:
/**
* 启动任务,调用此方法,将启动任务调度
* @param zookeeperRegistryCenter 任务注册中心
*/
protected synchronized void start(ZookeeperRegistryCenter zookeeperRegistryCenter) {
// 保证一个任务只执行一次start
if (this.started){
return;
}
this.started = true;
SpringJobScheduler s = new SpringJobScheduler(this,
zookeeperRegistryCenter, getLiteJobConfiguration());
s.init();
}
@Override
public void execute(ShardingContext shardingContext) {
LOG.info(String.format("Thread ID:%s,任务总片数:%s," + "当前分片项:%s,当前参数:%s,"
+ "当前任务名称:%s,当前任务参数:%s", Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(), shardingContext.getJobParameter()));
executeJob(shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobParameter());
}
/**
* 执行任务
*
* @param shardingTotalCount
* 任务分片数
* @param shardingItem
* 当前分配序号
* @param jobParameter
* 当前分配任务参数
*/
public abstract void executeJob(Integer shardingTotalCount,
Integer shardingItem, String itemParameter, String jobParameter);
/**
* 获取任务配置
* @return
*/
protected LiteJobConfiguration getLiteJobConfiguration() {
return LiteJobConfiguration
.newBuilder(
new SimpleJobConfiguration(JobCoreConfiguration
.newBuilder(this.getClass().getName(),
this.cron, this.shardingTotalCount)
.shardingItemParameters(
this.shardingItemParameters)
.jobParameter(this.jobParameter).build(), this
.getClass().getCanonicalName()))
.overwrite(true).build();
}
- 添加第一个任务FirstJob,执行表达式、分片数量添加在配置文件中即可,代码如下:
@Component
public class FirstJob extends AbstractJob {
private static final Logger log = LoggerFactory.getLogger("firstJob");
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public FirstJob(
@Value("${first.job.cron}") String cron,
@Value("${first.job.shardingTotalCount}") Integer shardingTotalCount,
@Value("${first.job.shardingItemParameters}") String shardingItemParameters,
@Value("${first.job.jobParameter}") String jobParameter) {
super(cron, shardingTotalCount, shardingItemParameters, jobParameter);
}
@PostConstruct
public void init() {
start(zookeeperRegistryCenter);
}
@Override
public void executeJob(Integer shardingTotalCount, Integer shardingItem, String itemParameter, String jobParameter) {
log.info("这是第一个简单的任务");
}
}
//配置内容:
//两秒执行一次
first.job.cron=0/2 * * * * ?
first.job.shardingTotalCount=2
- 本地启动zookeeper,启动项目,可在控制台看到任务执行结果如下:
完整代码地址: