版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
通过Quartz实现动态的管理任务的执行,通过Quartz将任务持久化到数据库中,在集群环境中确保只有单节点执行,当某一节点出故障后,会切换到另一个节点执行。
1 运行环境准备:
去官网下载Quartz的压缩包 将docs/dbTables/tables_mysql.sql下的sql文件在Mysql数据库执行。这是Quartz框架负责实现任务调度的自带数据库。
2 导入Jar包和quartz.properties属性配置:
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.2</version>
</dependency>
quartz.properties配置文件:
org.quartz.scheduler.instanceName=quartzScheduler
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.makeSchedulerThreadDaemon=true
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.makeThreadsDaemons=true
#线程数量
org.quartz.threadPool.threadCount=20
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
#线程优先级
org.quartz.threadPool.threadPriority=5
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#org.quartz.jobStore.dataSource = springTxDataSource.schedulerFactoryBean
org.quartz.jobStore.tablePrefix=QRTZ_
#加入集群
org.quartz.jobStore.isClustered=true
#容许的最大作业延
org.quartz.jobStore.misfireThreshold=25000
#调度实例失效的检查时间间隔
org.quartz.jobStore.clusterCheckinInterval: 5000
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.dataSource = qzDS
org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL = jdbc:mysql://localhost:3306/test?serverTimezone=UTC
org.quartz.dataSource.qzDS.user = root
org.quartz.dataSource.qzDS.password = root
org.quartz.dataSource.qzDS.maxConnections = 10
3 配置Quartz所需的bean:
package com.irootech.config.quartz;
import org.quartz.ee.servlet.QuartzInitializerListener;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import java.io.IOException;
import java.util.Properties;
@Configuration
public class SchedulerConfig {
// 配置文件路径
private static final String QUARTZ_CONFIG = "/quartz.properties";
@Autowired
private AutowiredSpringBeanJobFactory autoWiredSpringBeanToJobFactory;
/**
* 从quartz.properties文件中读取Quartz配置属性
* @return
* @throws IOException
*/
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource(QUARTZ_CONFIG));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
/**
* JobFactory与schedulerFactoryBean中的JobFactory相互依赖,注意bean的名称
* 在这里为JobFactory注入了Spring上下文
*
* @param applicationContext
* @return
*/
@Bean
public JobFactory buttonJobFactory(ApplicationContext applicationContext) {
AutowiredSpringBeanJobFactory jobFactory = new AutowiredSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
/*
* quartz初始化监听器,项目启动的时候自动执行任务
*/
@Bean
public QuartzInitializerListener executorListener() {
return new QuartzInitializerListener();
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setJobFactory(autoWiredSpringBeanToJobFactory);
factory.setOverwriteExistingJobs(true);
factory.setAutoStartup(true);
// factory.setStartupDelay(0);
factory.setQuartzProperties(quartzProperties());
// factory.setDataSource(dataSource);
return factory;
}
}
4 Quartz的Job任务实例没有被Spring容器管理,所以当在Job实例对象中需要依赖注入的对象时,这个对象是为空的,比如我要在Job的实现类中调用业务层,那么通过@Autowired注入的UserService是null对象,解决方法是:通过继承SpringBeanJobFactory,在生成Job实例对象的的时候通过AutowireCapbaleBeanFactory将Job实例注入到Spring容器中去。
package com.irootech.config.quartz;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import org.springframework.stereotype.Component;
@Component
public class AutowiredSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware {
private AutowireCapableBeanFactory autowireCapableBeanFactory;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
autowireCapableBeanFactory=applicationContext.getAutowireCapableBeanFactory();
}
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object obj=super.createJobInstance(bundle);
autowireCapableBeanFactory.autowireBean(obj);
return obj;
}
}
5 工具类编写,调度任务:
package com.irootech.config.quartz;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import springfox.documentation.annotations.Cacheable;
import java.util.List;
@Component
public class QuartzManager {
@Autowired
SchedulerFactoryBean schedulerFactory;
private Scheduler scheduler = null;
/* private static SchedulerFactory schedulerFactory = new StdSchedulerFactory();
private Scheduler scheduler = null;*/
/**
* @Description: 添加一个定时任务
*
* @param jobName 任务名
* @param jobGroupName 任务组名
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param jobClass 任务
* @param cron 时间设置,参考quartz说明文档
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, String cron) {
try {
// 任务名,任务组,任务执行类
Scheduler scheduler = schedulerFactory.getScheduler();
JobDetail jobDetail= JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
// 触发器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 触发器名,触发器组
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 触发器时间设定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 创建Trigger对象
CronTrigger trigger = (CronTrigger) triggerBuilder.build();
// 调度容器设置JobDetail和Trigger
scheduler.scheduleJob(jobDetail, trigger);
// 启动
if (!scheduler.isShutdown()) {
scheduler.start();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 修改一个任务的触发时间
*
* @param jobName
* @param jobGroupName
* @param triggerName 触发器名
* @param triggerGroupName 触发器组名
* @param cron 时间设置,参考quartz说明文档
*/
public void modifyJobTime(String jobName,String jobGroupName, String triggerName, String triggerGroupName, String cron) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (trigger == null) {
return;
}
String oldTime = trigger.getCronExpression();
if (!oldTime.equalsIgnoreCase(cron)) {
System.out.println("任务:"+jobName+"被修改");
/** 方式一 :调用 rescheduleJob 开始 */
/* // 触发器
TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
// 触发器名,触发器组
triggerBuilder.withIdentity(triggerName, triggerGroupName);
triggerBuilder.startNow();
// 触发器时间设定
triggerBuilder.withSchedule(CronScheduleBuilder.cronSchedule(cron));
// 创建Trigger对象
trigger = (CronTrigger) triggerBuilder.build();
// 方式一 :修改一个任务的触发时间
scheduler.rescheduleJob(triggerKey, trigger);*/
/** 方式一 :调用 rescheduleJob 结束 */
/** 方式二:先删除,然后在创建一个新的Job */
JobDetail jobDetail = scheduler.getJobDetail(JobKey.jobKey(jobName, jobGroupName));
Class<? extends Job> jobClass = jobDetail.getJobClass();
removeJob(jobName, jobGroupName, triggerName, triggerGroupName);
addJob(jobName, jobGroupName, triggerName, triggerGroupName, jobClass,cron);
/** 方式二 :先删除,然后在创建一个新的Job */
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description: 移除一个任务
*
* @param jobName
* @param jobGroupName
* @param triggerName
* @param triggerGroupName
*/
public void removeJob(String jobName, String jobGroupName,String triggerName, String triggerGroupName) {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
scheduler.pauseTrigger(triggerKey);// 停止触发器
scheduler.unscheduleJob(triggerKey);// 移除触发器
scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));// 删除任务
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description:启动所有定时任务
*/
public void startJobs() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
scheduler.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @Description:关闭所有定时任务
*/
public void shutdownJobs() {
try {
Scheduler scheduler = schedulerFactory.getScheduler();
if (!scheduler.isShutdown()) {
scheduler.shutdown();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 获取当前正在执行的任务
* @return
*/
public boolean getCurrentJobs(String name){
try {
Scheduler scheduler = schedulerFactory.getScheduler();
List<JobExecutionContext> jobContexts = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext context : jobContexts) {
if (name.equals(context.getTrigger().getJobKey().getName())) {
return true;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}
public Scheduler getScheduler() {
return scheduler;
}
public void setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
}
/**
* @description :暂停任务
* @param jobName :
* @param jobGroupName :
* @return :void
* @date : 2019/10/16 14:15
*/
public void pauseJob(String jobName, String jobGroupName){
try {
Scheduler scheduler = schedulerFactory.getScheduler();
scheduler.pauseJob(JobKey.jobKey(jobName, jobGroupName));
} catch (SchedulerException e) {
throw new RuntimeException();
}
}
/**
* @description :恢复任务
* @param jobName :
* @param jobGroupName :
* @return :void
* @date : 2019/10/16 14:17
*/
public void resumeJob(String jobName, String jobGroupName){
try {
Scheduler scheduler = schedulerFactory.getScheduler();
scheduler.resumeJob(JobKey.jobKey(jobName, jobGroupName));
} catch (SchedulerException e) {
throw new RuntimeException();
}
}
}