1. 为什么不用quartz
通过定时任务来进行计算,如果数量不多,可以轻易的用quartz来完成,如果用户量特别大,可能短时间内处理不完需要处理的数据。另外如果我们将job直接放在我们的webapp里,webapp通常是多节点部署的,这样,项目需要每隔一段时间执行某个定时任务,但是由于同时部署在多台机器上,因此可能会出现任务被执行多次,造成重复数据的情况,我们的job也就是多节点,造成了多个job同时执行,导致job重复执行,为了避免这种情况,我们可能多job的节点进行加锁,保证只有一个节点能执行,或者将job从webapp里剥离出来,独自部署一个节点。Elastic job是当当网架构师张亮,曹昊和江树建基于Zookepper、Quartz开发并开源的一个Java分布式定时任务,解决了Quartz不支持分布式的弊端。Elastic job主要的功能有支持弹性扩容,通过Zookepper集中管理和监控job,支持失效转移等,这些都是Quartz等其他定时任务无法比拟的。
2. 原理
elastic底层的任务调度还是使用quartz,通过zookeeper来动态给job节点分片,使用elastic-job开发的作业都是客户的,假如我们需要使用3台机器跑job,我们将任务分成3片,框架通过zk的协调,最终会让3台机器分别分配0,1,2的任务片,比如server0-->0,server1-->1,server2-->2,当server0执行时,可以只查询id%3==0的用户,server1执行时,只查询id%3==1的用户,server2执行时,只查询id%3==2的用户。当分片数为1时,在同一个zookepper和jobname情况下,多台机器部署了Elastic job时,只有拿到shardingContext.getShardingItem()为0的机器得以执行,其他的机器不执行当分片数大于1时,假如有3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。此时每台服务器可根据拿到的shardingItem值进行相应的处理,
举例场景:
假如job处理数据库中的数据业务,方法为:A服务器处理数据库中Id以0,1,2结尾的数据,B处理数据库中Id以3,4,5结尾的数据,C处理器处理6,7,8,9结尾的数据,合计处理0-9为全部数据
如果服务器C崩溃,Elastic Job自动进行进行失效转移,将C服务器的分片转移到A和B服务器上,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9
此时,A服务器处理数据库中Id以0,1,2,3,4结尾的数据,B处理数据库中Id以5,6,7,8,9结尾的数据,合计处理0-9为全部数据.
在上述基础上,如果我们增加server3,此时,server3分不到任务分片,因为任务分片只有3片,已经分完了,没有分到任务分片的程序不执行。如果server2挂了,那么server2的任务分片会分给server3,server3有了分片后就会执行。如果server3也挂了,框架会自动将server3的分片随机分给server0或server1,这种特性称之为弹性扩容,也就是elastic-job的由来。
elastic-job不支持单机多实例,通过zk的协调分片是以ip为单元的,如果通过单机多实例来试验,结果会导致分片和预期不一致,可以通过虚拟机模拟多台机器。
3. 作业类型
elastic-job 提供了三种类型的作业:simple,dataflow,script。script类型作业为脚本类型作业,支持shell,python等类型脚本。simple类型需要实现SimpleJob接口,未经过任何封装,和quartz原生接口相似。dataflow类型用于处理数据流,需要实现DafaflowJob接口,该接口提供了两个方法可以覆盖,分别用于抓取fetchData和处理processData数据。
4. 代码演示
1. 依赖
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
2.编写job
public class OrderStatisticsJob implements SimpleJob { private static final Logger log = LoggerFactory.getLogger(OrderStatisticsJob.class); OrdersService ordersSerivce = null; /** 读取配置(配置文件以后上分布式配置动态维护) **/ private void readConfig() { ordersSerivce = (OrdersService) ApplicationHelp.getBean("ordersService"); } synchronized public void start(int sharding) { } @Override public void execute(ShardingContext shardingContext) { // TODO Auto-generated method stub log.info("shardingContext:{}", shardingContext.getShardingItem()); readConfig(); start(1); } }
public class MyDataFlowJob implements DataflowJob<User> { @Override public List<User> fetchData(ShardingContext shardingContext) { List<User> users = null;//查询users from db return users; } @Override public void processData(ShardingContext shardingContext, List<User> data) { for (User user: data) { user.setStatus(1); //update user } } }
3. Spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd"> <!--配置作业注册中心 --> <reg:zookeeper id="regCenter" server-lists="localhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置作业--> <job:simple id="orderStatisticsJob" class="com.beta.cb.mall.task.job.OrderStatisticsJob" registry-center-ref="regCenter" sharding-total-count="2" cron="0/2 * * * * ?" /> <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"
sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" /> </beans>