Shows how the scheduler can interrupt your jobs and how to code your jobs to deal with interruptions
由于业务需要,中断Quartz中正在执行的任务,需要实现以下步骤:
- Job类需要实现InterruptableJob类,并实现interrupt()方法
- 在interrupt()方法中通常设定一个interrupted 变量为ture(默认时为false)实现从外界传递中断的指令。
- 在这个方法中进行了标记的改变,在execute()方法对这个标记判断,以实现中断任务,如:return,或抛出一个JobExecutionException异常
- 在调度器上调用方法:sched.interrupt(job.getKey()),它最终是执行Job中的interrupt()方法。
scheduler如何改变在Job中的interrupted 为true,实现原理:
1.执行sched.interrupt(job.getKey()),希望这个Job任务中断
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(7000L);
// tell the scheduler to interrupt our job
sched.interrupt(job.getKey());
} catch (Exception e) {
//
}
}
2.可以看到实际执行的是sched.interrupt(jobKey)
public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException {
return sched.interrupt(jobKey);
}
3.我再往下看interrupt(JobKey jobKey)的源码
可以注意到,其中有一句 ((InterruptableJob)job).interrupt();
相当真正执行的是job(实现了InterruptableJob)这个类实际的interrupt()中的代码。
而这个job对象是由我们每一步传入的,是我们要中断的Job,故这里相当于执行要中断的Job的interrupt()中的代码。
/**
* Interrupt all instances of the identified InterruptableJob executing in
* this Scheduler instance.
*
* <p>
* This method is not cluster aware. That is, it will only interrupt
* instances of the identified InterruptableJob currently executing in this
* Scheduler instance, not across the entire cluster.
* </p>
*
* @see org.quartz.core.RemotableQuartzScheduler#interrupt(JobKey)
*/
public boolean interrupt(JobKey jobKey) throws UnableToInterruptJobException {
List<JobExecutionContext> jobs = getCurrentlyExecutingJobs();
JobDetail jobDetail = null;
Job job = null;
boolean interrupted = false;
for(JobExecutionContext jec : jobs) {
jobDetail = jec.getJobDetail();
if (jobKey.equals(jobDetail.getKey())) {
job = jec.getJobInstance();
if (job instanceof InterruptableJob) {
((InterruptableJob)job).interrupt();
interrupted = true;
} else {
throw new UnableToInterruptJobException(
"Job " + jobDetail.getKey() +
" can not be interrupted, since it does not implement " +
InterruptableJob.class.getName());
}
}
}
return interrupted;
}
4.在实现了InterruptableJob的这个我们传入进去的,Job实现的interrupt()方法,我们修改了 _interrupted = true;
Job中就可以用 _interrupted 这个变量的值来判断外界有没有发出中断指令了。
执行 interrupt() 后,_interrupted发生了变量,在execute()方法中就可以用这个值来判断外界有无发出中断指令,然后就可以用这个值来做事了,如return,或抛出一个JobExecutionException异常
package org.quartz.examples.example7;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.UnableToInterruptJobException;
/**
* <p>
* A dumb implementation of an InterruptableJob, for unit testing purposes.
* </p>
*
* @author <a href="mailto:[email protected]">Chris Bonham</a>
* @author Bill Kratzer
*/
public class DumbInterruptableJob implements InterruptableJob {
// logging services
private static Logger _log = LoggerFactory.getLogger(DumbInterruptableJob.class);
// has the job been interrupted?
private boolean _interrupted = false;
// job name
private JobKey _jobKey = null;
/**
* <p>
* Empty constructor for job initialization
* </p>
*/
public DumbInterruptableJob() {
}
/**
* <p>
* Called by the <code>{@link org.quartz.Scheduler}</code> when a <code>{@link org.quartz.Trigger}</code>
* fires that is associated with the <code>Job</code>.
* </p>
*
* @throws JobExecutionException
* if there is an exception while executing the job.
*/
public void execute(JobExecutionContext context)
throws JobExecutionException {
//执行 interrupt() 后,_interrupted发生了变量,可以用这个值来判断外界有无发出中断指令
//然后就可以用这个值来做事了,如return,或抛出一个JobExecutionException异常
}
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @return void (nothing) if job interrupt is successful.
* @throws JobExecutionException
* if there is an exception while interrupting the job.
*/
public void interrupt() throws UnableToInterruptJobException {
_log.info("---" + _jobKey + " -- INTERRUPTING --");
_interrupted = true;
}
}
示例:中断演示
- 创建一个实例了InterruptableJob接口的Job
- 实现interrupt()方法,设_interrupted = true;
- 在execute()中每1秒(共查4秒)查一次interrupted的值是否为true,如果为true,则return中断Job
- 在scheduler中每7秒调用一次sched.interrupt(job.getKey());,让正在执行的Job中断
------------------------------------------------------------------------------------------------------------
/*
* All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/
package org.quartz.examples.example7;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.UnableToInterruptJobException;
/**
* <p>
* A dumb implementation of an InterruptableJob, for unit testing purposes.
* </p>
*
* @author <a href="mailto:[email protected]">Chris Bonham</a>
* @author Bill Kratzer
*/
public class DumbInterruptableJob implements InterruptableJob {
// logging services
private static Logger _log = LoggerFactory.getLogger(DumbInterruptableJob.class);
// has the job been interrupted?
private boolean _interrupted = false;
// job name
private JobKey _jobKey = null;
/**
* <p>
* Empty constructor for job initialization
* </p>
*/
public DumbInterruptableJob() {
}
/**
* <p>
* Called by the <code>{@link org.quartz.Scheduler}</code> when a <code>{@link org.quartz.Trigger}</code>
* fires that is associated with the <code>Job</code>.
* </p>
*
* @throws JobExecutionException
* if there is an exception while executing the job.
*/
public void execute(JobExecutionContext context)
throws JobExecutionException {
_jobKey = context.getJobDetail().getKey();
_log.info("---- " + _jobKey + " executing at " + new Date());
try {
// main job loop... see the JavaDOC for InterruptableJob for discussion...
// do some work... in this example we are 'simulating' work by sleeping... :)
for (int i = 0; i < 4; i++) {
try {
Thread.sleep(1000L);
} catch (Exception ignore) {
ignore.printStackTrace();
}
// periodically check if we've been interrupted...
if(_interrupted) {
_log.info("--- " + _jobKey + " -- Interrupted... bailing out!");
return; // could also choose to throw a JobExecutionException
// if that made for sense based on the particular
// job's responsibilities/behaviors
}
}
} finally {
_log.info("---- " + _jobKey + " completed at " + new Date());
}
}
/**
* <p>
* Called by the <code>{@link Scheduler}</code> when a user
* interrupts the <code>Job</code>.
* </p>
*
* @return void (nothing) if job interrupt is successful.
* @throws JobExecutionException
* if there is an exception while interrupting the job.
*/
public void interrupt() throws UnableToInterruptJobException {
_log.info("---" + _jobKey + " -- INTERRUPTING --");
_interrupted = true;
}
}
------------------------------------------------------------------------------------------------------------
------------------------------------------------------------------------------------------------------------
/*
* All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/
package org.quartz.examples.example7;
import static org.quartz.DateBuilder.nextGivenSecondDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SchedulerMetaData;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* Demonstrates the behavior of <code>StatefulJob</code>s, as well as how misfire instructions affect the firings of
* triggers of <code>StatefulJob</code> s - when the jobs take longer to execute that the frequency of the trigger's
* repitition.
* <p>
* While the example is running, you should note that there are two triggers with identical schedules, firing identical
* jobs. The triggers "want" to fire every 3 seconds, but the jobs take 10 seconds to execute. Therefore, by the time
* the jobs complete their execution, the triggers have already "misfired" (unless the scheduler's "misfire threshold"
* has been set to more than 7 seconds). You should see that one of the jobs has its misfire instruction set to
* <code>SimpleTrigger.MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT</code>- which causes it to fire
* immediately, when the misfire is detected. The other trigger uses the default "smart policy" misfire instruction,
* which causes the trigger to advance to its next fire time (skipping those that it has missed) - so that it does not
* refire immediately, but rather at the next scheduled time.
* </p>
*
* @author <a href="mailto:[email protected]">Chris Bonham</a>
*/
public class InterruptExample {
public void run() throws Exception {
final Logger log = LoggerFactory.getLogger(InterruptExample.class);
log.info("------- Initializing ----------------------");
// First we must get a reference to a scheduler
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();
log.info("------- Initialization Complete -----------");
log.info("------- Scheduling Jobs -------------------");
// get a "nice round" time a few seconds in the future...
Date startTime = nextGivenSecondDate(null, 15);
JobDetail job = newJob(DumbInterruptableJob.class).withIdentity("interruptableJob1", "group1").build();
SimpleTrigger trigger = newTrigger().withIdentity("trigger1", "group1")//
.startAt(startTime)//
.withSchedule(simpleSchedule()//
.withIntervalInSeconds(5)//
.repeatForever())//
.build();
Date ft = sched.scheduleJob(job, trigger);
log.info(job.getKey() + " will run at: " + ft + " and repeat: " + trigger.getRepeatCount() + " times, every "
+ trigger.getRepeatInterval() / 1000 + " seconds");
// start up the scheduler (jobs do not start to fire until
// the scheduler has been started)
sched.start();
log.info("------- Started Scheduler -----------------");
log.info("------- Starting loop to interrupt job every 7 seconds ----------");
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(7000L);
// tell the scheduler to interrupt our job
sched.interrupt(job.getKey());
} catch (Exception e) {
//
}
}
log.info("------- Shutting Down ---------------------");
sched.shutdown(true);
log.info("------- Shutdown Complete -----------------");
SchedulerMetaData metaData = sched.getMetaData();
log.info("Executed " + metaData.getNumberOfJobsExecuted() + " jobs.");
}
public static void main(String[] args) throws Exception {
InterruptExample example = new InterruptExample();
example.run();
}
}
------------------------------------------------------------------------------------------------------------
Executing result:
15:20:17.751 INFO org.quartz.examples.example7.InterruptExample 58 run - ------- Initializing ----------------------
15:20:17.800 INFO org.quartz.impl.StdSchedulerFactory 1172 instantiate - Using default implementation for ThreadExecutor
15:20:17.804 INFO org.quartz.simpl.SimpleThreadPool 268 initialize - Job execution threads will use class loader of thread: main
15:20:17.817 INFO org.quartz.core.SchedulerSignalerImpl 61 <init> - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
15:20:17.818 INFO org.quartz.core.QuartzScheduler 240 <init> - Quartz Scheduler v.2.2.3 created.
15:20:17.819 INFO org.quartz.simpl.RAMJobStore 155 initialize - RAMJobStore initialized.
15:20:17.820 INFO org.quartz.core.QuartzScheduler 305 initialize - Scheduler meta-data: Quartz Scheduler (v2.2.3) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
15:20:17.821 INFO org.quartz.impl.StdSchedulerFactory 1327 instantiate - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
15:20:17.821 INFO org.quartz.impl.StdSchedulerFactory 1331 instantiate - Quartz scheduler version: 2.2.3
15:20:17.821 INFO org.quartz.examples.example7.InterruptExample 64 run - ------- Initialization Complete -----------
15:20:17.822 INFO org.quartz.examples.example7.InterruptExample 66 run - ------- Scheduling Jobs -------------------
15:20:17.830 INFO org.quartz.examples.example7.InterruptExample 81 run - group1.interruptableJob1 will run at: Mon Aug 13 15:20:30 CST 2018 and repeat: -1 times, every 5 seconds
15:20:17.831 INFO org.quartz.core.QuartzScheduler 575 start - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
15:20:17.831 INFO org.quartz.examples.example7.InterruptExample 87 run - ------- Started Scheduler -----------------
15:20:17.832 INFO org.quartz.examples.example7.InterruptExample 89 run - ------- Starting loop to interrupt job every 7 seconds ----------
15:20:30.036 INFO org.quartz.examples.example7.DumbInterruptableJob 72 execute - ---- group1.interruptableJob1 executing at Mon Aug 13 15:20:30 CST 2018
15:20:31.832 INFO org.quartz.examples.example7.DumbInterruptableJob 110 interrupt - ---group1.interruptableJob1 -- INTERRUPTING --
15:20:32.037 INFO org.quartz.examples.example7.DumbInterruptableJob 87 execute - --- group1.interruptableJob1 -- Interrupted... bailing out!
15:20:32.039 INFO org.quartz.examples.example7.DumbInterruptableJob 95 execute - ---- group1.interruptableJob1 completed at Mon Aug 13 15:20:32 CST 2018
15:20:35.000 INFO org.quartz.examples.example7.DumbInterruptableJob 72 execute - ---- group1.interruptableJob1 executing at Mon Aug 13 15:20:35 CST 2018
15:20:38.835 INFO org.quartz.examples.example7.DumbInterruptableJob 110 interrupt - ---group1.interruptableJob1 -- INTERRUPTING --
15:20:39.006 INFO org.quartz.examples.example7.DumbInterruptableJob 87 execute - --- group1.interruptableJob1 -- Interrupted... bailing out!
15:20:39.012 INFO org.quartz.examples.example7.DumbInterruptableJob 95 execute - ---- group1.interruptableJob1 completed at Mon Aug 13 15:20:39 CST 2018
15:20:40.002 INFO org.quartz.examples.example7.DumbInterruptableJob 72 execute - ---- group1.interruptableJob1 executing at Mon Aug 13 15:20:40 CST 2018
15:20:44.008 INFO org.quartz.examples.example7.DumbInterruptableJob 95 execute - ---- group1.interruptableJob1 completed at Mon Aug 13 15:20:44 CST 2018
15:20:45.002 INFO org.quartz.examples.example7.DumbInterruptableJob 72 execute - ---- group1.interruptableJob1 executing at Mon Aug 13 15:20:45 CST 2018
15:20:45.837 INFO org.quartz.examples.example7.DumbInterruptableJob 110 interrupt - ---group1.interruptableJob1 -- INTERRUPTING --
15:20:46.003 INFO org.quartz.examples.example7.DumbInterruptableJob 87 execute - --- group1.interruptableJob1 -- Interrupted... bailing out!
15:20:46.022 INFO org.quartz.examples.example7.DumbInterruptableJob 95 execute - ---- group1.interruptableJob1 completed at Mon Aug 13 15:20:46 CST 2018
15:20:50.002 INFO org.quartz.examples.example7.DumbInterruptableJob 72 execute - ---- group1.interruptableJob1 executing at Mon Aug 13 15:20:50 CST 2018
15:20:52.841 INFO org.quartz.examples.example7.DumbInterruptableJob 110 interrupt - ---group1.interruptableJob1 -- INTERRUPTING --
15:20:52.847 INFO org.quartz.examples.example7.InterruptExample 100 run - ------- Shutting Down ---------------------
15:20:52.849 INFO org.quartz.core.QuartzScheduler 694 shutdown - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutting down.
15:20:52.853 INFO org.quartz.core.QuartzScheduler 613 standby - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED paused.
15:20:53.007 INFO org.quartz.examples.example7.DumbInterruptableJob 87 execute - --- group1.interruptableJob1 -- Interrupted... bailing out!
15:20:53.008 INFO org.quartz.examples.example7.DumbInterruptableJob 95 execute - ---- group1.interruptableJob1 completed at Mon Aug 13 15:20:53 CST 2018
15:20:53.356 INFO org.quartz.core.QuartzScheduler 771 shutdown - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutdown complete.
15:20:53.358 INFO org.quartz.examples.example7.InterruptExample 104 run - ------- Shutdown Complete -----------------
15:20:53.360 INFO org.quartz.examples.example7.InterruptExample 106 run - Executed 5 jobs.