@Test @SuppressWarnings("unchecked") public void triggersFired() throws Exception { // store some jobs with triggers Map<JobDetail, Set<? extends Trigger>> jobsAndTriggers = getJobsAndTriggers(2, 2, 2, 2, "* * * * * ?"); jobStore.storeCalendar("testCalendar", new WeeklyCalendar(), false, true); jobStore.storeJobsAndTriggers(jobsAndTriggers, false); List<OperableTrigger> acquiredTriggers = jobStore.acquireNextTriggers(System.currentTimeMillis() - 1000, 500, 4000); assertThat(acquiredTriggers, hasSize(16)); // ensure that all triggers are in the NORMAL state and have been ACQUIRED for (Map.Entry<JobDetail, Set<? extends Trigger>> jobDetailSetEntry : jobsAndTriggers.entrySet()) { for (Trigger trigger : jobDetailSetEntry.getValue()) { assertEquals(Trigger.TriggerState.NORMAL, jobStore.getTriggerState(trigger.getKey())); String triggerHashKey = schema.triggerHashKey(trigger.getKey()); assertThat(jedis.zscore(schema.triggerStateKey(RedisTriggerState.ACQUIRED), triggerHashKey), not(nullValue())); } } Set<? extends OperableTrigger> triggers = (Set<? extends OperableTrigger>) new ArrayList<>(jobsAndTriggers.entrySet()).get(0).getValue(); List<TriggerFiredResult> triggerFiredResults = jobStore.triggersFired(new ArrayList<>(triggers)); assertThat(triggerFiredResults, hasSize(4)); }
@Override public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers) throws JobPersistenceException { if (logger.isInfoEnabled()) { logger.info("Triggers fired " + triggers.size()); logger.trace(triggers); } List<CouchDbTrigger> couchdbTriggers = fetchCouchDbTriggers(triggers); Map<String, Calendar> triggerCalendars = fetchCalendars(triggers); Map<JobKey, JobDetail> jobDetailMap = fetchJobDetails(triggers); List<TriggerFiredResult> firedResults = new ArrayList<TriggerFiredResult>(); List<CouchDbTrigger> firedTriggers = triggerStore.triggersFired(couchdbTriggers, triggerCalendars); for (CouchDbTrigger firedTrigger : firedTriggers) { Date prevFireTime = find(couchdbTriggers, firedTrigger.getKey()).getPreviousFireTime(); Calendar calendar = triggerCalendars.get(firedTrigger.getCalendarName()); JobDetail job = jobDetailMap.get(firedTrigger.getJobKey()); TriggerFiredBundle triggerFiredBundle = buildTriggerFiredBundle(firedTrigger, prevFireTime, calendar, job); firedResults.add(new TriggerFiredResult(triggerFiredBundle)); } return firedResults; }
@Test public void shouldFireTriggers() throws JobPersistenceException { final String triggerName = id("fuuid1"); SimpleTriggerImpl trigger = (SimpleTriggerImpl) newTrigger() .withIdentity(triggerName, "borgroup1") .forJob(JobKey.jobKey("fooid", "bargroup")) .startAt(new Date(2010 - 1900, 10, 20)) .withSchedule(simpleSchedule() .withIntervalInMinutes(2) .repeatForever()) .build(); trigger.computeFirstFireTime(null); couchdbStore.storeTrigger(trigger, false); List<TriggerFiredResult> firedResults = couchdbStore.triggersFired(Arrays.<OperableTrigger>asList(trigger)); assertEquals(1, firedResults.size()); assertEquals(TriggerKey.triggerKey(triggerName, "borgroup1"), firedResults.get(0).getTriggerFiredBundle().getTrigger().getKey()); }
@Override public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers) throws JobPersistenceException { try { return realJobStore.triggersFired(triggers); } catch (RejoinException e) { throw new JobPersistenceException("Trigger fire marking failed due to client rejoin", e); } }
@Test public void testTriggersFired() throws Exception { long baseFireTime = DateBuilder.newDate().build().getTime(); JobDetail newJob = JobBuilder.newJob(NoOpJob.class).withIdentity("job1", "testTriggersFired").build(); jobStore.storeJob(newJob, false); OperableTrigger trigger1 = buildAndComputeTrigger("triggerFired1", "triggerFiredGroup", newJob, baseFireTime + 100, baseFireTime + 100); jobStore.storeTrigger(trigger1, false); long firstFireTime = new Date(trigger1.getNextFireTime().getTime()).getTime(); List<OperableTrigger> acquiredTriggers = jobStore.acquireNextTriggers(firstFireTime + 500, 1, 0L); assertEquals(acquiredTriggers.size(), 1); List<TriggerFiredResult> triggerFired = jobStore.triggersFired(acquiredTriggers); assertEquals(triggerFired.size(), 1); assertTrue(jobStore.checkExists(trigger1.getKey())); assertEquals(jobStore.getTriggerState(trigger1.getKey()), Trigger.TriggerState.COMPLETE); jobStore.removeTrigger(trigger1.getKey()); }
@Override public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException { return clusteredJobStore.triggersFired(triggers); }
/** * <p> * Inform the <code>JobStore</code> that the scheduler is now firing the * given <code>Trigger</code> (executing its associated <code>Job</code>), * that it had previously acquired (reserved). * </p> */ public List<TriggerFiredResult> triggersFired(List<OperableTrigger> firedTriggers) { synchronized (lock) { List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>(); for (OperableTrigger trigger : firedTriggers) { TriggerWrapper tw = triggersByKey.get(trigger.getKey()); // was the trigger deleted since being acquired? if (tw == null || tw.trigger == null) { continue; } // was the trigger completed, paused, blocked, etc. since being acquired? if (tw.state != TriggerWrapper.STATE_ACQUIRED) { continue; } Calendar cal = null; if (tw.trigger.getCalendarName() != null) { cal = retrieveCalendar(tw.trigger.getCalendarName()); if(cal == null) continue; } Date prevFireTime = trigger.getPreviousFireTime(); // in case trigger was replaced between acquiring and firing timeTriggers.remove(tw); // call triggered on our copy, and the scheduler's copy tw.trigger.triggered(cal); trigger.triggered(cal); //tw.state = TriggerWrapper.STATE_EXECUTING; tw.state = TriggerWrapper.STATE_WAITING; TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob( tw.jobKey), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime()); JobDetail job = bndle.getJobDetail(); if (job.isConcurrentExectionDisallowed()) { ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(job.getKey()); for (TriggerWrapper ttw : trigs) { if (ttw.state == TriggerWrapper.STATE_WAITING) { ttw.state = TriggerWrapper.STATE_BLOCKED; } if (ttw.state == TriggerWrapper.STATE_PAUSED) { ttw.state = TriggerWrapper.STATE_PAUSED_BLOCKED; } timeTriggers.remove(ttw); } blockedJobs.add(job.getKey()); } else if (tw.trigger.getNextFireTime() != null) { synchronized (lock) { timeTriggers.add(tw); } } results.add(new TriggerFiredResult(bndle)); } return results; } }
/** * Inform the <code>JobStore</code> that the scheduler is now firing the * given <code>Trigger</code> (executing its associated <code>Job</code>), * that it had previously acquired (reserved). * * @param triggers a list of triggers * @param jedis a thread-safe Redis connection * @return may return null if all the triggers or their calendars no longer exist, or * if the trigger was not successfully put into the 'executing' * state. Preference is to return an empty list if none of the triggers * could be fired. */ @Override public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, JedisCluster jedis) throws JobPersistenceException, ClassNotFoundException { List<TriggerFiredResult> results = new ArrayList<>(); for (OperableTrigger trigger : triggers) { final String triggerHashKey = redisSchema.triggerHashKey(trigger.getKey()); logger.debug(String.format("Trigger %s fired.", triggerHashKey)); Boolean triggerExistsResponse = jedis.exists(triggerHashKey); Double triggerAcquiredResponse = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.ACQUIRED), triggerHashKey); if (!triggerExistsResponse || triggerAcquiredResponse == null) { // the trigger does not exist or the trigger is not acquired if (!triggerExistsResponse) { logger.debug(String.format("Trigger %s does not exist.", triggerHashKey)); } else { logger.debug(String.format("Trigger %s was not acquired.", triggerHashKey)); } continue; } Calendar calendar = null; final String calendarName = trigger.getCalendarName(); if (calendarName != null) { calendar = retrieveCalendar(calendarName, jedis); if (calendar == null) { continue; } } final Date previousFireTime = trigger.getPreviousFireTime(); trigger.triggered(calendar); JobDetail job = retrieveJob(trigger.getJobKey(), jedis); TriggerFiredBundle triggerFiredBundle = new TriggerFiredBundle(job, trigger, calendar, false, new Date(), previousFireTime, previousFireTime, trigger.getNextFireTime()); // handling jobs for which concurrent execution is disallowed if (isJobConcurrentExecutionDisallowed(job.getJobClass())) { final String jobHashKey = redisSchema.jobHashKey(trigger.getJobKey()); final String jobTriggerSetKey = redisSchema.jobTriggersSetKey(job.getKey()); for (String nonConcurrentTriggerHashKey : jedis.smembers(jobTriggerSetKey)) { Double score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.WAITING), nonConcurrentTriggerHashKey); if (score != null) { setTriggerState(RedisTriggerState.BLOCKED, score, nonConcurrentTriggerHashKey, jedis); } else { score = jedis.zscore(redisSchema.triggerStateKey(RedisTriggerState.PAUSED), nonConcurrentTriggerHashKey); if (score != null) { setTriggerState(RedisTriggerState.PAUSED_BLOCKED, score, nonConcurrentTriggerHashKey, jedis); } } } jedis.set(redisSchema.jobBlockedKey(job.getKey()), schedulerInstanceId); jedis.sadd(redisSchema.blockedJobsSet(), jobHashKey); } // release the fired trigger if (trigger.getNextFireTime() != null) { final long nextFireTime = trigger.getNextFireTime().getTime(); jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, Long.toString(nextFireTime)); logger.debug(String.format("Releasing trigger %s with next fire time %s. Setting state to WAITING.", triggerHashKey, nextFireTime)); setTriggerState(RedisTriggerState.WAITING, (double) nextFireTime, triggerHashKey, jedis); } else { jedis.hset(triggerHashKey, TRIGGER_NEXT_FIRE_TIME, ""); unsetTriggerState(triggerHashKey, jedis); } results.add(new TriggerFiredResult(triggerFiredBundle)); } return results; }
@Override public List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers) throws JobPersistenceException { List<TriggerFiredResult> results = new ArrayList<>(); try (Jedis jedis = pool.getResource()) { lockPool.acquire(); for (OperableTrigger trigger : triggers) { String triggerHashKey = createTriggerHashKey(trigger.getKey().getGroup(), trigger.getKey().getName()); log.debug("trigger: " + triggerHashKey + " fired"); if (!jedis.exists(triggerHashKey)) continue; // the trigger does not exist if (jedis.zscore(RedisTriggerState.ACQUIRED.getKey(), triggerHashKey) == null) continue; // the trigger is not acquired Calendar cal = null; if (trigger.getCalendarName() != null) { String calendarName = trigger.getCalendarName(); cal = retrieveCalendar(calendarName, jedis); if(cal == null) continue; } Date prevFireTime = trigger.getPreviousFireTime(); trigger.triggered(cal); TriggerFiredBundle bundle = new TriggerFiredBundle(retrieveJob(trigger.getJobKey(), jedis), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime()); // handling job concurrent execution disallowed String jobHashKey = createJobHashKey(trigger.getJobKey().getGroup(), trigger.getJobKey().getName()); if (isJobConcurrentExectionDisallowed(jedis.hget(jobHashKey, JOB_CLASS))) { String jobTriggerSetKey = createJobTriggersSetKey(trigger.getJobKey().getGroup(), trigger.getJobKey().getName()); Set<String> nonConcurrentTriggerHashKeys = jedis.smembers(jobTriggerSetKey); for (String nonConcurrentTriggerHashKey : nonConcurrentTriggerHashKeys) { Double score = jedis.zscore(RedisTriggerState.WAITING.getKey(), nonConcurrentTriggerHashKey); if (score != null) { setTriggerState(RedisTriggerState.BLOCKED, score, nonConcurrentTriggerHashKey); } else { score = jedis.zscore(RedisTriggerState.PAUSED.getKey(), nonConcurrentTriggerHashKey); if (score != null) setTriggerState(RedisTriggerState.PAUSED_BLOCKED, score, nonConcurrentTriggerHashKey); } } jedis.hset(jobHashKey, BLOCKED_BY, instanceId); jedis.hset(jobHashKey, BLOCK_TIME, Long.toString(System.currentTimeMillis())); jedis.sadd(BLOCKED_JOBS_SET, jobHashKey); } // releasing the fired trigger if (trigger.getNextFireTime() != null) { jedis.hset(triggerHashKey, NEXT_FIRE_TIME, Long.toString(trigger.getNextFireTime().getTime())); setTriggerState(RedisTriggerState.WAITING, (double)trigger.getNextFireTime().getTime(), triggerHashKey); } else { jedis.hset(triggerHashKey, NEXT_FIRE_TIME, ""); unsetTriggerState(triggerHashKey); } results.add(new TriggerFiredResult(bundle)); } } catch (JobPersistenceException | ClassNotFoundException | InterruptedException ex) { log.error("could not acquire next triggers", ex); throw new JobPersistenceException(ex.getMessage(), ex.getCause()); } finally { lockPool.release(); } return results; }
@Override public List<TriggerFiredResult> triggersFired( List<OperableTrigger> firedTriggers) throws JobPersistenceException { List<TriggerFiredResult> results = new ArrayList<>(); for (OperableTrigger trigger : firedTriggers) { TriggerWrapper tw = triggersByKey.get(trigger.getKey()); // was the trigger deleted since being acquired? if (tw == null || tw.trigger == null) { continue; } // was the trigger completed, paused, blocked, etc. since being acquired? if (tw.getState() != ACQUIRED) { continue; } Calendar cal = null; if (tw.trigger.getCalendarName() != null) { cal = retrieveCalendar(tw.trigger.getCalendarName()); if (cal == null) { continue; } } Date prevFireTime = trigger.getPreviousFireTime(); // call triggered on our copy, and the scheduler's copy tw.trigger.triggered(cal); trigger.triggered(cal); // tw.state = TriggerWrapper.STATE_EXECUTING; TriggerFiredBundle bndle = new TriggerFiredBundle(retrieveJob(tw.jobKey), trigger, cal, false, new Date(), trigger.getPreviousFireTime(), prevFireTime, trigger.getNextFireTime()); JobDetail job = bndle.getJobDetail(); if (job.isConcurrentExectionDisallowed()) { ArrayList<TriggerWrapper> trigs = getTriggerWrappersForJob(job.getKey()); for (TriggerWrapper ttw : trigs) { if (ttw.getState() == WAITING) { ttw = newTriggerWrapper(ttw, BLOCKED); } if (ttw.getState() == PAUSED) { ttw = newTriggerWrapper(ttw, BLOCKED); } } } results.add(new TriggerFiredResult(bndle)); } return results; }
@Override public List<TriggerFiredResult> triggersFired(List<OperableTrigger> operableTriggers) throws JobPersistenceException { return null; //To change body of implemented methods use File | Settings | File Templates. }
/** * Inform the <code>JobStore</code> that the scheduler is now firing the * given <code>Trigger</code> (executing its associated <code>Job</code>), * that it had previously acquired (reserved). * @param triggers a list of triggers * @param jedis a thread-safe Redis connection * @return may return null if all the triggers or their calendars no longer exist, or * if the trigger was not successfully put into the 'executing' * state. Preference is to return an empty list if none of the triggers * could be fired. */ public abstract List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers, T jedis) throws JobPersistenceException, ClassNotFoundException;