public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval) throws SchedulerException { this.resources = resources; if (resources.getJobStore() instanceof JobListener) { addInternalJobListener((JobListener)resources.getJobStore()); } this.schedThread = new QuartzSchedulerThread(this, resources); ThreadExecutor schedThreadExecutor = resources.getThreadExecutor(); schedThreadExecutor.execute(this.schedThread); if (idleWaitTime > 0) { this.schedThread.setIdleWaitTime(idleWaitTime); } jobMgr = new ExecutingJobsManager(); addInternalJobListener(jobMgr); errLogger = new ErrorLogger(); addInternalSchedulerListener(errLogger); signaler = new SchedulerSignalerImpl(this, this.schedThread); getLog().info("Quartz Scheduler v." + getVersion() + " created."); }这里创建了一个QuartzSchedulerThread实例(负责定时任务管理的线程),并通过schedThreadExecutor启动。
public void execute(Thread thread) { thread.start(); }
其实此时QuartzSchedulerThread并未开始发挥作用,它阻塞在其方法内。我们先看QuartzSchedulerThread的构造方法
QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) { super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName()); this.qs = qs; this.qsRsrcs = qsRsrcs; this.setDaemon(setDaemon); if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) { log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName()); this.setContextClassLoader(Thread.currentThread().getContextClassLoader()); } this.setPriority(threadPrio); // start the underlying thread, but put this object into the 'paused' // state // so processing doesn't start yet... paused = true; halted = new AtomicBoolean(false); }我们可以看到其paused在构造中赋值为true,halted为false。我们继续来看其run()方法的开始的部分逻辑
synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresFailed = 0; } if (halted.get()) { break; } }
由于paused为true,所以线程在其中等待,不会马上准备定时任务的开启。直到我们调用QuartzScheduler的start()方法,start方法将paused设为false,并保证线程从等待中唤醒。这时候定时任务的管理逻辑也真正开始。
贴出完整的QuartzSchedulerThread的run方法。
@Override public void run() { int acquiresFailed = 0; while (!halted.get()) { try { // check if we're supposed to pause... synchronized (sigLock) { while (paused && !halted.get()) { try { // wait until togglePause(false) is called... sigLock.wait(1000L); } catch (InterruptedException ignore) { } // reset failure counter when paused, so that we don't // wait again after unpausing acquiresFailed = 0; } if (halted.get()) { break; } } // wait a bit, if reading from job store is consistently // failing (e.g. DB is down or restarting).. if (acquiresFailed > 1) { try { long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed); Thread.sleep(delay); } catch (Exception ignore) { } } int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads(); if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads... List<OperableTrigger> triggers; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try { triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()) , qsRsrcs.getBatchTimeWindow()); acquiresFailed = 0; if (log.isDebugEnabled()) log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers"); } catch (JobPersistenceException jpe) { if (acquiresFailed == 0) { qs.notifySchedulerListenersError( "An error occurred while scanning for the next triggers to fire.", jpe); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } catch (RuntimeException e) { if (acquiresFailed == 0) { getLog().error("quartzSchedulerThreadLoop: RuntimeException " +e.getMessage(), e); } if (acquiresFailed < Integer.MAX_VALUE) acquiresFailed++; continue; } if (triggers != null && !triggers.isEmpty()) { now = System.currentTimeMillis(); long triggerTime = triggers.get(0).getNextFireTime().getTime(); long timeUntilTrigger = triggerTime - now; while(timeUntilTrigger > 2) { synchronized (sigLock) { if (halted.get()) { break; } if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; } now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; } // this happens if releaseIfScheduleChangedSignificantly decided to release triggers if(triggers.isEmpty()) continue; // set triggers to 'executing' List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>(); boolean goAhead = true; synchronized(sigLock) { goAhead = !halted.get(); } if(goAhead) { try { List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; } catch (SchedulerException se) { qs.notifySchedulerListenersError( "An error occurred while firing triggers '" + triggers + "'", se); //QTZ-179 : a problem occurred interacting with the triggers from the db //we release them and loop again for (int i = 0; i < triggers.size(); i++) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); } continue; } } for (int i = 0; i < bndles.size(); i++) { TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) { getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } // it's possible to get 'null' if the triggers was paused, // blocked, or other similar occurrences that prevent it being // fired at this time... or if the scheduler was shutdown (halted) if (bndle == null) { qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)); continue; } JobRunShell shell = null; try { shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle); shell.initialize(qs); } catch (SchedulerException se) { qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail() , CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); continue; } if (qsRsrcs.getThreadPool().runInThread(shell) == false) { // this case should never happen, as it is indicative of the // scheduler being shutdown or a bug in the thread pool or // a thread pool being used concurrently - which the docs // say not to do... getLog().error("ThreadPool.runInThread() return false!"); qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail() , CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR); } } continue; // while (!halted) } } else { // if(availThreadCount > 0) // should never happen, if threadPool.blockForAvailableThreads() follows contract continue; // while (!halted) } long now = System.currentTimeMillis(); long waitTime = now + getRandomizedIdleWaitTime(); long timeUntilContinue = waitTime - now; synchronized(sigLock) { try { if(!halted.get()) { // QTZ-336 A job might have been completed in the mean time and we might have // missed the scheduled changed signal by not waiting for the notify() yet // Check that before waiting for too long in case this very job needs to be // scheduled very soon if (!isScheduleChanged()) { sigLock.wait(timeUntilContinue); } } } catch (InterruptedException ignore) { } } } catch(RuntimeException re) { getLog().error("Runtime error occurred in main trigger firing loop.", re); } } // while (!halted) // drop references to scheduler stuff to aid garbage collection... qs = null; qsRsrcs = null; }一开始先得到工作线程池中的线程个数,我们看其SimpleThreadPool中的逻辑
public int blockForAvailableThreads() { Object var1 = this.nextRunnableLock; synchronized(this.nextRunnableLock) { while((this.availWorkers.size() < 1 || this.handoffPending) && !this.isShutdown) { try { this.nextRunnableLock.wait(500L); } catch (InterruptedException var4) { ; } } return this.availWorkers.size(); } }
这个线程池在Scheduler的获取中就已经生成,并根据配置的线程数量不断生成相应的线程数量,这里得到的工作线程数availThreadCount,会在后面作为一次性取定时任务数目的依据。
然后通过调用JobStore的acquireNextTriggers得到所需执行的定时任务集合。取得的任务数量是min(可用的工作线程数,配置中的一次最大执行任务的数量),再传入noLaterThan时间,即得到的任务集合中的所有任务的等待时长都应该小于noLaterThan。这里JobStore的实现是RAMJobStore,我们来看其acquireNextTriggers()方法
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) { synchronized (lock) { List<OperableTrigger> result = new ArrayList<OperableTrigger>(); Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>(); Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>(); long batchEnd = noLaterThan; // return empty list if store has no triggers. if (timeTriggers.size() == 0) return result; while (true) { TriggerWrapper tw; try { tw = timeTriggers.first(); if (tw == null) break; timeTriggers.remove(tw); } catch (java.util.NoSuchElementException nsee) { break; } if (tw.trigger.getNextFireTime() == null) { continue; } if (applyMisfire(tw)) { if (tw.trigger.getNextFireTime() != null) { timeTriggers.add(tw); } continue; } if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) { timeTriggers.add(tw); break; } // If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then // put it back into the timeTriggers set and continue to search for next trigger. JobKey jobKey = tw.trigger.getJobKey(); JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail; if (job.isConcurrentExectionDisallowed()) { if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) { excludedTriggers.add(tw); continue; // go to next trigger in store. } else { acquiredJobKeysForNoConcurrentExec.add(jobKey); } } tw.state = TriggerWrapper.STATE_ACQUIRED; tw.trigger.setFireInstanceId(getFiredTriggerRecordId()); OperableTrigger trig = (OperableTrigger) tw.trigger.clone(); if (result.isEmpty()) { batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow; } result.add(trig); if (result.size() == maxCount) break; } // If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution // , we need to add them back to store. if (excludedTriggers.size() > 0) timeTriggers.addAll(excludedTriggers); return result; } }tw = timeTriggers.first(),我们可以看到每次取的是timeTriggers的第一个元素,且保证了其是timeTriggers中执行时间离当前最近的定时任务。我们来看下timeTriggers的数据结构。
protected TreeSet<TriggerWrapper> timeTriggers = new TreeSet<TriggerWrapper>(new TriggerWrapperComparator()); class TriggerWrapperComparator implements Comparator<TriggerWrapper>, java.io.Serializable { private static final long serialVersionUID = 8809557142191514261L; TriggerTimeComparator ttc = new TriggerTimeComparator(); public int compare(TriggerWrapper trig1, TriggerWrapper trig2) { return ttc.compare(trig1.trigger, trig2.trigger); } @Override public boolean equals(Object obj) { return (obj instanceof TriggerWrapperComparator); } @Override public int hashCode() { return super.hashCode(); } }我们可以看到timeTriggers用的是TreeSet,内部实现即一颗红黑树,并重写其compare函数,优先通过定时时间(任务下次触发时间)比较。
public static int compare(Date nextFireTime1, int priority1, TriggerKey key1, Date nextFireTime2, int priority2, TriggerKey key2) { if (nextFireTime1 != null || nextFireTime2 != null) { if (nextFireTime1 == null) { return 1; } if (nextFireTime2 == null) { return -1; } if(nextFireTime1.before(nextFireTime2)) { return -1; } if(nextFireTime1.after(nextFireTime2)) { return 1; } } int comp = priority2 - priority1; if (comp != 0) { return comp; } return key1.compareTo(key2); }每次插入移出元素,数据结构内部自动排序,通过这个方式保证第一个元素是最先触发的定时任务。
回到 acquireNextTriggers中,在得到最先触发的定时任务后,将其移出timeTriggers。得到最先触发的定时任务,在这里可能存在这样一个bug,定时任务的触发时间 在向Scheduler中配置时已经定下,但此时并未调用 Scheduler的start,如果在调用其start后,我们发现最先触发的定时任务的触发时间已经过了,言外之意,调用Scheduler的start时,最先触发的定时任务已经过期。applyMisFire()在这里确认了这个问题。
protected boolean applyMisfire(TriggerWrapper tw) { long misfireTime = System.currentTimeMillis(); if (getMisfireThreshold() > 0) { misfireTime -= getMisfireThreshold(); } Date tnft = tw.trigger.getNextFireTime(); if (tnft == null || tnft.getTime() > misfireTime || tw.trigger.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) { return false; } Calendar cal = null; if (tw.trigger.getCalendarName() != null) { cal = retrieveCalendar(tw.trigger.getCalendarName()); } signaler.notifyTriggerListenersMisfired((OperableTrigger)tw.trigger.clone()); tw.trigger.updateAfterMisfire(cal); if (tw.trigger.getNextFireTime() == null) { tw.state = TriggerWrapper.STATE_COMPLETE; signaler.notifySchedulerListenersFinalized(tw.trigger); synchronized (lock) { timeTriggers.remove(tw); } } else if (tnft.equals(tw.trigger.getNextFireTime())) { return false; } return true; }
将当前时间减去misfireThreshold得到misfireTime,如果定时任务时间大于misfireTime,那么可以认为并没有错过该任务的触发时机,那么return false继续正常的思路。如果一旦小于,那么说明错过了,这时候会重新计算该任务的下一次触发时机,如果没有下次任务的触发时间,那么说明该任务不会被再触发,于是状态改成COMPLETED。如果错过了时机,并计算出下次任务触发时间不为空,那么可以当做一个新的任务加入timeTriggers中,并重新下一轮循环。
if (applyMisfire(tw)) { if (tw.trigger.getNextFireTime() != null) { timeTriggers.add(tw); } continue; }如果此时得到的任务的等待时间大于设置的等待时间,那么退出循环(因为当前的任务的触发时间是最小的,之后触发时间之后大于设置的等待时间)。
if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) { timeTriggers.add(tw); break; }
之后通过一个简单的逻辑解决了Job并发的问题。 Job是有可能并发执行的,比如一个任务要执行10秒中,而调度算法是每秒中触发1次,那么就有可能多个任务被并发执行。有时候我们并不想任务并发执行,这个时候在job上加一个@DisallowConcurrentExecution注解解决这个问题,那么底层是如何实现?
if (job.isConcurrentExectionDisallowed()) { if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) { excludedTriggers.add(tw); continue; // go to next trigger in store. } else { acquiredJobKeysForNoConcurrentExec.add(jobKey); } }
job.isConcurrentExectionDisallowed()无非看该job是否是@DisallowConcurrentExecution注解的实例,如果是进入条件分支。第一次进入把jobkey加入到acquiredJobKeysForNoConcurrentExec集合中,第二次计算后又选取到该job,此时acquiredJobKeysForNoConcurrentExec中已经包含该job的jobkey,于是把该jobkey放入excludedTriggers集合中,并重新选取新的任务,而excludedTriggers集合将会在最后把其中的任务重新放入TimeTriggers中。需要注意的是acquiredJobKeysForNoConcurrentExec跟excludedTriggers集合是局部变量,而TimeTriggers是成员,全局的。这样实现思路就很好理解了。
之后将取得的任务状态改成ACQUIRED表示该任务是准备触发的,并记录任务的下一次要触发的时间,然后将任务深克隆,加入到要返回的result数组中。然后不断取任务执行上面流程,知道取到任务数目最大值,或者直到没有在等待时长内的任务,acquireNextTriggers返回的Trigger数组就是接下来准备触发的任务。
其实此时并不能保证这些已经是最后要触发的任务。如果在上面这段时间内有新的定时任务加进来,并且它的执行时间早于所有刚刚取得的任务的触发时间,或者早于部分刚取得的任务的触发时间,这样都会有问题。
要提一下,每一个定时任务加进来都会更新管理线程的signaledNextFireTime,我们看下其逻辑
public void signalSchedulingChange(long candidateNewNextFireTime) { Object var3 = this.sigLock; synchronized(this.sigLock) { this.signaled = true; this.signaledNextFireTime = candidateNewNextFireTime; this.sigLock.notifyAll(); } }我们继续看QuartzSchedulerThread中run方法给出的这个问题的解决方案
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) { try { // we could have blocked a long while // on 'synchronize', so we must recompute now = System.currentTimeMillis(); timeUntilTrigger = triggerTime - now; if(timeUntilTrigger >= 1) sigLock.wait(timeUntilTrigger); } catch (InterruptedException ignore) { } } } if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) { break; }通过isCandidateNewTimeEarlierWithinReason()方法判断有无上述情况发生,若无则等待触发时间,若在等待的时间中有新的任务加入,则notifyAll唤醒当前线程,并进入releaseIfScheduleChangedSignificantly中继续检查,若有上述情况发生,则 之前取得Trigger就绪态取消,Trigger数组被清空,重新在下一次循环中取所要触发的任务。若有上述情况发生则直接进入releaseIfScheduleChangedSignificantly中,之前取得Trigger就绪态取消,Trigger数组被清空,重新在下一次循环中取所要触发的任务。我们看下isCandidateNewTimeEarlierWithinReason跟releaseIfScheduleChangedSignificantly的代码
private boolean isCandidateNewTimeEarlierWithinReason(long oldTime, boolean clearSignal) { // So here's the deal: We know due to being signaled that 'the schedule' // has changed. We may know (if getSignaledNextFireTime() != 0) the // new earliest fire time. We may not (in which case we will assume // that the new time is earlier than the trigger we have acquired). // In either case, we only want to abandon our acquired trigger and // go looking for a new one if "it's worth it". It's only worth it if // the time cost incurred to abandon the trigger and acquire a new one // is less than the time until the currently acquired trigger will fire, // otherwise we're just "thrashing" the job store (e.g. database). // // So the question becomes when is it "worth it"? This will depend on // the job store implementation (and of course the particular database // or whatever behind it). Ideally we would depend on the job store // implementation to tell us the amount of time in which it "thinks" // it can abandon the acquired trigger and acquire a new one. However // we have no current facility for having it tell us that, so we make // a somewhat educated but arbitrary guess ;-). synchronized(sigLock) { if (!isScheduleChanged()) return false; boolean earlier = false; if(getSignaledNextFireTime() == 0) earlier = true; else if(getSignaledNextFireTime() < oldTime ) earlier = true; if(earlier) { // so the new time is considered earlier, but is it enough earlier? long diff = oldTime - System.currentTimeMillis(); if(diff < (qsRsrcs.getJobStore().supportsPersistence() ? 70L : 7L)) earlier = false; } if(clearSignal) { clearSignaledSchedulingChange(); } return earlier; } }
private boolean releaseIfScheduleChangedSignificantly( List<OperableTrigger> triggers, long triggerTime) { if (isCandidateNewTimeEarlierWithinReason(triggerTime, true)) { // above call does a clearSignaledSchedulingChange() for (OperableTrigger trigger : triggers) { qsRsrcs.getJobStore().releaseAcquiredTrigger(trigger); } triggers.clear(); return true; } return false; }
在这之后逻辑中,保证将要触发的任务时间在2ms之内,开始准备执行Trigger数组的第一个任务,同时在triggersFired()方法中更新下一次触发时间,重新放入在TimeTriggers中,准备周期方法的下一次执行。
这里将需要执行的定时任务封装到JobRunShell内,其中定时任务的job在initialize()中被创建
public void initialize(QuartzScheduler sched) throws SchedulerException { this.qs = sched; Job job = null; JobDetail jobDetail = firedTriggerBundle.getJobDetail(); try { job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler); } catch (SchedulerException se) { sched.notifySchedulerListenersError( "An error occured instantiating job to be executed. job= '" + jobDetail.getKey() + "'", se); throw se; } catch (Throwable ncdfe) { // such as NoClassDefFoundError SchedulerException se = new SchedulerException( "Problem instantiating class '" + jobDetail.getJobClass().getName() + "' - ", ncdfe); sched.notifySchedulerListenersError( "An error occured instantiating job to be executed. job= '" + jobDetail.getKey() + "'", se); throw se; } this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job); }其中 JobRunShell在一开始的配置的SimpleThreadPool中被执行。线程池执行 JobRunShell的run方法
public void run() { qs.addInternalSchedulerListener(this); try { OperableTrigger trigger = (OperableTrigger) jec.getTrigger(); JobDetail jobDetail = jec.getJobDetail(); do { JobExecutionException jobExEx = null; Job job = jec.getJobInstance(); try { begin(); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't begin execution.", se); break; } // notify job & trigger listeners... try { if (!notifyListenersBeginning(jec)) { break; } } catch(VetoedException ve) { try { CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null); qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode); // QTZ-205 // Even if trigger got vetoed, we still needs to check to see if // it's the trigger's finalized run or not. if (jec.getTrigger().getNextFireTime() == null) { qs.notifySchedulerListenersFinalized(jec.getTrigger()); } complete(true); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error during veto of Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); } break; } long startTime = System.currentTimeMillis(); long endTime = startTime; // execute the job try { log.debug("Calling execute on job " + jobDetail.getKey()); job.execute(jec); endTime = System.currentTimeMillis(); } catch (JobExecutionException jee) { endTime = System.currentTimeMillis(); jobExEx = jee; getLog().info("Job " + jobDetail.getKey() + " threw a JobExecutionException: ", jobExEx); } catch (Throwable e) { endTime = System.currentTimeMillis(); getLog().error("Job " + jobDetail.getKey() + " threw an unhandled Exception: ", e); SchedulerException se = new SchedulerException( "Job threw an unhandled exception.", e); qs.notifySchedulerListenersError("Job (" + jec.getJobDetail().getKey() + " threw an exception.", se); jobExEx = new JobExecutionException(se, false); } jec.setJobRunTime(endTime - startTime); // notify all job listeners if (!notifyJobListenersComplete(jec, jobExEx)) { break; } CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP; // update the trigger try { instCode = trigger.executionComplete(jec, jobExEx); } catch (Exception e) { // If this happens, there's a bug in the trigger... SchedulerException se = new SchedulerException( "Trigger threw an unhandled exception.", e); qs.notifySchedulerListenersError( "Please report this error to the Quartz developers.", se); } // notify all trigger listeners if (!notifyTriggerListenersComplete(jec, instCode)) { break; } // update job/trigger or re-execute job if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) { jec.incrementRefireCount(); try { complete(false); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); } continue; } try { complete(true); } catch (SchedulerException se) { qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se); continue; } qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode); break; } while (true); } finally { qs.removeInternalSchedulerListener(this); } }其中无非调用job的execute执行任务,并统计执行 的任务时长,在job的execute()执行完毕后将会调用QuartzScheduler的notifyJobStoreJobComplete()方法执行定时任务的结束操作。
protected void notifyJobStoreJobComplete(OperableTrigger trigger, JobDetail detail, CompletedExecutionInstruction instCode) { resources.getJobStore().triggeredJobComplete(trigger, detail, instCode); } public void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, CompletedExecutionInstruction triggerInstCode) { Object var4 = this.lock; synchronized(this.lock) { JobWrapper jw = (JobWrapper)this.jobsByKey.get(jobDetail.getKey()); TriggerWrapper tw = (TriggerWrapper)this.triggersByKey.get(trigger.getKey()); if(jw != null) { JobDetail jd = jw.jobDetail; if(jd.isPersistJobDataAfterExecution()) { JobDataMap newData = jobDetail.getJobDataMap(); if(newData != null) { newData = (JobDataMap)newData.clone(); newData.clearDirtyFlag(); } jd = jd.getJobBuilder().setJobData(newData).build(); jw.jobDetail = jd; } if(jd.isConcurrentExectionDisallowed()) { this.blockedJobs.remove(jd.getKey()); ArrayList<TriggerWrapper> trigs = this.getTriggerWrappersForJob(jd.getKey()); Iterator i$ = trigs.iterator(); while(i$.hasNext()) { TriggerWrapper ttw = (TriggerWrapper)i$.next(); if(ttw.state == 5) { ttw.state = 0; this.timeTriggers.add(ttw); } if(ttw.state == 6) { ttw.state = 4; } } this.signaler.signalSchedulingChange(0L); } } else { this.blockedJobs.remove(jobDetail.getKey()); } if(tw != null) { if(triggerInstCode == CompletedExecutionInstruction.DELETE_TRIGGER) { if(trigger.getNextFireTime() == null) { if(tw.getTrigger().getNextFireTime() == null) { this.removeTrigger(trigger.getKey()); } } else { this.removeTrigger(trigger.getKey()); this.signaler.signalSchedulingChange(0L); } } else if(triggerInstCode == CompletedExecutionInstruction.SET_TRIGGER_COMPLETE) { tw.state = 3; this.timeTriggers.remove(tw); this.signaler.signalSchedulingChange(0L); } else if(triggerInstCode == CompletedExecutionInstruction.SET_TRIGGER_ERROR) { this.getLog().info("Trigger " + trigger.getKey() + " set to ERROR state."); tw.state = 7; this.signaler.signalSchedulingChange(0L); } else if(triggerInstCode == CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR) { this.getLog().info("All triggers of Job " + trigger.getJobKey() + " set to ERROR state."); this.setAllTriggersOfJobToState(trigger.getJobKey(), 7); this.signaler.signalSchedulingChange(0L); } else if(triggerInstCode == CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_COMPLETE) { this.setAllTriggersOfJobToState(trigger.getJobKey(), 3); this.signaler.signalSchedulingChange(0L); } } } }
根据@PersistJobDataAfterExecution的配置持久化执行过的任务数据,将@DisallowConcurrentExecution注解阻塞的任务准备执行,移除不会再触发的定时任务等工作。