最近系统在使用esjob进行定时任务管理,现将接入过程分享给大家:
引入依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.2</version>
</dependency>
新建spring-mvc-servlet.xml
简单任务
指定分片数后,当分片数大于机器数量的时候,每台机器分配到的片数会是平均的,例如:第一片是从0开始的,比如总共分6片,有两台机器,则第一台机器会分得0,1,2三片,而第二台机器会分得3,4,5三片;当有机器宕机了或者有新机器加入的时候都会触发重新分片。如果有多台机器,而分片总数是1的时候即相当于1主多从的配置。sharding-item-parameters用于指定与分片对应的别名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E"job-sharding-strategy-class:可以通过它来指定作业分片策略,可选策略可参考官方文档http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/。编写任务
package com.el.test.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.el.test.model.Name;
import com.el.test.service.NameServiceBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 简单作业:
* 指定分片数后,当分片数大于机器数量的时候,每台机器分配到的片数会是平均的,
* 例如:第一片是从0开始的,比如总共分6片,有两台机器,
* 则第一台机器会分得0,1,2三片,而第二台机器会分得3,4,5三片;
* 当有机器宕机了或者有新机器加入的时候都会触发重新分片。
* 如果有多台机器,而分片总数是1的时候即相当于1主多从的配置。
* sharding-item-parameters用于指定与分片对应的别名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E"
* job-sharding-strategy-class:可以通过它来指定作业分片策略,
* 可选策略可参考官方文档http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/。
*/
public class TestJob implements SimpleJob {
@Autowired
private NameServiceBean nameServiceBean;
/**
* 具体执行逻辑,包含根据分片信息获取数据与业务逻辑处理
* @param shardingContext 分片信息
*/
@Override
public void execute(ShardingContext shardingContext) {
System.out.println("SpringSimpleJob 简单任务-------------任务名:"+shardingContext.getJobName()+"\n"
+",---ShardingParameter:"+shardingContext.getShardingParameter()+"\n"
+",----TaskId:"+shardingContext.getTaskId()+"\n"
+",----JobParameter:"+shardingContext.getJobParameter()+"\n"
+",----tShardingItem:"+shardingContext.getShardingItem()+"\n"
+",----ShardingTotalCount:"+shardingContext.getShardingTotalCount()+"\n"
);
HashMap parm = new HashMap();
List<Name> shardingList=nameServiceBean.list(parm);
for(Name oneObj : shardingList){
System.out.println(
"SpringSimpleJob 简单任务-------------id为:"+oneObj.getId()+"\n" +
",----tShardingItem:"+shardingContext.getShardingItem()+"\n"
);
}
}
}
配置xml
<reg:zookeeper id="regCenter" server-lists="192.168.0.79:2181"
namespace="el-esjob-test-xvshu"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="2000"
max-retries="3"/>
<!--esjob 默认启动系统cpu核心数*2的线程操作数据,允许系统自定义executor-service-handle来操作具体核心数
默认实现com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler
public ExecutorService createExecutorService(String jobName) {
return (new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2)).createExecutorService();
}
-->
<!--job:simple
id: job的名称,一旦定义完后不可更改,更改后会认为一个新的job
class: job的具体实现类
registry-center-ref: 使用的注册中心,regCenter不用更改
sharding-total-count: 总的分片数(如果配置成1,则部署多个节点只有一个节点运行定时job,如果此节点出问题(非业务问题),则此次触发会转移到其他节点上)
sharding-item-parameters: 用于指定与分片对应的别名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E"
cron: job执行的时间表达式 ,Quartz格式
monitor-execution: 是否监控
failover: 是否失败转移
description: job的描述信息
disabled: 是否禁用
overwrite: 是否覆盖zk中的配置(以zk的为准还是以本地的为准)
-->
<job:simple id="TestJobS"
class="com.el.test.job.TestJob"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
description="测试的简单job"
overwrite="true"
/>
流式任务
每次调度触发的时候都会先调fetchData获取数据,如果获取到了数据再调度processData方法处理数据。DataflowJob在运行时有两种方式,流式的和非流式的,通过属性streamingProcess控制,如果是基于SpringXML的配置方式则是streaming-process属性,boolean类型。当作业配置为流式的时候,每次触发作业后会调度一次fetchData获取数据,如果获取到了数据会调度processData方法处理数据,处理完后又继续调fetchData获取数据,再调processData处理,如此循环,就像流水一样。直到fetchData没有获取到数据或者发生了重新分片才会停止。
core code:com.dangdang.ddframe.job.executor.type.DataflowJobExecutor
DataflowJobExecutor
protected void process(ShardingContext shardingContext) {
DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration)this.getJobRootConfig().getTypeConfig();
if (dataflowConfig.isStreamingProcess()) {
this.streamingExecute(shardingContext);
} else {
this.oneOffExecute(shardingContext);
}
}
private void streamingExecute(ShardingContext shardingContext) {
for(List data = this.fetchData(shardingContext); null != data && !data.isEmpty(); data = this.fetchData(shardingContext)) {
this.processData(shardingContext, data);
if (!this.getJobFacade().isEligibleForJobRunning()) {
break;
}
}
}
private void oneOffExecute(ShardingContext shardingContext) {
List<Object> data = this.fetchData(shardingContext);
if (null != data && !data.isEmpty()) {
this.processData(shardingContext, data);
}
}
编写任务
package com.el.test.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.el.test.model.Name;
import com.el.test.service.NameServiceBean;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.List;
/**
* 流式作业:
* 每次调度触发的时候都会先调fetchData获取数据,
* 如果获取到了数据再调度processData方法处理数据。
* DataflowJob在运行时有两种方式,流式的和非流式的,
* 通过属性streamingProcess控制,如果是基于Spring XML的配置方式则是streaming-process属性,
* boolean类型。当作业配置为流式的时候,每次触发作业后会调度一次fetchData获取数据,
* 如果获取到了数据会调度processData方法处理数据,处理完后又继续调fetchData获取数据,
* 再调processData处理,如此循环,就像流水一样。直到fetchData没有获取到数据或者发生了
* 重新分片才会停止。
* core code:com.dangdang.ddframe.job.executor.type.DataflowJobExecutor
*
* protected void process(ShardingContext shardingContext) {
DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration)this.getJobRootConfig().getTypeConfig();
if (dataflowConfig.isStreamingProcess()) {
this.streamingExecute(shardingContext);
} else {
this.oneOffExecute(shardingContext);
}
}
private void streamingExecute(ShardingContext shardingContext) {
for(List data = this.fetchData(shardingContext); null != data && !data.isEmpty(); data = this.fetchData(shardingContext)) {
this.processData(shardingContext, data);
if (!this.getJobFacade().isEligibleForJobRunning()) {
break;
}
}
}
private void oneOffExecute(ShardingContext shardingContext) {
List<Object> data = this.fetchData(shardingContext);
if (null != data && !data.isEmpty()) {
this.processData(shardingContext, data);
}
}
*/
public class TestJobFlow implements DataflowJob<Name> {
@Autowired
private NameServiceBean nameServiceBean;
/**
* 获取数据(可循环调用,直至获取不到数据或重新分片)
* 例如:本片处理一万条数据,每次获取一千条,则或执行十次
* @param shardingContext 分片信息
* @return 需要处理的数据
*/
@Override
public List<Name> fetchData(ShardingContext shardingContext) {
System.out.println("SpringDataflowJob Dataflow类型作业-------------任务名:"+shardingContext.getJobName()+"\n"
+",---ShardingParameter:"+shardingContext.getShardingParameter()+"\n"
+",----TaskId:"+shardingContext.getTaskId()+"\n"
+",----JobParameter:"+shardingContext.getJobParameter()+"\n"
+",----tShardingItem:"+shardingContext.getShardingItem()+"\n"
+",----ShardingTotalCount:"+shardingContext.getShardingTotalCount()+"\n"
);
HashMap parm = new HashMap();
List<Name> shardingList=nameServiceBean.list(parm);
return shardingList;
}
/**
* 拿到fetchData返回的list进行业务逻辑处理
* @param shardingContext 分片信息
* @param list 需要处理的数据
*/
@Override
public void processData(ShardingContext shardingContext, List<Name> list) {
if(list!=null){
for(Name oneObj : list){
System.out.println(
"SpringDataflowJob Dataflow类型作业-------------id为:"+oneObj.getId()+"\n" +
",----tShardingItem:"+shardingContext.getShardingItem()+"\n"
);
}
}
}
}
配置xml
<reg:zookeeper id="regCenter" server-lists="192.168.0.79:2181"
namespace="el-esjob-test-xvshu"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="2000"
max-retries="3"/>
<!--esjob 默认启动系统cpu核心数*2的线程操作数据,允许系统自定义executor-service-handle来操作具体核心数
默认实现com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler
public ExecutorService createExecutorService(String jobName) {
return (new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2)).createExecutorService();
}
-->
<!--job:dataflow
id: job的名称,一旦定义完后不可更改,更改后会认为一个新的job
class: job的具体实现类
registry-center-ref: 使用的注册中心,regCenter不用更改
sharding-total-count: 总的分片数(如果配置成1,则部署多个节点只有一个节点运行定时job,如果此节点出问题(非业务问题),则此次触发会转移到其他节点上)
cron: job执行的时间表达式 ,Quartz格式
monitor-execution: 是否监控
failover: 是否失败转移
description: job的描述信息
disabled: 是否禁用
overwrite: 是否覆盖zk中的配置(以zk的为准还是以本地的为准)
streaming-process: 是否循环流式处理任务
-->
<job:dataflow id="TestJobF"
class="com.el.test.job.TestJobFlow"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
streaming-process="true"
description="测试的流式的Job"
/>
搭建运维平台
从git上down下代码,https://github.com/elasticjob/elastic-job-lite
本人有一套直接编译成功的,欢迎大家下载 【elastic-job-lite-console-2.1.2.tar.gz】
解压缩elastic-job-lite-console-2.1.2.tar.gz并执行bin\start.sh。打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号。
运维平台提供两种账户,管理员及访客,管理员拥有全部操作权限,访客仅拥有察看权限。默认管理员用户名和密码是root/root,访客用户名和密码是guest/guest,可通过conf\auth.properties修改管理员及访客用户名及密码。
主页如下:
添加zk
附录:
spring完整配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<reg:zookeeper id="regCenter" server-lists="192.168.0.79:2181"
namespace="el-esjob-test-xvshu"
base-sleep-time-milliseconds="1000"
max-sleep-time-milliseconds="2000"
max-retries="3"/>
<!--esjob 默认启动系统cpu核心数*2的线程操作数据,允许系统自定义executor-service-handle来操作具体核心数
默认实现com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler
public ExecutorService createExecutorService(String jobName) {
return (new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2)).createExecutorService();
}
-->
<!--job:simple
id: job的名称,一旦定义完后不可更改,更改后会认为一个新的job
class: job的具体实现类
registry-center-ref: 使用的注册中心,regCenter不用更改
sharding-total-count: 总的分片数(如果配置成1,则部署多个节点只有一个节点运行定时job,如果此节点出问题(非业务问题),则此次触发会转移到其他节点上)
sharding-item-parameters: 用于指定与分片对应的别名。sharding-item-parameters="0=A,1=B,2=C,3=D,4=E"
cron: job执行的时间表达式 ,Quartz格式
monitor-execution: 是否监控
failover: 是否失败转移
description: job的描述信息
disabled: 是否禁用
overwrite: 是否覆盖zk中的配置(以zk的为准还是以本地的为准)
-->
<job:simple id="TestJobS"
class="com.el.test.job.TestJob"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
description="测试的简单job"
overwrite="true"
/>
<!--job:dataflow
id: job的名称,一旦定义完后不可更改,更改后会认为一个新的job
class: job的具体实现类
registry-center-ref: 使用的注册中心,regCenter不用更改
sharding-total-count: 总的分片数(如果配置成1,则部署多个节点只有一个节点运行定时job,如果此节点出问题(非业务问题),则此次触发会转移到其他节点上)
cron: job执行的时间表达式 ,Quartz格式
monitor-execution: 是否监控
failover: 是否失败转移
description: job的描述信息
disabled: 是否禁用
overwrite: 是否覆盖zk中的配置(以zk的为准还是以本地的为准)
streaming-process: 是否循环流式处理任务
-->
<job:dataflow id="TestJobF"
class="com.el.test.job.TestJobFlow"
registry-center-ref="regCenter"
sharding-total-count="3"
cron="* 0/1 * * * ?"
failover="true"
streaming-process="true"
description="测试的流式的Job"
/>
</beans>
到这一步,接入esjob的任务已经基本成功了,下一步就是部署服务,进行job管理。