调度,再到unscheduleJob,deleteJob
public class QuartzScheduler implements RemotableQuartzScheduler { public Date scheduleJob(JobDetail jobDetail, Trigger trigger) throws SchedulerException { validateState(); if(jobDetail == null) throw new SchedulerException("JobDetail cannot be null"); if(trigger == null) throw new SchedulerException("Trigger cannot be null"); if(jobDetail.getKey() == null) throw new SchedulerException("Job's key cannot be null"); if(jobDetail.getJobClass() == null) throw new SchedulerException("Job's class cannot be null"); OperableTrigger trig = (OperableTrigger)trigger; if(trigger.getJobKey() == null) //设置触发器的jobKey trig.setJobKey(jobDetail.getKey()); else if(!trigger.getJobKey().equals(jobDetail.getKey())) throw new SchedulerException("Trigger does not reference given job!"); trig.validate(); Calendar cal = null; if(trigger.getCalendarName() != null) cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName()); Date ft = trig.computeFirstFireTime(cal); if(ft == null) { throw new SchedulerException((new StringBuilder()).append("Based on configured schedule, the given trigger '").append(trigger.getKey()).append("' will never fire.").toString()); } else { //存储jobDetail, trig到RAMJobStore resources.getJobStore().storeJobAndTrigger(jobDetail, trig); //下面三个现在先不说以后再说 notifySchedulerListenersJobAdded(jobDetail); notifySchedulerThread(trigger.getNextFireTime().getTime()); notifySchedulerListenersSchduled(trigger); return ft; } public boolean unscheduleJob(TriggerKey triggerKey) throws SchedulerException { validateState(); //从容器中的RAMJobStore的triggersByKey Map中将triggerKey移除 if(resources.getJobStore().removeTrigger(triggerKey)) { notifySchedulerThread(0L); notifySchedulerListenersUnscheduled(triggerKey); } else { return false; } return true; } public boolean deleteJob(JobKey jobKey) throws SchedulerException { validateState(); boolean result = false; //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> List triggers = getTriggersOfJob(jobKey); for(Iterator i$ = 
triggers.iterator(); i$.hasNext();) { Trigger trigger = (Trigger)i$.next(); //移除触发器 if(!unscheduleJob(trigger.getKey())) { StringBuilder sb = (new StringBuilder()).append("Unable to unschedule trigger [").append(trigger.getKey()).append("] while deleting job [").append(jobKey).append("]"); throw new SchedulerException(sb.toString()); } result = true; } //从容器中的RAMJobStore的triggersByKey,trriger, //triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除 //同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息 result = resources.getJobStore().removeJob(jobKey) || result; if(result) { notifySchedulerThread(0L); notifySchedulerListenersJobDeleted(jobKey); } return result; } private static String VERSION_MAJOR; private static String VERSION_MINOR; private static String VERSION_ITERATION; private QuartzSchedulerResources resources; private QuartzSchedulerThread schedThread; private ThreadGroup threadGroup; private SchedulerContext context; private ListenerManager listenerManager; private HashMap internalJobListeners; private HashMap internalTriggerListeners; private ArrayList internalSchedulerListeners; private JobFactory jobFactory; ExecutingJobsManager jobMgr; ErrorLogger errLogger; private SchedulerSignaler signaler; private Random random; private ArrayList holdToPreventGC; private boolean signalOnSchedulingChange; private volatile boolean closed; private volatile boolean shuttingDown; private boolean boundRemotely; private QuartzSchedulerMBean jmxBean; private Date initialStart; private final Timer updateTimer; private final Logger log = LoggerFactory.getLogger(getClass()); static { Properties props; InputStream is; VERSION_MAJOR = "UNKNOWN"; VERSION_MINOR = "UNKNOWN"; VERSION_ITERATION = "UNKNOWN"; props = new Properties(); is = null; is = org/quartz/core/QuartzScheduler.getResourceAsStream("quartz-build.properties"); if(is != null) { props.load(is); String version = props.getProperty("version"); if(version != null) { String versionComponents[] = version.split("\\."); 
VERSION_MAJOR = versionComponents[0]; VERSION_MINOR = versionComponents[1]; if(versionComponents.length > 2) VERSION_ITERATION = versionComponents[2]; else VERSION_ITERATION = "0"; } else { LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Can't parse Quartz version from quartz-build.properties"); } } if(is != null) try { is.close(); } catch(Exception ignore) { } break MISSING_BLOCK_LABEL_181; Exception e; e; LoggerFactory.getLogger(org/quartz/core/QuartzScheduler).error("Error loading version info from quartz-build.properties.", e); if(is != null) try { is.close(); } catch(Exception ignore) { } break MISSING_BLOCK_LABEL_181; Exception exception; exception; if(is != null) try { is.close(); } catch(Exception ignore) { } throw exception; } }
//Job存储的实现
public class RAMJobStore implements JobStore { //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> public List getTriggersForJob(JobKey jobKey) { ArrayList trigList = new ArrayList(); synchronized(lock) { //从triggers中获取jobKey对应的TriggerWrapper Iterator i$ = triggers.iterator(); do { if(!i$.hasNext()) break; TriggerWrapper tw = (TriggerWrapper)i$.next(); if(tw.jobKey.equals(jobKey)) trigList.add((OperableTrigger)tw.trigger.clone()); } while(true); } return trigList; } public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger) throws JobPersistenceException { storeJob(newJob, false); storeTrigger(newTrigger, false); } //存储newJob到jobsByKey,jobsByGroup public void storeJob(JobDetail newJob, boolean replaceExisting) throws ObjectAlreadyExistsException { JobWrapper jw = new JobWrapper((JobDetail)newJob.clone()); boolean repl = false; synchronized(lock) { if(jobsByKey.get(jw.key) != null) { if(!replaceExisting) throw new ObjectAlreadyExistsException(newJob); repl = true; } if(!repl) { HashMap grpMap = (HashMap)jobsByGroup.get(newJob.getKey().getGroup()); if(grpMap == null) { grpMap = new HashMap(100); jobsByGroup.put(newJob.getKey().getGroup(), grpMap); } //存储job到jobsByGroup grpMap.put(newJob.getKey(), jw); //存储job到jobsByKey jobsByKey.put(jw.key, jw); } else { JobWrapper orig = (JobWrapper)jobsByKey.get(jw.key); orig.jobDetail = jw.jobDetail; } } } //将newTrigger存储到triggersByKey,trriger,triggersByGroup,timeTriggers public void storeTrigger(OperableTrigger newTrigger, boolean replaceExisting) throws JobPersistenceException { TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone()); synchronized(lock) { if(triggersByKey.get(tw.key) != null) { if(!replaceExisting) throw new ObjectAlreadyExistsException(newTrigger); removeTrigger(newTrigger.getKey(), false); } if(retrieveJob(newTrigger.getJobKey()) == null) throw new JobPersistenceException((new StringBuilder()).append("The job (").append(newTrigger.getJobKey()).append(") 
referenced by the trigger does not exist.").toString()); //存储trriger到triggers triggers.add(tw); HashMap grpMap = (HashMap)triggersByGroup.get(newTrigger.getKey().getGroup()); if(grpMap == null) { grpMap = new HashMap(100); triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap); } //存储trriger到triggersByGroup grpMap.put(newTrigger.getKey(), tw); //存储trriger到triggersByKey triggersByKey.put(tw.key, tw); if(pausedTriggerGroups.contains(newTrigger.getKey().getGroup()) || pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) { tw.state = 4; if(blockedJobs.contains(tw.jobKey)) tw.state = 6; } else if(blockedJobs.contains(tw.jobKey)) tw.state = 5; else // 将trriger添加到timeTriggers timeTriggers.add(tw); } } public boolean removeTrigger(TriggerKey triggerKey) { return removeTrigger(triggerKey, true); } //从triggersByKey,triggers,timeTriggers移除key private boolean removeTrigger(TriggerKey key, boolean removeOrphanedJob) { boolean found; synchronized(lock) { //从triggersByKey移除 found = triggersByKey.remove(key) != null; if(found) { TriggerWrapper tw = null; HashMap grpMap = (HashMap)triggersByGroup.get(key.getGroup()); if(grpMap != null) { ////从triggersByGroup移除 grpMap.remove(key); if(grpMap.size() == 0) triggersByGroup.remove(key.getGroup()); } Iterator tgs = triggers.iterator(); do { if(!tgs.hasNext()) break; tw = (TriggerWrapper)tgs.next(); if(!key.equals(tw.key)) continue; //从triggers移除 tgs.remove(); break; } while(true); //从timeTriggers移除 timeTriggers.remove(tw); if(removeOrphanedJob) { JobWrapper jw = (JobWrapper)jobsByKey.get(tw.jobKey); List trigs = getTriggersForJob(tw.jobKey); if((trigs == null || trigs.size() == 0) && !jw.jobDetail.isDurable() && removeJob(jw.key)) signaler.notifySchedulerListenersJobDeleted(jw.key); } } } return found; } //从容器中的RAMJobStore的triggersByKey,trriger, //triggersByGroup,timeTriggers,中将jobKey对应的TrrigerWapper移除 //同时,从jobsByKey,jobsByGroup的移除对应jobKey的JobWrapper相关信息 public boolean removeJob(JobKey jobKey) { boolean found = false; 
synchronized(lock) { //获取容器triggers中的TriggerWrapper的JobKey为jobKey的List<TriggerWrapper> List triggersOfJob = getTriggersForJob(jobKey); for(Iterator i$ = triggersOfJob.iterator(); i$.hasNext();) { OperableTrigger trig = (OperableTrigger)i$.next(); //从triggersByKey,triggers,timeTriggers移除key removeTrigger(trig.getKey()); found = true; } //从jobsByKey移除jobKey found = (jobsByKey.remove(jobKey) != null) | found; if(found) { HashMap grpMap = (HashMap)jobsByGroup.get(jobKey.getGroup()); if(grpMap != null) { //从jobsByGroup对应的group总移除jobKey grpMap.remove(jobKey); if(grpMap.size() == 0) jobsByGroup.remove(jobKey.getGroup()); } } } return found; } protected HashMap jobsByKey;//HashMap<JobKey,JobWrapper> protected HashMap triggersByKey;//HashMap<TrrigerKey,TriggerWrapper> protected HashMap jobsByGroup;//HashMap<String,HashMap<JobKey,JobWrapper>>,,key为JobKey.group protected HashMap triggersByGroup;//HashMap<String,HashMap<TrrigerKey,TriggerWrapper>>,,key为TrrigerKey.group protected TreeSet timeTriggers;//TreeSet<TrrigerWrapper> 红黑树 protected HashMap calendarsByName; protected ArrayList triggers; //List<TriggerWrapper> protected final Object lock = new Object(); protected HashSet pausedTriggerGroups; protected HashSet pausedJobGroups; protected HashSet blockedJobs; protected long misfireThreshold; protected SchedulerSignaler signaler; private final Logger log = LoggerFactory.getLogger(getClass()); private static final AtomicLong ftrCtr = new AtomicLong(System.currentTimeMillis()); }
TriggerKey与JobKey包装类
/**
 * Pairs a trigger with its own key, the key of the job it fires, and a
 * mutable state code. Equality and hashing are based on the trigger key only.
 */
class TriggerWrapper {

    public static final int STATE_WAITING = 0;
    public static final int STATE_ACQUIRED = 1;
    public static final int STATE_EXECUTING = 2;
    public static final int STATE_COMPLETE = 3;
    public static final int STATE_PAUSED = 4;
    public static final int STATE_BLOCKED = 5;
    public static final int STATE_PAUSED_BLOCKED = 6;
    public static final int STATE_ERROR = 7;

    public final TriggerKey key;
    public final JobKey jobKey;
    public final OperableTrigger trigger;

    /** One of the {@code STATE_*} codes; a new wrapper starts as waiting. */
    public int state = STATE_WAITING;

    /**
     * @param trigger the trigger to wrap
     * @throws IllegalArgumentException if {@code trigger} is null
     */
    TriggerWrapper(OperableTrigger trigger) {
        if (trigger == null) {
            throw new IllegalArgumentException("Trigger cannot be null!");
        }
        this.trigger = trigger;
        this.key = trigger.getKey();
        this.jobKey = trigger.getJobKey();
    }

    /** Two wrappers are equal when their trigger keys are equal. */
    public boolean equals(Object obj) {
        return obj instanceof TriggerWrapper && ((TriggerWrapper) obj).key.equals(key);
    }

    public int hashCode() {
        return key.hashCode();
    }

    public OperableTrigger getTrigger() {
        return trigger;
    }
}
JobKey与JobDetail包装类
/**
 * Pairs a job detail with its key. Equality and hashing are based on the
 * job key only.
 */
class JobWrapper {

    public JobKey key;
    public JobDetail jobDetail;

    /**
     * @param jobDetail the job detail to wrap; its key is cached in {@link #key}
     */
    JobWrapper(JobDetail jobDetail) {
        this.jobDetail = jobDetail;
        this.key = jobDetail.getKey();
    }

    /** Two wrappers are equal when their job keys are equal. */
    public boolean equals(Object obj) {
        return obj instanceof JobWrapper && ((JobWrapper) obj).key.equals(key);
    }

    public int hashCode() {
        return key.hashCode();
    }
}
//JobDetail
public class JobDetailImpl implements Cloneable, Serializable, JobDetail { private static final long serialVersionUID = -6069784757781506897L; private String name; private String group; private String description; private Class jobClass; private JobDataMap jobDataMap;//拥有共享数据,就是一个Map private boolean durability; private boolean shouldRecover; private transient JobKey key; }
//JobKey
/**
 * Key identifying a job by name and group. All behavior is inherited from
 * {@code Key}; this class only adds constructors and static factories.
 */
public final class JobKey extends Key {

    private static final long serialVersionUID = -6073883950062574010L;

    /** Creates a key with the given name and a {@code null} group argument. */
    public JobKey(String name) {
        this(name, null);
    }

    /** Creates a key with the given name and group. */
    public JobKey(String name, String group) {
        super(name, group);
    }

    /** Factory equivalent of {@link #JobKey(String)}. */
    public static JobKey jobKey(String name) {
        return jobKey(name, null);
    }

    /** Factory equivalent of {@link #JobKey(String, String)}. */
    public static JobKey jobKey(String name, String group) {
        return new JobKey(name, group);
    }
}
//TriggerKey
/**
 * Key identifying a trigger by name and group. All behavior is inherited
 * from {@code Key}; this class only adds constructors and static factories.
 */
public final class TriggerKey extends Key {

    private static final long serialVersionUID = 8070357886703449660L;

    /** Creates a key with the given name and a {@code null} group argument. */
    public TriggerKey(String name) {
        this(name, null);
    }

    /** Creates a key with the given name and group. */
    public TriggerKey(String name, String group) {
        super(name, group);
    }

    /** Factory equivalent of {@link #TriggerKey(String)}. */
    public static TriggerKey triggerKey(String name) {
        return triggerKey(name, null);
    }

    /** Factory equivalent of {@link #TriggerKey(String, String)}. */
    public static TriggerKey triggerKey(String name, String group) {
        return new TriggerKey(name, group);
    }
}
//Key
package org.quartz.utils; import java.io.Serializable; import java.util.UUID; public class Key implements Serializable, Comparable { public Key(String name, String group) { if(name == null) throw new IllegalArgumentException("Name cannot be null."); this.name = name; if(group != null) this.group = group; else this.group = "DEFAULT"; } public String getName() { return name; } public String getGroup() { return group; } private static final long serialVersionUID = -7141167957642391350L; public static final String DEFAULT_GROUP = "DEFAULT"; private final String name; private final String group; }
总结:
首先,Scheduler.scheduleJob(JobDetail jobDetail, Trigger trigger) 调度job,
实际上就是将job存储到RAMJobStore中的jobsByGroup、jobsByKey对应的Map中,并将trigger存储到
triggers(List)、triggersByKey、triggersByGroup对应的Map中,以及timeTriggers的TreeSet中(触发器处于暂停或阻塞状态时不会加入timeTriggers)。
Scheduler.unscheduleJob(TriggerKey triggerKey) 就是将triggerKey对应的TriggerWrapper从triggersByKey、triggersByGroup、
triggers、timeTriggers中移除;Scheduler.deleteJob(JobKey jobKey)
则先从容器triggers中取出JobKey为jobKey的List<TriggerWrapper>,对列表中的每个触发器调用unscheduleJob
(TriggerKey triggerKey),然后再从jobsByKey、jobsByGroup
中移除对应jobKey的相关信息。