一、首先,我们来看一个问题:
使用quartz或者spring-task实现任务调度在集群环境下有什么问题?
答:
1、不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误
2、quartz的集群基于数据库的状态标记,涉及的表多达十多张,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。
二、一个好的任务调度解决方案应该能满足以下几点:
1、一致性(保证定时任务单独执行)
2、高可用性 (当前定时任务因外部原因执行失败后有替代方案)
3、定时任务分布式处理(并行处理)
而这些问题已经有前辈都考虑到了,提供了很多的解决方案
TBSchedule:由淘宝开源的分布式调度解决方案,目前好像没有维护了,文档较少
Elastic-Job:由当当网开源的分布式调度解决方案,文档丰富友好,上手简单
Elastic-Job官网地址:http://elasticjob.io/index_zh.html
三、Elastic-Job介绍
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud使用Mesos + Docker(TBD)的解决方案,额外提供资源治理、应用分发以及进程隔离等服务,Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API开发作业,开发者仅需一次开发,即可根据需要以Lite或Cloud的方式部署
四、Elastic-Job-Lite优势及特点:
-
分布式调度 Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。 注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能
-
作业高可用 Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。 一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
-
最大限度利用资源 Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。 例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
-
运维平台:提供web控制台用于管理作业
五、简单使用:
请先安装3.4.6版本以上的zookeeper
1、maven添加jar依赖
<!-- elastic-job start -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.0.0</version>
</dependency>
<!-- elastic-job end -->
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
2、elastic-job与spring整合配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<!--配置作业注册中心 -->
<reg:zookeeper id="regCenter" server-lists="172.18.7.133:2181" namespace="dd-job"
base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 配置作业-->
<job:simple id="mySimpleJob" class="com.muheda.sipm.task.TestTask" registry-center-ref="regCenter"
sharding-total-count="1" cron="0 0/5 * * * ?" overwrite="true" failover="true"/>
</beans>
server-lists:zookeeper地址
sharding-total-count为分片数量,设置为1的时候,作业只会被分配到第一台机器进行处理;官网建议此值应为物理机的倍数
①分片策略:
1)、基于平均分配算法的分片策略,也是默认的分片策略; 如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器
2)、根据作业名的哈希值奇偶数决定IP升降序算法的分片策略
3)、根据作业名的哈希值对服务器列表进行轮转的分片策略
②分片后台处理:
如果我们的数据库只有一个,而物理机设置有多台集群,怎么使用分片查询功能?这里有一个处理办法是,第三步中的定时任务业务类中可以通过shardingContext.getShardingTotalCount()和shardingContext.getShardingItem()取到总的分片数量和当前分片编号,我们在执行业务查询的时候可以在sql后面这样处理:
shardingContext.getShardingItem() = Id%shardingContext.getShardingTotalCount()
3、定时任务业务类:
package com.muheda.sipm.task;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.apache.log4j.Logger;
/**
* @Author: Sorin
* @Descriptions:
* @Date: Created in 2018/2/1
*/
public class TestTask implements SimpleJob {
private static Logger logger = Logger.getLogger(SimpleJob.class);
@Override
public void execute(ShardingContext shardingContext) {
logger.info(String.format("ShardingItem: %s | Thread: %s | %s",
shardingContext.getShardingItem(), Thread.currentThread().getId(), "SIMPLE"));
logger.info("定时任务测试");
}
}
4、使用不依赖与Spring的方式:
LoadElasticJob在项目启动后加载一次,里面加载job
@Component
public class LoadElasticJob implements InitializingBean {
private Logger logger = Logger.getLogger(this.getClass());
static int initBoolean = 0;
@Override
public void afterPropertiesSet() throws Exception {
if(initBoolean==0){
init();
}
initBoolean++;
}
public void init(){
logger.info("定时任务开始启动!");
CoordinatorRegistryCenter regCenter = ElasticJobUtils.setUpRegistryCenter();
// 店铺统计定时任务
ElasticJobUtils.jobShopStat(regCenter);
logger.info("定时任务启动成功!");
}
}
ElasticJobUtils中定义具体的job属性,相当于之前的xml中的bean.业务类是一样的,这里就不贴出来了
public class ElasticJobUtils {
private static final String ZOOKEEPER_URL = PropertiesUtil.getValue("ZOOKEEPER_URL");
private static final String JOB_NAMESPACE = PropertiesUtil.getValue("JOB_NAMESPACE");
public static CoordinatorRegistryCenter setUpRegistryCenter() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_URL, JOB_NAMESPACE);
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
result.init();
return result;
}
/**
* @Descripton: 店铺统计定时任务
* @Author: Sorin
* @param regCenter
* @param jobEventConfig
* @Date: 2018/3/19
*/
public static void jobShopStat(final CoordinatorRegistryCenter regCenter){
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder(PropertiesUtil.getValue("job_shop_stat"), PropertiesUtil.getValue("core_shop_stat"), 1).build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, StatManageAction.class.getCanonicalName());
new JobScheduler(regCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build()).init();
}
}
六、运维平台
解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh。 打开浏览器访问 http://localhost:8899/ 即可访问控制台