@BeforeClass public void setUp() throws SchedulerException, InterruptedException { fSignaler = new SampleSignaler(); Config config = new Config(); config.setProperty("hazelcast.logging.type", "slf4j"); hazelcastInstance = Hazelcast.newHazelcastInstance(config); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); HazelcastJobStore.setHazelcastClient(hazelcastInstance); jobStore = createJobStore("AbstractJobStoreTest"); jobStore.initialize(loadHelper, this.fSignaler); jobStore.schedulerStarted(); jobDetail = JobBuilder.newJob(NoOpJob.class).withIdentity("job1", "jobGroup1").build(); jobStore.storeJob(jobDetail, false); }
@Test public void testAcquireNextTriggerAfterMissFire_doesNotTrigger_ifNextScheduleTimeOutOfRange() throws Exception { long baseFireTime = newDate().build().getTime(); JobDetail job = JobBuilder.newJob(NoOpJob.class).build(); jobStore.storeJob(job, true); ScheduleBuilder scheduleBuilder = simpleSchedule().withIntervalInSeconds(3).repeatForever() .withMisfireHandlingInstructionNextWithExistingCount(); OperableTrigger t1 = buildAndComputeTrigger("trigger1", "triggerGroup1", job, baseFireTime + 500, null, scheduleBuilder); jobStore.storeTrigger(t1, false); assertAcquiredAndRelease(baseFireTime, 1); Thread.sleep(5000); // missed one execution (3 seconds tick is more than 1 seconds ago), next execution (at 6 seconds tick) is not yet picked up assertAcquiredAndRelease(newDate().build().getTime() + 250, 0); // try acquire on larger interval (containing 6 sec tick) assertAcquiredAndRelease(newDate().build().getTime() + 1550, 1); }
@Test public void testStoreAndRetrieveJobs() throws Exception { final int nJobs = 10; SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetrieveJobs"); store.initialize(loadHelper, schedSignaler); // Store jobs. for (int i = 0; i < nJobs; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); store.storeJob(job, false); } // Retrieve jobs. for (int i = 0; i < nJobs; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(storedJob.getKey(), jobKey); } }
@Test() public void testStoreAndRetrieveJobs() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetrieveJobs"); store.initialize(loadHelper, schedSignaler); // Store jobs. for (int i = 0; i < 10; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); store.storeJob(job, false); } // Retrieve jobs. for (int i = 0; i < 10; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(jobKey, storedJob.getKey()); } }
@BeforeClass public void setUp() throws SchedulerException, InterruptedException { Config config = new Config(); config.setProperty("hazelcast.logging.type", "slf4j"); hazelcastInstance = Hazelcast.newHazelcastInstance(config); this.fSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); this.jobStore = createJobStore("AbstractJobStoreTest"); this.jobStore.initialize(loadHelper, this.fSignaler); this.jobStore.schedulerStarted(); this.fJobDetail = new JobDetailImpl("job1", "jobGroup1", NoOpJob.class); this.fJobDetail.setDurability(true); this.jobStore.storeJob(this.fJobDetail, false); }
@Test public void testAcquireNextTrigger() throws Exception { long baseFireTime = DateBuilder.newDate().build().getTime(); JobDetail job = JobBuilder.newJob(NoOpJob.class).build(); jobStore.storeJob(job, true); OperableTrigger t1 = buildAndComputeTrigger("trigger1", "testAcquireNextTrigger", job, baseFireTime + 2000); OperableTrigger t2 = buildAndComputeTrigger("trigger2", "testAcquireNextTrigger", job, baseFireTime + 500); OperableTrigger t3 = buildAndComputeTrigger("trigger3", "testAcquireNextTrigger", job, baseFireTime + 1000); assertTrue(jobStore.acquireNextTriggers(baseFireTime, 1, 0L).isEmpty()); jobStore.storeTrigger(t1, false); assertEquals(jobStore.acquireNextTriggers(baseFireTime + 2000, 1, 0L).get(0), t1); jobStore.storeTrigger(t2, false); assertEquals(jobStore.acquireNextTriggers(baseFireTime + 600, 1, 0L).get(0), t2); assertTrue(jobStore.acquireNextTriggers(baseFireTime + 600, 1, 0L).isEmpty()); jobStore.storeTrigger(t3, false); assertEquals(jobStore.acquireNextTriggers(baseFireTime + 5000, 1, 0L).get(0), t3); // release trigger3 jobStore.releaseAcquiredTrigger(t3); assertEquals(jobStore.acquireNextTriggers(t3.getNextFireTime().getTime() + 5000, 1, 1L).get(0), t3); assertTrue(jobStore.acquireNextTriggers(baseFireTime + 10000, 1, 0L).isEmpty()); jobStore.removeTrigger(t1.getKey()); jobStore.removeTrigger(t2.getKey()); jobStore.removeTrigger(t3.getKey()); }
@Test public void testAcquireNextTriggerAfterMissFire_triggersImmediately_ifNextScheduleTimeInRange() throws Exception { long baseFireTime = newDate().build().getTime(); JobDetail job = JobBuilder.newJob(NoOpJob.class).build(); jobStore.storeJob(job, true); SimpleScheduleBuilder scheduleBuilder = simpleSchedule().withIntervalInSeconds(3).repeatForever() .withMisfireHandlingInstructionFireNow(); OperableTrigger t1 = buildAndComputeTrigger("trigger1", "triggerGroup1", job, baseFireTime + 500, null, scheduleBuilder); jobStore.storeTrigger(t1, false); assertAcquiredAndRelease(baseFireTime, 1); Thread.sleep(5000); // missed one execution, next execution is immediate assertAcquiredAndRelease(newDate().build().getTime() + 500, 1); Thread.sleep(1000); // next execution is at 8 seconds tick (5 + 3) outside interval (6 sec to 7 sec tick), no triggers should be acquired assertAcquiredAndRelease(newDate().build().getTime() + 1050, 0); // increase interval to contain 8 seconds tick assertAcquiredAndRelease(newDate().build().getTime() + 2550, 1); }
@Test public void testStoreAndRetriveTriggers() throws Exception { final int nJobs = 10; SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetriveTriggers"); store.initialize(loadHelper, schedSignaler); // Store jobs and triggers. for (int i = 0; i < nJobs; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); store.storeJob(job, true); OperableTrigger trigger = buildTrigger("job" + i, DEFAULT_GROUP, job); store.storeTrigger((OperableTrigger) trigger, true); } // Retrieve jobs and triggers. for (int i = 0; i < nJobs; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(storedJob.getKey(), jobKey); TriggerKey triggerKey = TriggerKey.triggerKey("job" + i); OperableTrigger storedTrigger = store.retrieveTrigger(triggerKey); Assert.assertEquals(storedTrigger.getKey(), triggerKey); } }
@Test public void testTriggersFired() throws Exception { long baseFireTime = DateBuilder.newDate().build().getTime(); JobDetail newJob = JobBuilder.newJob(NoOpJob.class).withIdentity("job1", "testTriggersFired").build(); jobStore.storeJob(newJob, false); OperableTrigger trigger1 = buildAndComputeTrigger("triggerFired1", "triggerFiredGroup", newJob, baseFireTime + 100, baseFireTime + 100); jobStore.storeTrigger(trigger1, false); long firstFireTime = new Date(trigger1.getNextFireTime().getTime()).getTime(); List<OperableTrigger> acquiredTriggers = jobStore.acquireNextTriggers(firstFireTime + 500, 1, 0L); assertEquals(acquiredTriggers.size(), 1); List<TriggerFiredResult> triggerFired = jobStore.triggersFired(acquiredTriggers); assertEquals(triggerFired.size(), 1); assertTrue(jobStore.checkExists(trigger1.getKey())); assertEquals(jobStore.getTriggerState(trigger1.getKey()), Trigger.TriggerState.COMPLETE); jobStore.removeTrigger(trigger1.getKey()); }
@Test() public void testStoreTriggerReplacesTrigger() throws Exception { String jobName = "StoreTriggerReplacesTrigger"; String jobGroup = "StoreTriggerReplacesTriggerGroup"; JobDetailImpl detail = new JobDetailImpl(jobName, jobGroup, NoOpJob.class); jobStore.storeJob(detail, false); String trName = "StoreTriggerReplacesTrigger"; String trGroup = "StoreTriggerReplacesTriggerGroup"; OperableTrigger tr = new SimpleTriggerImpl(trName, trGroup, new Date()); tr.setJobKey(new JobKey(jobName, jobGroup)); tr.setCalendarName(null); jobStore.storeTrigger(tr, false); assertEquals(tr, jobStore.retrieveTrigger(tr.getKey())); try { jobStore.storeTrigger(tr, false); fail("an attempt to store duplicate trigger succeeded"); } catch (ObjectAlreadyExistsException oaee) { // expected } tr.setCalendarName("QQ"); jobStore.storeTrigger(tr, true); // fails here assertEquals(tr, jobStore.retrieveTrigger(tr.getKey())); assertEquals("QQ", jobStore.retrieveTrigger(tr.getKey()).getCalendarName(), "StoreJob doesn't replace triggers"); }
@Test() public void testStoreAndRetriveTriggers() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetriveTriggers"); store.initialize(loadHelper, schedSignaler); // Store jobs and triggers. for (int i = 0; i < 10; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); store.storeJob(job, true); SimpleScheduleBuilder schedule = SimpleScheduleBuilder.simpleSchedule(); Trigger trigger = TriggerBuilder.newTrigger().withIdentity("job" + i) .withSchedule(schedule).forJob(job).build(); store.storeTrigger((OperableTrigger) trigger, true); } // Retrieve job and trigger. for (int i = 0; i < 10; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(jobKey, storedJob.getKey()); TriggerKey triggerKey = TriggerKey.triggerKey("job" + i); Trigger storedTrigger = store.retrieveTrigger(triggerKey); Assert.assertEquals(triggerKey, storedTrigger.getKey()); } }
@Test public void testAcquireNextTriggerAfterMissFire() throws Exception { long oldThreshold = jobStore.getMisfireThreshold(); long baseFireTime = newDate().build().getTime(); JobDetail job = JobBuilder.newJob(NoOpJob.class).build(); jobStore.storeJob(job, true); jobStore.setMisfireThreshold(500); OperableTrigger t1 = buildAndComputeTrigger( "trigger1", "testAcquireNextTriggerAfterMissFire", job, baseFireTime + 500, null, SimpleScheduleBuilder.simpleSchedule().withMisfireHandlingInstructionFireNow()); OperableTrigger t2 = buildAndComputeTrigger( "trigger2", "testAcquireNextTriggerAfterMissFire", job, baseFireTime + 500, null, SimpleScheduleBuilder.simpleSchedule().withMisfireHandlingInstructionFireNow()); jobStore.storeTrigger(t1, false); jobStore.storeTrigger(t2, false); List<OperableTrigger> acquired = jobStore.acquireNextTriggers(baseFireTime + 1000, 1, 0L); assertEquals(acquired.size(), 1); jobStore.triggersFired(acquired); Thread.sleep(800); long now = newDate().build().getTime(); //misfired is acquired immediately assertEquals(jobStore.acquireNextTriggers(now + 1000, 1, 0L).size(), 1); jobStore.removeTrigger(t1.getKey()); jobStore.removeTrigger(t2.getKey()); jobStore.setMisfireThreshold(oldThreshold); }
@Test public void testTriggerStates() throws Exception { JobDetail newJob = JobBuilder.newJob(NoOpJob.class).withIdentity("job1", "testTriggerStates").build(); jobStore.storeJob(newJob, false); OperableTrigger trigger = buildTrigger("trigger1", "testTriggerStates", newJob, DateBuilder.newDate().build().getTime() + 1000, DateBuilder.newDate().build().getTime() + 2000); trigger.computeFirstFireTime(null); assertEquals(jobStore.getTriggerState(trigger.getKey()), Trigger.TriggerState.NONE); jobStore.storeTrigger(trigger, false); assertEquals(jobStore.getTriggerState(trigger.getKey()), Trigger.TriggerState.NORMAL); jobStore.pauseTrigger(trigger.getKey()); assertEquals(jobStore.getTriggerState(trigger.getKey()), Trigger.TriggerState.PAUSED); jobStore.resumeTrigger(trigger.getKey()); assertEquals(jobStore.getTriggerState(trigger.getKey()), Trigger.TriggerState.NORMAL); OperableTrigger rt1 = jobStore.acquireNextTriggers( new Date(trigger.getNextFireTime().getTime()).getTime() + 10000, 1, 1L) .get(0); assertNotNull(rt1); jobStore.releaseAcquiredTrigger(rt1); OperableTrigger rt2 = jobStore.acquireNextTriggers( new Date(rt1.getNextFireTime().getTime()).getTime() + 1500, 1, 1L) .get(0); assertNotNull(rt2); assertEquals(rt2.getJobKey(), rt1.getJobKey()); assertTrue(jobStore.acquireNextTriggers(new Date(rt2.getNextFireTime().getTime()).getTime() + 1500, 1, 1L) .isEmpty()); }
@Test public void testAcquireTriggers() throws Exception { final int nJobs = 10; SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggers"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < nJobs; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder.repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) newTrigger() .withIdentity("job" + i) .withSchedule(schedule).forJob(job) .startAt(startTime) .build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertNotNull(fireTime); store.storeJobAndTrigger(job, trigger); } // Test acquire one trigger at a time for (int i = 0; i < nJobs; i++) { long noLaterThan = (startTime0.getTime() + i * MIN); int maxCount = 1; long timeWindow = 0; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(triggers.size(), 1); Assert.assertEquals(triggers.get(0).getKey().getName(), "job" + i); // Let's remove the trigger now. store.removeJob(triggers.get(0).getJobKey()); } }
@Test public void testAcquireTriggersInBatch() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggersInBatch"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < 10; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder.repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) newTrigger() .withIdentity("job" + i) .withSchedule(schedule) .forJob(job) .startAt(startTime) .build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertNotNull(fireTime); store.storeJobAndTrigger(job, trigger); } // Test acquire batch of triggers at a time long noLaterThan = startTime0.getTime() + 10 * MIN; int maxCount = 7; // time window needs to be big to be able to pick up multiple triggers when // they are a minute apart long timeWindow = 8 * MIN; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(triggers.size(), 7); }
@Test() public void testAcquireTriggers() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggers"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < 10; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder .repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) TriggerBuilder.newTrigger() .withIdentity("job" + i).withSchedule(schedule).forJob(job) .startAt(startTime).build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertEquals(true, fireTime != null); store.storeJobAndTrigger(job, trigger); } // Test acquire one trigger at a time for (int i = 0; i < 10; i++) { long noLaterThan = (startTime0.getTime() + i * MIN); int maxCount = 1; long timeWindow = 0; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(1, triggers.size()); Assert.assertEquals("job" + i, triggers.get(0).getKey().getName()); // Let's remove the trigger now. store.removeJob(triggers.get(0).getJobKey()); } }
@Test() public void testAcquireTriggersInBatch() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggersInBatch"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < 10; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder .repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) TriggerBuilder.newTrigger() .withIdentity("job" + i).withSchedule(schedule).forJob(job) .startAt(startTime).build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertEquals(true, fireTime != null); store.storeJobAndTrigger(job, trigger); } // Test acquire batch of triggers at a time long noLaterThan = startTime0.getTime() + 10 * MIN; int maxCount = 7; // time window needs to be big to be able to pick up multiple triggers when // they are a minute apart long timeWindow = 8 * MIN; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(7, triggers.size()); }