这个是之前在项目中写的一个quartz调度,可实时启动,运行,添加job,现将代码及思想做简要笔记整理:
使用quartz +springboot
1:在数据库创建job运行调度表,创建脚本如下:
DROP TABLE IF EXISTS `manage_ip_schedule_task`;
CREATE TABLE `manage_ip_schedule_task` (
`id` char(36) NOT NULL COMMENT 'id',
`task_code` varchar(100) NOT NULL COMMENT '任务名称编码',
`task_name` varchar(100) NOT NULL COMMENT '任务名',
`task_conf` varchar(100) NOT NULL COMMENT '任务执行表达式',
`task_class` varchar(100) NOT NULL COMMENT '任务执行类',
`task_server_ip` varchar(100) NOT NULL COMMENT '任务执行的服务器',
`status` char(1) NOT NULL DEFAULT '1' COMMENT '任务状态0:启用;1:禁用',
`remark` varchar(250) NOT NULL COMMENT '备注',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='系统运行调度表';
-- ----------------------------
-- Records of manage_ip_schedule_task
-- ----------------------------
INSERT INTO `manage_ip_schedule_task` VALUES ('DEFAULT', 'DEFAULT', 'DEFAULT', '0/5 * * * * ?', 'com.*.*.config.quartz.scan.DefaultScanJob', '127.0.0.1', '1', 'DEFAULT');
默认加上一个启动job,用来扫描后续添加的job
.每5秒执行一次;
task_class 此处填入扫描的job类,敏感信息我再这里用了*来代替了;
task_server_ip 表示需要执行的调度任务的服务器
1:在pom.xml中引入quartz 包。
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>
springboot 的引入 和环境配置,在此处略去不提,自行找资料吧,骚年。
application.yml文件中配置quartz的启动扫描开关,有时候不需要启动,所以做了此开关
#是否启动quartz任务扫描器
quartz:
scan: false
2:容器启动时,从数据库加载此配置job 到内存,持久层采用的mybatis 。 具体代码如下
package com.*.*.config.quartz.listener;
import com.*.core.utils.StringUtils;
import com.*.*.config.quartz.service.ScheduleService;
import org.quartz.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
/**
* 定时任务扫描Listener
* moss
*/
@WebListener
@ConfigurationProperties(prefix = "quartz")
public class StartupListener implements ServletContextListener {
private static final Logger LOGGER = LoggerFactory.getLogger(StartupListener.class);
private String scan;
private ScheduleService scheduleService;
private static final String SCHEDULE_SERVICE_NAME = "scheduleService";
private static final String SCHEDULER_NAME = "scheduler";
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
}
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
try {
this.initial(servletContextEvent);
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
ex.printStackTrace();
}
}
public String getScan() {
return scan;
}
public void setScan(String scan) {
this.scan = scan;
}
/**
* 用来启动定时器的
* @param servletContextEvent
*/
private void initial(ServletContextEvent servletContextEvent) {
Boolean isScan;
if (StringUtils.isEmpty(this.getScan())) {
isScan = true;
} else {
isScan = Boolean.parseBoolean(this.getScan());
}
if (isScan) {
ServletContext context = servletContextEvent.getServletContext();
ApplicationContext applicationContext = WebApplicationContextUtils.getWebApplicationContext(context);
this.scheduleService = (ScheduleService) applicationContext.getBean(SCHEDULE_SERVICE_NAME);
Scheduler scheduler = (Scheduler) applicationContext.getBean(SCHEDULER_NAME);
this.scheduleService.setScheduler(scheduler);
this.scheduleService.proceedSchedule();
}
}
}
3:编写刚刚在数据库里配置的默认扫描job
package com.*.*.config.quartz.scan;
import com.*.*.config.quartz.service.ScheduleService;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
public class DefaultScanJob implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultScanJob.class);
@Autowired
private ScheduleService scheduleService;
@Override
public void execute(JobExecutionContext arg0) throws JobExecutionException {
LOGGER.debug("starting default scan job...");
try {
this.scheduleService.proceedSchedule();
} catch (Exception e) {
throw new JobExecutionException(e);
}
}
}
4: ScheduleService 接口及实现类编写:
package com.*.*.config.quartz.service;
import org.quartz.Scheduler;
/**
* quartz service
* Created by gaokx on 2017/4/19.
*/
public interface ScheduleService {
/**
* 执行定时调用
*/
void proceedSchedule();
void setScheduler(Scheduler scheduler);
}
ScheduleServiceImpl实现类编写:
package com.*.*.config.quartz.service.impl;
import com.*.*.config.quartz.service.ScheduleService;
import com.*.*.config.quartz.util.ScheduleStatus;
import com.*.*.config.quartz.util.ScheduleTaskNetwork;
import com.*.*.manage.dao.ManageIpScheduleTaskDao;
import com.*.*.manage.entity.ManageIpScheduleTask;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
@Service("scheduleService")
public class ScheduleServiceImpl implements ScheduleService {
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleServiceImpl.class);
@Autowired
private ManageIpScheduleTaskDao manageIpScheduleTaskDao;
private Scheduler scheduler = null;
/**
* 暴露接口,唯一提供可调用的接口
*/
@Override
public void proceedSchedule() {
try {
this.proceed();
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
}
}
//
// 以下方法皆为 私有方法,请勿调用
//
private void proceed() throws Exception {
List<ManageIpScheduleTask> list = manageIpScheduleTaskDao.find();
if(null == list || list.isEmpty()) {
return;
}
Set<String> currentScheduleCodeSet = this.getCurrentScheduleCodeSet();
this.proceed(list, currentScheduleCodeSet);
}
/**
* 细节调用
* @param list
* @param currentScheduleCodeSet
* @throws Exception
*/
private void proceed(List<ManageIpScheduleTask> list, Set<String> currentScheduleCodeSet) throws Exception {
for(ManageIpScheduleTask scheduleTask : list) {
if(null == scheduleTask) {
continue;
}
if(ScheduleStatus.START.getValue().equals(scheduleTask.getStatus())) {//启用任务
if(!this.isTaskCodeExist(scheduleTask.getTaskCode(), currentScheduleCodeSet)) {//未在进程中
this.startTask(scheduleTask);
}
} else if (ScheduleStatus.STOP.getValue().equals(scheduleTask.getStatus())) {//停用任务
if(this.isTaskCodeExist(scheduleTask.getTaskCode(), currentScheduleCodeSet)) {//已在进程中
this.stopTask(scheduleTask);
}
}
}
}
/**
* 启用任务
* @param scheduleTask
* @throws Exception
*/
private void startTask(ManageIpScheduleTask scheduleTask) throws Exception {
if(!this.validate(scheduleTask)) {
return;
}
ScheduleNaming naming = new ScheduleNaming(scheduleTask);
Class<? extends Job> clazz = this.getJobClass(naming.getClassName());
JobDetail jobDetail = this.getJobDetail(clazz, naming.getJobName());
Trigger trigger = this.getTrigger(naming.getTriggerName(), naming.getScheduleConfiguration());
this.scheduler.scheduleJob(jobDetail, trigger);
}
/**
* 停用任务
* @param scheduleTask
* @throws Exception
*/
private void stopTask(ManageIpScheduleTask scheduleTask) throws Exception {
if(!this.validate(scheduleTask)) {
return;
}
ScheduleNaming naming = new ScheduleNaming(scheduleTask);
this.scheduler.unscheduleJob(TriggerKey.triggerKey(naming.getTriggerName(), Scheduler.DEFAULT_GROUP));
this.scheduler.deleteJob(JobKey.jobKey(naming.getJobName(), Scheduler.DEFAULT_GROUP));
}
/**
* 验证参数
* @param schedule
* @return
*/
private boolean validate(ManageIpScheduleTask schedule) {
if(null == schedule) {
return false;
} else if (null == schedule.getTaskCode()
|| null == schedule.getTaskName()
|| null == schedule.getTaskConf()
|| null == schedule.getTaskClass()
|| null == schedule.getTaskServerIp()) {
return false;
}
if (!isLocalIp(schedule.getTaskServerIp())) {
return false;
}
return true;
}
/**
* 必须返回 继承 Job 的 Class
* @param className
* @return
* @throws ClassNotFoundException
*/
@SuppressWarnings("unchecked")
private Class<? extends Job> getJobClass(String className) throws ClassNotFoundException {
return (Class<? extends Job>) Class.forName(className);
}
/**
* 获取 jobDetail
* @param clazz
* @param jobName
* @return
*/
private JobDetail getJobDetail(Class<? extends Job> clazz, String jobName) {
JobDetail jobDetail = newJob(clazz)
.withIdentity(jobName, Scheduler.DEFAULT_GROUP)
.build()
;
return jobDetail;
}
/**
* 获取 trigger
* @param triggerName
* @param conf
* @return
*/
private Trigger getTrigger(String triggerName, String conf) {
CronTrigger trigger = newTrigger()
.withIdentity(triggerName, Scheduler.DEFAULT_GROUP)
.withSchedule(CronScheduleBuilder.cronSchedule(conf))
.startNow()
.build();
return trigger;
}
/**
* 判断当前传入 ip 是否是 本机的 ip
* @param ip
* @return
*/
private boolean isLocalIp(String ip) {
if(null != ip && ip.length() >= 0) {
return ScheduleTaskNetwork.isExist(ip);
}
return false;
}
private boolean isTaskCodeExist(String taskCode, Set<String> set) {
return set.contains(taskCode);
}
private Set<String> getCurrentScheduleCodeSet() throws Exception{
Set<JobKey> jobKeySet = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(Scheduler.DEFAULT_GROUP));
Set<String> scheduleCodeSet = new HashSet<String>();
for(JobKey jobKey : jobKeySet) {
scheduleCodeSet.add(jobKey.getName());
}
return scheduleCodeSet;
}
/**
* 内部类,用来统一定义名称
*/
class ScheduleNaming {
private String taskCode = "taskCode";
private String className;
private String triggerName;
private String jobName;
private String scheduleConfiguration;
ScheduleNaming(ManageIpScheduleTask schedule) {
if(null != schedule.getTaskCode()) {
this.taskCode = schedule.getTaskCode();
}
this.className = schedule.getTaskClass();
this.triggerName = this.taskCode;
this.jobName = this.taskCode;
this.scheduleConfiguration = schedule.getTaskConf();
}
public String getClassName() {
return className;
}
public String getTriggerName() {
return triggerName;
}
public String getJobName() {
return jobName;
}
public String getScheduleConfiguration() {
return scheduleConfiguration;
}
}
@Override
public void setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
}
}
4:动态注入bean,反射解析配置在数据库中的job
package com.*.*.config.quartz.util;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
/**
* 动态注入bean
* Created by gaokx
*/
public class ScheduleJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
ScheduleStatus 是枚举类主要是做启用,停用管理
public enum ScheduleStatus {
STOP("0", "停用"), START("1", "启用");
}
ScheduleTaskNetwork ip控制处理
package com.*.*.config.quartz.util;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
public class ScheduleTaskNetwork {
private final static ScheduleTaskNetwork instance = new ScheduleTaskNetwork();
private Set<String> ips = new HashSet<>();
/**
* 暴露的接口,可以允许获得已经复制过的 IP Set
*
* @return
*/
public static Set<String> getIPs() {
return new HashSet<String>(instance.ips);
}
/**
* 暴露的接口,可以直接判断传入的 ip 是否在本地 ip 中
*
* @param ip
* @return
*/
public static boolean isExist(String ip) {
return instance.ips.contains(ip);
}
/**
* 私有化构造函数,不能显示创建
*/
private ScheduleTaskNetwork() {
this.initiaNetworkInterfacce();
}
private void initiaNetworkInterfacce() {
try {
this.procceed();
} catch (Exception ex) {
ex.printStackTrace();
}
}
private void procceed() throws SocketException {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
NetworkInterface networkInterface = null;
Enumeration<InetAddress> inetAddresses = null;
InetAddress inetAddress = null;
while (networkInterfaces.hasMoreElements()) {
networkInterface = networkInterfaces.nextElement();
inetAddresses = networkInterface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
inetAddress = inetAddresses.nextElement();
ips.add(inetAddress.getHostAddress());
}
}
}
}
说明
manageIpScheduleTaskDao.find(); 是从数据库查找配置job,mybatis 对应的代码 此处略过
设计思路:
从数据库查找与指定在当前服务器ip运行的job ,如果有 则加入到 Scheduler 中 运行
5:配置config, 在容器启动时注册
package com.*.*.config;
import com.ucsmy.eaccount.config.quartz.util.ScheduleJobFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import java.util.Properties;
/**
* quartz配置
* Created by gaokx
*/
@Configuration
public class QuartzConfig {
@Bean
public ScheduleJobFactory getScheduleJobFactory() {
return new ScheduleJobFactory();
}
@Bean(name = "scheduler")
public SchedulerFactoryBean getSchedulerFactoryBean() {
SchedulerFactoryBean factoryBean = new SchedulerFactoryBean();
// 注入自定义工厂类
factoryBean.setJobFactory(getScheduleJobFactory());
factoryBean.setQuartzProperties(quartzProperties());
return factoryBean;
}
public Properties quartzProperties() {
Properties properties = new Properties();
// 默认的jobFactory
properties.setProperty("org.quartz.scheduler.jobFactory.class", "org.quartz.simpl.SimpleJobFactory");
// 默认的线程池
properties.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
// 目前线程数量
properties.setProperty("org.quartz.threadPool.threadCount", "5");
return properties;
}
}