private void doUpdateOfMisfiredTrigger(Connection conn, SchedulingContext ctxt, Trigger trig, boolean forceState, String newStateIfNotComplete, boolean recovering) throws JobPersistenceException { Calendar cal = null; if (trig.getCalendarName() != null) { cal = retrieveCalendar(conn, ctxt, trig.getCalendarName()); } schedSignaler.notifyTriggerListenersMisfired(trig); trig.updateAfterMisfire(cal); if (trig.getNextFireTime() == null) { storeTrigger(conn, ctxt, trig, null, true, STATE_COMPLETE, forceState, recovering); } else { storeTrigger(conn, ctxt, trig, null, true, newStateIfNotComplete, forceState, false); } }
protected List<SchedulerStateRecord> clusterCheckIn(Connection conn) throws JobPersistenceException { List<SchedulerStateRecord> failedInstances = findFailedInstances(conn); try { // FUTURE_TODO: handle self-failed-out // check in... lastCheckin = System.currentTimeMillis(); if(getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) { getDelegate().insertSchedulerState(conn, getInstanceId(), lastCheckin, getClusterCheckinInterval()); } } catch (Exception e) { throw new JobPersistenceException("Failure updating scheduler state when checking-in: " + e.getMessage(), e); } return failedInstances; }
/** * <p> * Pause all of the <code>{@link org.quartz.Job}s</code> matching the given * groupMatcher - by pausing all of their <code>Trigger</code>s. * </p> * * @see #resumeJobs(org.quartz.impl.matchers.GroupMatcher) */ @SuppressWarnings("unchecked") public Set<String> pauseJobs(final GroupMatcher<JobKey> matcher) throws JobPersistenceException { return (Set<String>) executeInLock( LOCK_TRIGGER_ACCESS, new TransactionCallback() { public Set<String> execute(final Connection conn) throws JobPersistenceException { Set<String> groupNames = new HashSet<String>(); Set<JobKey> jobNames = getJobNames(conn, matcher); for (JobKey jobKey : jobNames) { List<OperableTrigger> triggers = getTriggersForJob(conn, jobKey); for (OperableTrigger trigger : triggers) { pauseTrigger(conn, trigger.getKey()); } groupNames.add(jobKey.getGroup()); } return groupNames; } } ); }
/** * <p> * Get the current state of the identified <code>{@link Trigger}</code>. * </p> * * @see Trigger#STATE_NORMAL * @see Trigger#STATE_PAUSED * @see Trigger#STATE_COMPLETE * @see Trigger#STATE_ERROR * @see Trigger#STATE_BLOCKED * @see Trigger#STATE_NONE */ public int getTriggerState(SchedulingContext ctxt, String triggerName, String groupName) throws JobPersistenceException { synchronized(lock) { TriggerWrapper tw = (TriggerWrapper) triggersByFQN.get(TriggerWrapper .getTriggerNameKey(triggerName, groupName)); if (tw == null) { return Trigger.STATE_NONE; } if (tw.state == TriggerWrapper.STATE_COMPLETE) { return Trigger.STATE_COMPLETE; } if (tw.state == TriggerWrapper.STATE_PAUSED) { return Trigger.STATE_PAUSED; } if (tw.state == TriggerWrapper.STATE_PAUSED_BLOCKED) { return Trigger.STATE_PAUSED; } if (tw.state == TriggerWrapper.STATE_BLOCKED) { return Trigger.STATE_BLOCKED; } if (tw.state == TriggerWrapper.STATE_ERROR) { return Trigger.STATE_ERROR; } return Trigger.STATE_NORMAL; } }
protected Trigger retrieveTrigger(Connection conn, String triggerName, String groupName) throws JobPersistenceException { try { Trigger trigger = getDelegate().selectTrigger(conn, triggerName, groupName); if (trigger == null) { return null; } // In case Trigger was BLOB, clear out any listeners that might // have been serialized. trigger.clearAllTriggerListeners(); String[] listeners = getDelegate().selectTriggerListeners(conn, triggerName, groupName); for (int i = 0; i < listeners.length; ++i) { trigger.addTriggerListener(listeners[i]); } return trigger; } catch (Exception e) { throw new JobPersistenceException("Couldn't retrieve trigger: " + e.getMessage(), e); } }
/** * <p> * Pause the <code>{@link org.quartz.Trigger}</code> with the given name. * </p> * * @see #resumeTrigger(Connection, SchedulingContext, String, String) */ public void pauseTrigger(Connection conn, SchedulingContext ctxt, String triggerName, String groupName) throws JobPersistenceException { try { String oldState = getDelegate().selectTriggerState(conn, triggerName, groupName); if (oldState.equals(STATE_WAITING) || oldState.equals(STATE_ACQUIRED)) { getDelegate().updateTriggerState(conn, triggerName, groupName, STATE_PAUSED); } else if (oldState.equals(STATE_BLOCKED)) { getDelegate().updateTriggerState(conn, triggerName, groupName, STATE_PAUSED_BLOCKED); } } catch (SQLException e) { throw new JobPersistenceException("Couldn't pause trigger '" + groupName + "." + triggerName + "': " + e.getMessage(), e); } }
public void storeJobsAndTriggers( final Map<JobDetail, Set<? extends Trigger>> triggersAndJobs, final boolean replace) throws JobPersistenceException { executeInLock( (isLockOnInsert() || replace) ? LOCK_TRIGGER_ACCESS : null, new VoidTransactionCallback() { public void executeVoid(Connection conn) throws JobPersistenceException { // FUTURE_TODO: make this more efficient with a true bulk operation... for(JobDetail job: triggersAndJobs.keySet()) { storeJob(conn, job, replace); for(Trigger trigger: triggersAndJobs.get(job)) { storeTrigger(conn, (OperableTrigger) trigger, job, replace, Constants.STATE_WAITING, false, false); } } } }); }
/** * <p> * Resume (un-pause) all of the <code>{@link org.quartz.JobDetail}s</code> in the given group. * </p> * <p> * If any of the <code>Job</code> s had <code>Trigger</code> s that missed one or more fire-times, then the * <code>Trigger</code>'s misfire instruction will be applied. * </p> */ @Override public Collection<String> resumeJobs(GroupMatcher<JobKey> matcher) throws JobPersistenceException { Collection<String> groups = new HashSet<String>(); lock(); try { Set<JobKey> jobKeys = getJobKeys(matcher); for (JobKey jobKey : jobKeys) { if (groups.add(jobKey.getGroup())) { jobFacade.removePausedJobGroup(jobKey.getGroup()); } for (OperableTrigger trigger : getTriggersForJob(jobKey)) { resumeTrigger(trigger.getKey()); } } } finally { unlock(); } return groups; }
boolean applyMisfire(TriggerWrapper tw) throws JobPersistenceException { long misfireTime = System.currentTimeMillis(); if (getMisfireThreshold() > 0) { misfireTime -= getMisfireThreshold(); } Date tnft = tw.getNextFireTime(); if (tnft == null || tnft.getTime() > misfireTime || tw.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) { return false; } Calendar cal = null; if (tw.getCalendarName() != null) { cal = retrieveCalendar(tw.getCalendarName()); } signaler.notifyTriggerListenersMisfired(tw.getTriggerClone()); tw.updateAfterMisfire(cal, triggerFacade); if (tw.getNextFireTime() == null) { tw.setState(TriggerState.COMPLETE, terracottaClientId, triggerFacade); signaler.notifySchedulerListenersFinalized(tw.getTriggerClone()); timeTriggers.remove(tw); } else if (tnft.equals(tw.getNextFireTime())) { return false; } return true; }
/** * <p> * Pause all triggers - equivalent of calling <code>pauseTriggerGroup(group)</code> * on every group. * </p> * * <p> * When <code>resumeAll()</code> is called (to un-pause), trigger misfire * instructions WILL be applied. * </p> * * @see #resumeAll(Connection) * @see #pauseTriggerGroup(java.sql.Connection, org.quartz.impl.matchers.GroupMatcher) */ public void pauseAll(Connection conn) throws JobPersistenceException { List<String> names = getTriggerGroupNames(conn); for (String name: names) { pauseTriggerGroup(conn, GroupMatcher.triggerGroupEquals(name)); } try { if (!getDelegate().isTriggerGroupPaused(conn, ALL_GROUPS_PAUSED)) { getDelegate().insertPausedTriggerGroup(conn, ALL_GROUPS_PAUSED); } } catch (SQLException e) { throw new JobPersistenceException( "Couldn't pause all trigger groups: " + e.getMessage(), e); } }
protected boolean removeJob(Connection conn, SchedulingContext ctxt, String jobName, String groupName, boolean activeDeleteSafe) throws JobPersistenceException { try { Key[] jobTriggers = getDelegate().selectTriggerNamesForJob(conn, jobName, groupName); for (int i = 0; i < jobTriggers.length; ++i) { deleteTriggerAndChildren( conn, jobTriggers[i].getName(), jobTriggers[i].getGroup()); } return deleteJobAndChildren(conn, ctxt, jobName, groupName); } catch (SQLException e) { throw new JobPersistenceException("Couldn't remove job: " + e.getMessage(), e); } }
@Override public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) throws JobPersistenceException { try { return realJobStore.acquireNextTriggers(noLaterThan, maxCount, timeWindow); } catch (RejoinException e) { throw new JobPersistenceException("Trigger acquisition failed due to client rejoin", e); } }
@Override public List<String> getJobGroupNames() throws JobPersistenceException { try { return realJobStore.getJobGroupNames(); } catch (RejoinException e) { throw new JobPersistenceException("Job name retrieval failed due to client rejoin", e); } }
@Override public Set<JobKey> getJobKeys(GroupMatcher<JobKey> matcher) throws JobPersistenceException { try { return realJobStore.getJobKeys(matcher); } catch (RejoinException e) { throw new JobPersistenceException("Job key retrieval failed due to client rejoin", e); } }
@Override public int getNumberOfCalendars() throws JobPersistenceException { try { return realJobStore.getNumberOfCalendars(); } catch (RejoinException e) { throw new JobPersistenceException("Calendar count retrieval failed due to client rejoin", e); } }
/** * <p> * Check existence of a given job. * </p> */ protected boolean jobExists(Connection conn, JobKey jobKey) throws JobPersistenceException { try { return getDelegate().jobExists(conn, jobKey); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't determine job existence (" + jobKey + "): " + e.getMessage(), e); } }
/** * <p> * Check existence of a given job. * </p> */ protected boolean jobExists(Connection conn, String jobName, String groupName) throws JobPersistenceException { try { return getDelegate().jobExists(conn, jobName, groupName); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't determine job existence (" + groupName + "." + jobName + "): " + e.getMessage(), e); } }
@Override public List<String> getTriggerGroupNames() throws JobPersistenceException { try { return realJobStore.getTriggerGroupNames(); } catch (RejoinException e) { throw new JobPersistenceException("Trigger group retrieval failed due to client rejoin", e); } }
protected int getNumberOfCalendars(Connection conn) throws JobPersistenceException { try { return getDelegate().selectNumCalendars(conn); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't obtain number of calendars: " + e.getMessage(), e); } }
@Override public List<OperableTrigger> getTriggersForJob(JobKey jobKey) throws JobPersistenceException { try { return realJobStore.getTriggersForJob(jobKey); } catch (RejoinException e) { throw new JobPersistenceException("Trigger retrieval failed due to client rejoin", e); } }
protected int getNumberOfJobs(Connection conn) throws JobPersistenceException { try { return getDelegate().selectNumJobs(conn); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't obtain number of jobs: " + e.getMessage(), e); } }
@Override public void pauseJob(JobKey jobKey) throws JobPersistenceException { try { realJobStore.pauseJob(jobKey); } catch (RejoinException e) { throw new JobPersistenceException("Pausing job failed due to client rejoin", e); } }
/** * <p> * Inform the <code>JobStore</code> that the scheduler has completed the * firing of the given <code>Trigger</code> (and the execution its * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code> * in the given <code>JobDetail</code> should be updated if the <code>Job</code> * is stateful. * </p> */ public void triggeredJobComplete(final SchedulingContext ctxt, final Trigger trigger, final JobDetail jobDetail, final int triggerInstCode) throws JobPersistenceException { executeInNonManagedTXLock( LOCK_TRIGGER_ACCESS, new VoidTransactionCallback() { public void execute(Connection conn) throws JobPersistenceException { triggeredJobComplete(conn, ctxt, trigger, jobDetail,triggerInstCode); } }); }
/** * Determines if a Trigger for the given job should be blocked. * State can only transition to STATE_PAUSED_BLOCKED/BLOCKED from * PAUSED/STATE_WAITING respectively. * * @return STATE_PAUSED_BLOCKED, BLOCKED, or the currentState. */ protected String checkBlockedState( Connection conn, JobKey jobKey, String currentState) throws JobPersistenceException { // State can only transition to BLOCKED from PAUSED or WAITING. if ((!currentState.equals(STATE_WAITING)) && (!currentState.equals(STATE_PAUSED))) { return currentState; } try { List<FiredTriggerRecord> lst = getDelegate().selectFiredTriggerRecordsByJob(conn, jobKey.getName(), jobKey.getGroup()); if (lst.size() > 0) { FiredTriggerRecord rec = lst.get(0); if (rec.isJobDisallowsConcurrentExecution()) { // OLD_TODO: worry about failed/recovering/volatile job states? return (STATE_PAUSED.equals(currentState)) ? STATE_PAUSED_BLOCKED : STATE_BLOCKED; } } return currentState; } catch (SQLException e) { throw new JobPersistenceException( "Couldn't determine if trigger should be in a blocked state '" + jobKey + "': " + e.getMessage(), e); } }
protected String[] getTriggerGroupNames(Connection conn, SchedulingContext ctxt) throws JobPersistenceException { String[] groupNames = null; try { groupNames = getDelegate().selectTriggerGroups(conn); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't obtain trigger groups: " + e.getMessage(), e); } return groupNames; }
protected List<String> getJobGroupNames(Connection conn) throws JobPersistenceException { List<String> groupNames; try { groupNames = getDelegate().selectJobGroups(conn); } catch (SQLException e) { throw new JobPersistenceException("Couldn't obtain job groups: " + e.getMessage(), e); } return groupNames; }
@Override public void resumeTrigger(TriggerKey triggerKey) throws JobPersistenceException { try { realJobStore.resumeTrigger(triggerKey); } catch (RejoinException e) { throw new JobPersistenceException("Resuming trigger failed due to client rejoin", e); } }
@Override public Collection<String> resumeTriggers(GroupMatcher<TriggerKey> matcher) throws JobPersistenceException { try { return realJobStore.resumeTriggers(matcher); } catch (RejoinException e) { throw new JobPersistenceException("Resuming triggers failed due to client rejoin", e); } }
/** * @see org.quartz.spi.JobStore#getPausedTriggerGroups() */ public Set<String> getPausedTriggerGroups() throws JobPersistenceException { HashSet<String> set = new HashSet<String>(); set.addAll(pausedTriggerGroups); return set; }
/** * Recover any failed or misfired jobs and clean up the data store as * appropriate. * * @throws JobPersistenceException if jobs could not be recovered */ protected void recoverJobs() throws JobPersistenceException { executeInNonManagedTXLock( LOCK_TRIGGER_ACCESS, new VoidTransactionCallback() { public void execute(Connection conn) throws JobPersistenceException { recoverJobs(conn); } }); }
@Override public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers) throws JobPersistenceException { try { return realJobStore.triggersFired(triggers); } catch (RejoinException e) { throw new JobPersistenceException("Trigger fire marking failed due to client rejoin", e); } }
/** * @see org.quartz.spi.JobStore#replaceTrigger(org.quartz.core.SchedulingContext, java.lang.String, java.lang.String, org.quartz.Trigger) */ public boolean replaceTrigger(final SchedulingContext ctxt, final String triggerName, final String groupName, final Trigger newTrigger) throws JobPersistenceException { return ((Boolean)executeInLock( LOCK_TRIGGER_ACCESS, new TransactionCallback() { public Object execute(Connection conn) throws JobPersistenceException { return replaceTrigger(conn, ctxt, triggerName, groupName, newTrigger) ? Boolean.TRUE : Boolean.FALSE; } })).booleanValue(); }
@Override public boolean checkExists(final TriggerKey triggerKey) throws JobPersistenceException { try { return realJobStore.checkExists(triggerKey); } catch (RejoinException e) { throw new JobPersistenceException("Trigger existence check failed due to client rejoin", e); } }
/** * Commit the supplied connection * * @param conn (Optional) * @throws JobPersistenceException thrown if a SQLException occurs when the * connection is committed */ protected void commitConnection(Connection conn) throws JobPersistenceException { if (conn != null) { try { conn.commit(); } catch (SQLException e) { throw new JobPersistenceException( "Couldn't commit jdbc connection. "+e.getMessage(), e); } } }
@Override public boolean removeJobs(List<JobKey> arg0) throws JobPersistenceException { try { return realJobStore.removeJobs(arg0); } catch (RejoinException e) { throw new JobPersistenceException("Removing jobs failed due to client rejoin", e); } }
protected List<OperableTrigger> getTriggersForJob(Connection conn, JobKey key) throws JobPersistenceException { List<OperableTrigger> list; try { list = getDelegate() .selectTriggersForJob(conn, key); } catch (Exception e) { throw new JobPersistenceException( "Couldn't obtain triggers for job: " + e.getMessage(), e); } return list; }
/** * <p> * Inform the <code>JobStore</code> that the scheduler has completed the * firing of the given <code>Trigger</code> (and the execution its * associated <code>Job</code>), and that the <code>{@link org.quartz.JobDataMap}</code> * in the given <code>JobDetail</code> should be updated if the <code>Job</code> * is stateful. * </p> */ public void triggeredJobComplete(final OperableTrigger trigger, final JobDetail jobDetail, final CompletedExecutionInstruction triggerInstCode) { retryExecuteInNonManagedTXLock( LOCK_TRIGGER_ACCESS, new VoidTransactionCallback() { public void executeVoid(Connection conn) throws JobPersistenceException { triggeredJobComplete(conn, trigger, jobDetail,triggerInstCode); } }); }
/** * <p> * Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>. * </p> * * @param newJob The <code>JobDetail</code> to be stored. * @param newTrigger The <code>Trigger</code> to be stored. * @throws ObjectAlreadyExistsException if a <code>Job</code> with the same name/group already exists. */ @Override public void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger) throws JobPersistenceException { lock(); try { storeJob(newJob, false); storeTrigger(newTrigger, false); } finally { unlock(); } }
/** * <p> * Remove (delete) the <code>{@link org.quartz.Job}</code> with the given name, and any * <code>{@link org.quartz.Trigger}</code> s that reference it. * </p> * * @param jobKey The key of the <code>Job</code> to be removed. * @return <code>true</code> if a <code>Job</code> with the given name & group was found and removed from the store. */ @Override public boolean removeJob(JobKey jobKey) throws JobPersistenceException { boolean found = false; lock(); try { List<OperableTrigger> trigger = getTriggersForJob(jobKey); for (OperableTrigger trig : trigger) { this.removeTrigger(trig.getKey()); found = true; } found = (jobFacade.remove(jobKey) != null) | found; if (found) { Set<String> grpSet = toolkitDSHolder.getOrCreateJobsGroupMap(jobKey.getGroup()); grpSet.remove(jobKey.getName()); if (grpSet.isEmpty()) { toolkitDSHolder.removeJobsGroupMap(jobKey.getGroup()); jobFacade.removeGroup(jobKey.getGroup()); } } } finally { unlock(); } return found; }
@Override public boolean removeTriggers(List<TriggerKey> triggerKeys) throws JobPersistenceException { boolean allFound = true; lock(); try { for (TriggerKey key : triggerKeys) allFound = removeTrigger(key) && allFound; } finally { unlock(); } return allFound; }