一、Elastic-job 集群方式、任务分片、简单示例。
1、简单示例。
public class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext context) { String jobName = context.getJobName(); String jobParameter = context.getJobParameter(); int shardingItem = context.getShardingItem(); int shardingTotalCount = context.getShardingTotalCount(); String shardingParameter = context.getShardingParameter(); LocalDateTime now = LocalDateTime.now(); long threadId = Thread.currentThread().getId(); String info = "time : [%s],threadId:[%s],jobName : [%s],jobParameter:[%s],shardingItem:[%s],shardingTotalCount:[%s]," +"shardingParameter: [%s]"; System.out.println(String.format(info, now, threadId, jobName, jobParameter, shardingItem, shardingTotalCount, shardingParameter)); } } |
public class SimpleJobLatch { private static final String zk_address = "39.100.116.73:2181"; public static void main(String[] args) { new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); } private static CoordinatorRegistryCenter createRegistryCenter() { CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(zk_address, "elastic-job")); regCenter.init(); return regCenter; } private static LiteJobConfiguration createJobConfiguration() { // 创建作业配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob1", "0/15 * * * * ?", 3).build(); // 定义SIMPLE类型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); // 定义Lite作业根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); return simpleJobRootConfig; } } |
注意:
1)Elastic-job集群通过zookeeper实现。任务节点上线下线自动感知。经测试当新节点动态上线,任务会重新分配分片。分片作用在下面一点解释。
2)在上面代码示例中,JobCoreConfiguration.newBuilder("demoSimpleJob1", "0/15 * * * * ?", 3).build();
这里的3 代表分片数。所谓分片就是任务的并行度。当job内部逻辑一致则3个分片执行相同任务。job内部可以获得当前分片数和总分片数,若按当前分片数将一个任务逻辑拆分。就是将一个复杂耗时任务分解成多个在不同分片执行。
分片策略详细:http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy/
二、Elastic-job的三种任务。SimpleJob(很像Quartz的任务)、DataflowJob(用于流式处理的任务)、ScriptJob(脚本任务)
1)SimpleJob.示例参见上面的示例即可。
2)DataflowJob.流式处理任务。
public class MyDataFlowJob implements DataflowJob { /** * 拉取数据,当拉取的数据为空则任务停止,注意不同分片逻辑 */ @Override public List fetchData(ShardingContext arg0) { /** * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30 */ return null; } /** * 上面fetch的数据,在这里处理 */ @Override public void processData(ShardingContext arg0, List list) { } } |
public class DataflowJobLatch { private static final String zk_address = "39.100.116.73:2181"; public static void main(String[] args) throws IOException { new JobScheduler(registryCenter(),configuration()).init(); } private static CoordinatorRegistryCenter registryCenter() { //配置zookeeper CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(zk_address, "elastic-job-script-demo")); registryCenter.init(); return registryCenter; } private static LiteJobConfiguration configuration(){ JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob", "0/30 * * * * ?", 10).build(); // 定义DATAFLOW类型配置 DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), true); // 定义Lite作业根配置 LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build(); return dataflowJobRootConfig; } } |
3)ScriptJob脚本任务示例。
public class ScriptJobLatch { private static final String zk_address = "39.100.116.73:2181"; public static void main(String[] args) throws IOException { new JobScheduler(registryCenter(),configuration()).init(); } private static CoordinatorRegistryCenter registryCenter() { //配置zookeeper CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(zk_address, "elastic-job-script-demo")); registryCenter.init(); return registryCenter; } private static LiteJobConfiguration configuration() throws IOException { // 定义作业核心配置 JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("scriptElasticJob", "0/5 * * * * ?", 3).build(); ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(coreConfig, buildScriptCommandLine()); LiteJobConfiguration config = LiteJobConfiguration.newBuilder(scriptJobConfig).build(); return config; } private static String buildScriptCommandLine() throws IOException { //判断当前系统 if (System.getProperties().getProperty("os.name").contains("Windows")) { return Paths.get(ScriptJobLatch.class.getResource("/script/demo.bat").getPath().substring(1)).toString(); } Path result = Paths.get(ScriptJobLatch.class.getResource("/script/demo.sh").getPath()); Files.setPosixFilePermissions(result, PosixFilePermissions.fromString("rwxr-xr-x")); return result.toString(); } } |
三、Elastic-Job任务错过机制(misfire)与幂等机制(monitorExecution)
1)任务错过机制
如果ElasticJob为开启幂等(monitorExecution)的情况下,才会创建${namespace}/jobname/sharding/{item}/running,misfire机制才能生效。
2)幂等机制
elastic-Job提供了一个配置参数:monitorExecution=true,开启幂等性。比如任务5s执行一次,但是单次执行时间超过5s时
四、Elastic-job 任务增删改查、启停。若不使用自带的管理方式,则需再封装一层Task,且提供与Elastic-job转换的方式。
1)Elastic-job自带一个web管理任务。在elastic-job-lite-console中。若需自己管理则需如下任务管理。
2)任务新增。这里只展示新增SimpleJob的,可以根据类型按第二点使用
public void addJob(String jobName,String cron,int shard,Class jobClass){ JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobName, cron, shard).build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName()); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); new JobScheduler(center, simpleJobRootConfig).init(); } |
3)任务停止、任务启动
public void stopJob(String nameSpace,String jobName){ JobOperateAPI jobOperateAPI = JobAPIFactory.createJobOperateAPI(zk_address,nameSpace, Optional.fromNullable(null)); jobOperateAPI.disable(Optional.of(jobName), Optional.<String>absent()); } public void startJob(String nameSpace,String jobName){ JobOperateAPI jobOperateAPI = JobAPIFactory.createJobOperateAPI(zk_address,nameSpace, Optional.fromNullable(null)); jobOperateAPI.enable(Optional.of(jobName), Optional.<String>absent()); } |
4)任务删除。
public void removeJob(String nameSpace,String jobName){ JobOperateAPI jobOperateAPI = JobAPIFactory.createJobOperateAPI(zk_address,nameSpace, Optional.fromNullable(null)); jobOperateAPI.remove(Optional.of(jobName), Optional.<String>absent()); } |
五、Elastic-job 持久化任务执行痕迹。既执行记录
public class SimleJobLatchJdbc { private static final String zk_address = "39.100.116.73:2181"; private static CoordinatorRegistryCenter center = createRegistryCenter(); public static void main(String[] args) throws InterruptedException { JobEventRdbConfiguration jdbcConfig = getJDBCConfig(); new JobScheduler(center, createJobConfiguration(), jdbcConfig).init(); } private static CoordinatorRegistryCenter createRegistryCenter() { CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(zk_address, "elastic-job")); regCenter.init(); return regCenter; } private static LiteJobConfiguration createJobConfiguration() { JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob1", "0/3 * * * * ?", 3).build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); return simpleJobRootConfig; } private static JobEventRdbConfiguration getJDBCConfig(){ return new JobEventRdbConfiguration(getDataSource()); } private static DataSource getDataSource(){ BasicDataSource datasource = new BasicDataSource(); datasource.setDriverClassName("com.mysql.cj.jdbc.Driver"); datasource.setUrl("jdbc:mysql://39.100.116.73:3306/back?createDatabaseIfNotExist=true&useUnicode=true&autoReconnect=true&characterEncoding=utf8&connectionCollation=utf8_general_ci&useSSL=false&serverTimezone=UTC"); datasource.setUsername("root"); datasource.setPassword("root"); return datasource; } } |
总结Elastic-job优点:
1)任务分片。可以将耗时任务分解成分片数量个任务,在多个节点(当分片数大于节点数则节点内多个线程上)执行。
2)任务失败重试。任务进行中节点宕机会在新节点分片上重试。
3)提供web任务管理.elastic-job-lite-console
4)支持节点动态上下线。且上下线后分片信息会刷新。
5)支持脚本任务、cron任务、流式处理任务。任务类型丰富。