/**
 * Builds the scheduler SPI from its injected collaborators.
 *
 * Every object argument is null-checked via {@code checkNotNull}; the thread-pool size
 * must be strictly positive and is logged at INFO level once accepted.
 *
 * @param eventManager     event manager, must not be null
 * @param nodeAccess       node access, must not be null
 * @param jobStoreProvider provider of the {@link JobStore}, must not be null
 * @param jobFactory       job factory, must not be null
 * @param threadPoolSize   thread-pool size, bound to {@code nexus.quartz.poolSize}
 *                         (the injection expression carries a fallback of 20); must be &gt; 0
 * @throws Exception declared by the constructor signature
 */
@Inject
public QuartzSchedulerSPI(final EventManager eventManager,
                          final NodeAccess nodeAccess,
                          final Provider<JobStore> jobStoreProvider,
                          final JobFactory jobFactory,
                          @Named("${nexus.quartz.poolSize:-20}") final int threadPoolSize)
    throws Exception
{
  // Collaborators: validated and stored in declaration order.
  this.eventManager = checkNotNull(eventManager);
  this.nodeAccess = checkNotNull(nodeAccess);
  this.jobStoreProvider = checkNotNull(jobStoreProvider);
  this.jobFactory = checkNotNull(jobFactory);

  // Pool size: reject non-positive values before storing.
  checkArgument(threadPoolSize > 0, "Invalid thread-pool size: %s", threadPoolSize);
  this.threadPoolSize = threadPoolSize;
  log.info("Thread-pool size: {}", threadPoolSize);

  // The trigger converter is built on top of the schedule factory created here.
  this.scheduleFactory = new QuartzScheduleFactory();
  this.triggerConverter = new QuartzTriggerConverter(this.scheduleFactory);

  // FIXME: sort out with refinement to lifecycle below
  this.active = true;
}
@Test public void testStoreAndRetrieveJobs() throws Exception { final int nJobs = 10; SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetrieveJobs"); store.initialize(loadHelper, schedSignaler); // Store jobs. for (int i = 0; i < nJobs; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); store.storeJob(job, false); } // Retrieve jobs. for (int i = 0; i < nJobs; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(storedJob.getKey(), jobKey); } }
public static void initSched(int threadCount) { try { SimpleThreadPool threadPool = new SimpleThreadPool(threadCount, Thread.NORM_PRIORITY); threadPool.initialize(); // create the job store JobStore jobStore = new RAMJobStore(); DirectSchedulerFactory.getInstance().createScheduler(threadPool, jobStore); sched = DirectSchedulerFactory.getInstance().getScheduler(); //init heart sched SimpleThreadPool threadPoolHeart = new SimpleThreadPool(1, Thread.NORM_PRIORITY); threadPoolHeart.initialize(); JobStore jobStoreHeart = new RAMJobStore(); DirectSchedulerFactory.getInstance().createScheduler("HeartScheduler", "SIMPLE_NON_CLUSTERED", threadPoolHeart, jobStoreHeart); heartSched = DirectSchedulerFactory.getInstance().getScheduler("HeartScheduler"); } catch (Exception e) { log.error("error init sched", e); } }
@Test() public void testStoreAndRetrieveJobs() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetrieveJobs"); store.initialize(loadHelper, schedSignaler); // Store jobs. for (int i = 0; i < 10; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); store.storeJob(job, false); } // Retrieve jobs. for (int i = 0; i < 10; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(jobKey, storedJob.getKey()); } }
/**
 * Delegates scheduler creation to the superclass and, when the configured job store
 * implements {@code SchedulerAware}, hands it the newly created scheduler.
 *
 * @param rsrcs scheduler resources; its job store is inspected
 * @param qs    the underlying Quartz scheduler, forwarded to the superclass
 * @return the scheduler produced by the superclass
 */
@Override
protected Scheduler instantiate(QuartzSchedulerResources rsrcs, QuartzScheduler qs) {
    final Scheduler created = super.instantiate(rsrcs, qs);
    final JobStore store = rsrcs.getJobStore();

    // Stores that do not care about the scheduler need no further wiring.
    if (!(store instanceof SchedulerAware)) {
        return created;
    }

    ((SchedulerAware) store).setScheduler(created);
    return created;
}
/**
 * Creates a scheduler backed by an in-memory job store
 * (<code>{@link RAMJobStore}</code>) and a <code>SimpleThreadPool</code> whose
 * threads run at <code>Thread.NORM_PRIORITY</code>.
 *
 * @param maxThreads
 *          The number of threads in the thread pool
 * @throws SchedulerException
 *           if initialization failed.
 */
public void createVolatileScheduler(int maxThreads) throws SchedulerException {
    // The pool must be initialized before it is handed to the scheduler.
    final SimpleThreadPool pool = new SimpleThreadPool(maxThreads, Thread.NORM_PRIORITY);
    pool.initialize();

    final JobStore ramStore = new RAMJobStore();
    this.createScheduler(pool, ramStore);
}
@Test public void testStoreAndRetriveTriggers() throws Exception { final int nJobs = 10; SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetriveTriggers"); store.initialize(loadHelper, schedSignaler); // Store jobs and triggers. for (int i = 0; i < nJobs; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); store.storeJob(job, true); OperableTrigger trigger = buildTrigger("job" + i, DEFAULT_GROUP, job); store.storeTrigger((OperableTrigger) trigger, true); } // Retrieve jobs and triggers. for (int i = 0; i < nJobs; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(storedJob.getKey(), jobKey); TriggerKey triggerKey = TriggerKey.triggerKey("job" + i); OperableTrigger storedTrigger = store.retrieveTrigger(triggerKey); Assert.assertEquals(storedTrigger.getKey(), triggerKey); } }
@Test() public void testStoreAndRetriveTriggers() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testStoreAndRetriveTriggers"); store.initialize(loadHelper, schedSignaler); // Store jobs and triggers. for (int i = 0; i < 10; i++) { JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); store.storeJob(job, true); SimpleScheduleBuilder schedule = SimpleScheduleBuilder.simpleSchedule(); Trigger trigger = TriggerBuilder.newTrigger().withIdentity("job" + i) .withSchedule(schedule).forJob(job).build(); store.storeTrigger((OperableTrigger) trigger, true); } // Retrieve job and trigger. for (int i = 0; i < 10; i++) { JobKey jobKey = JobKey.jobKey("job" + i); JobDetail storedJob = store.retrieveJob(jobKey); Assert.assertEquals(jobKey, storedJob.getKey()); TriggerKey triggerKey = TriggerKey.triggerKey("job" + i); Trigger storedTrigger = store.retrieveTrigger(triggerKey); Assert.assertEquals(triggerKey, storedTrigger.getKey()); } }
@Test public void testAcquireTriggers() throws Exception { final int nJobs = 10; SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggers"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < nJobs; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder.repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) newTrigger() .withIdentity("job" + i) .withSchedule(schedule).forJob(job) .startAt(startTime) .build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertNotNull(fireTime); store.storeJobAndTrigger(job, trigger); } // Test acquire one trigger at a time for (int i = 0; i < nJobs; i++) { long noLaterThan = (startTime0.getTime() + i * MIN); int maxCount = 1; long timeWindow = 0; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(triggers.size(), 1); Assert.assertEquals(triggers.get(0).getKey().getName(), "job" + i); // Let's remove the trigger now. store.removeJob(triggers.get(0).getJobKey()); } }
@Test public void testAcquireTriggersInBatch() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggersInBatch"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < 10; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i).build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder.repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) newTrigger() .withIdentity("job" + i) .withSchedule(schedule) .forJob(job) .startAt(startTime) .build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertNotNull(fireTime); store.storeJobAndTrigger(job, trigger); } // Test acquire batch of triggers at a time long noLaterThan = startTime0.getTime() + 10 * MIN; int maxCount = 7; // time window needs to be big to be able to pick up multiple triggers when // they are a minute apart long timeWindow = 8 * MIN; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(triggers.size(), 7); }
@Test() public void testAcquireTriggers() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggers"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < 10; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder .repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) TriggerBuilder.newTrigger() .withIdentity("job" + i).withSchedule(schedule).forJob(job) .startAt(startTime).build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertEquals(true, fireTime != null); store.storeJobAndTrigger(job, trigger); } // Test acquire one trigger at a time for (int i = 0; i < 10; i++) { long noLaterThan = (startTime0.getTime() + i * MIN); int maxCount = 1; long timeWindow = 0; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(1, triggers.size()); Assert.assertEquals("job" + i, triggers.get(0).getKey().getName()); // Let's remove the trigger now. store.removeJob(triggers.get(0).getJobKey()); } }
@Test() public void testAcquireTriggersInBatch() throws Exception { SchedulerSignaler schedSignaler = new SampleSignaler(); ClassLoadHelper loadHelper = new CascadingClassLoadHelper(); loadHelper.initialize(); JobStore store = createJobStore("testAcquireTriggersInBatch"); store.initialize(loadHelper, schedSignaler); // Setup: Store jobs and triggers. long MIN = 60 * 1000L; Date startTime0 = new Date(System.currentTimeMillis() + MIN); // a min from // now. for (int i = 0; i < 10; i++) { Date startTime = new Date(startTime0.getTime() + i * MIN); // a min apart JobDetail job = JobBuilder.newJob(NoOpJob.class).withIdentity("job" + i) .build(); SimpleScheduleBuilder schedule = SimpleScheduleBuilder .repeatMinutelyForever(2); OperableTrigger trigger = (OperableTrigger) TriggerBuilder.newTrigger() .withIdentity("job" + i).withSchedule(schedule).forJob(job) .startAt(startTime).build(); // Manually trigger the first fire time computation that scheduler would // do. Otherwise // the store.acquireNextTriggers() will not work properly. Date fireTime = trigger.computeFirstFireTime(null); Assert.assertEquals(true, fireTime != null); store.storeJobAndTrigger(job, trigger); } // Test acquire batch of triggers at a time long noLaterThan = startTime0.getTime() + 10 * MIN; int maxCount = 7; // time window needs to be big to be able to pick up multiple triggers when // they are a minute apart long timeWindow = 8 * MIN; List<OperableTrigger> triggers = store.acquireNextTriggers(noLaterThan, maxCount, timeWindow); Assert.assertEquals(7, triggers.size()); }
/**
 * Same as
 * {@link DirectSchedulerFactory#createScheduler(ThreadPool threadPool, JobStore jobStore)},
 * but additionally takes the scheduler name and instance ID. A scheduler
 * created this way can only be retrieved via
 * {@link DirectSchedulerFactory#getScheduler(String)}.
 *
 * <p>
 * Delegates to the RMI-aware overload with a <code>null</code> registry host,
 * port <code>0</code>, and <code>-1</code> for both the idle wait time and the
 * DB failure retry interval.
 * </p>
 *
 * @param schedulerName
 *          The name for the scheduler.
 * @param schedulerInstanceId
 *          The instance ID for the scheduler.
 * @param threadPool
 *          The thread pool for executing jobs
 * @param jobStore
 *          The type of job store
 * @throws SchedulerException
 *           if initialization failed
 */
public void createScheduler(String schedulerName,
        String schedulerInstanceId, ThreadPool threadPool, JobStore jobStore)
    throws SchedulerException {
    // Fixed arguments for the delegate overload, named for readability.
    final String noRmiRegistryHost = null;
    final int rmiRegistryPort = 0;
    final long idleWaitTime = -1;
    final long dbFailureRetryInterval = -1;

    createScheduler(schedulerName, schedulerInstanceId, threadPool, jobStore,
            noRmiRegistryHost, rmiRegistryPort, idleWaitTime, dbFailureRetryInterval);
}
/** * Creates a scheduler using the specified thread pool and job store and * binds it to RMI. * * @param schedulerName * The name for the scheduler. * @param schedulerInstanceId * The instance ID for the scheduler. * @param threadPool * The thread pool for executing jobs * @param jobStore * The type of job store * @param rmiRegistryHost * The hostname to register this scheduler with for RMI. Can use * "null" if no RMI is required. * @param rmiRegistryPort * The port for RMI. Typically 1099. * @param idleWaitTime * The idle wait time in milliseconds. You can specify "-1" for * the default value, which is currently 30000 ms. * @throws SchedulerException * if initialization failed */ public void createScheduler(String schedulerName, String schedulerInstanceId, ThreadPool threadPool, JobStore jobStore, String rmiRegistryHost, int rmiRegistryPort, long idleWaitTime, long dbFailureRetryInterval) throws SchedulerException { createScheduler(schedulerName, schedulerInstanceId, threadPool, jobStore, null, // plugins rmiRegistryHost, rmiRegistryPort, idleWaitTime, dbFailureRetryInterval, DEFAULT_JMX_EXPORT, DEFAULT_JMX_OBJECTNAME); }
/**
 * Creates a scheduler from the given thread pool, job store, and plugins,
 * and binds it to RMI. Job execution threads are supplied by
 * <code>DEFAULT_THREAD_EXECUTOR</code>.
 *
 * @param schedulerName
 *          The name for the scheduler.
 * @param schedulerInstanceId
 *          The instance ID for the scheduler.
 * @param threadPool
 *          The thread pool for executing jobs
 * @param jobStore
 *          The type of job store
 * @param schedulerPluginMap
 *          Map from a <code>String</code> plugin names to
 *          <code>{@link org.quartz.spi.SchedulerPlugin}</code>s. Can use
 *          "null" if no plugins are required.
 * @param rmiRegistryHost
 *          The hostname to register this scheduler with for RMI. Can use
 *          "null" if no RMI is required.
 * @param rmiRegistryPort
 *          The port for RMI. Typically 1099.
 * @param idleWaitTime
 *          The idle wait time in milliseconds. You can specify "-1" for
 *          the default value, which is currently 30000 ms.
 * @param dbFailureRetryInterval
 *          Forwarded unchanged to the delegate overload.
 * @param jmxExport
 *          Forwarded unchanged to the delegate overload.
 * @param jmxObjectName
 *          Forwarded unchanged to the delegate overload.
 * @throws SchedulerException
 *           if initialization failed
 */
public void createScheduler(String schedulerName,
        String schedulerInstanceId, ThreadPool threadPool,
        JobStore jobStore, Map<String, SchedulerPlugin> schedulerPluginMap,
        String rmiRegistryHost, int rmiRegistryPort,
        long idleWaitTime, long dbFailureRetryInterval,
        boolean jmxExport, String jmxObjectName)
    throws SchedulerException {
    // Same call as the executor-aware overload, with the default executor filled in.
    createScheduler(
            schedulerName,
            schedulerInstanceId,
            threadPool,
            DEFAULT_THREAD_EXECUTOR,
            jobStore,
            schedulerPluginMap,
            rmiRegistryHost,
            rmiRegistryPort,
            idleWaitTime,
            dbFailureRetryInterval,
            jmxExport,
            jmxObjectName);
}
/**
 * Creates a scheduler using the specified thread pool, thread executor, job
 * store, and plugins, and binds it to RMI. The batch settings come from
 * <code>DEFAULT_BATCH_MAX_SIZE</code> and <code>DEFAULT_BATCH_TIME_WINDOW</code>.
 *
 * @param schedulerName
 *          The name for the scheduler.
 * @param schedulerInstanceId
 *          The instance ID for the scheduler.
 * @param threadPool
 *          The thread pool for executing jobs
 * @param threadExecutor
 *          The thread executor for executing jobs. When <code>null</code>,
 *          <code>DEFAULT_THREAD_EXECUTOR</code> is used instead.
 * @param jobStore
 *          The type of job store
 * @param schedulerPluginMap
 *          Map from a <code>String</code> plugin names to
 *          <code>{@link org.quartz.spi.SchedulerPlugin}</code>s. Can use
 *          "null" if no plugins are required.
 * @param rmiRegistryHost
 *          The hostname to register this scheduler with for RMI. Can use
 *          "null" if no RMI is required.
 * @param rmiRegistryPort
 *          The port for RMI. Typically 1099.
 * @param idleWaitTime
 *          The idle wait time in milliseconds. You can specify "-1" for
 *          the default value, which is currently 30000 ms.
 * @param dbFailureRetryInterval
 *          Forwarded unchanged to the delegate overload.
 * @param jmxExport
 *          Forwarded unchanged to the delegate overload.
 * @param jmxObjectName
 *          Forwarded unchanged to the delegate overload.
 * @throws SchedulerException
 *           if initialization failed
 */
public void createScheduler(String schedulerName,
        String schedulerInstanceId, ThreadPool threadPool,
        ThreadExecutor threadExecutor,
        JobStore jobStore, Map<String, SchedulerPlugin> schedulerPluginMap,
        String rmiRegistryHost, int rmiRegistryPort,
        long idleWaitTime, long dbFailureRetryInterval,
        boolean jmxExport, String jmxObjectName)
    throws SchedulerException {
    // Honor the caller's executor: previously this parameter was ignored and the
    // default executor was always forwarded. A null argument still falls back to
    // the default so existing callers that passed null keep working.
    final ThreadExecutor effectiveExecutor =
            (threadExecutor != null) ? threadExecutor : DEFAULT_THREAD_EXECUTOR;

    createScheduler(schedulerName, schedulerInstanceId, threadPool,
            effectiveExecutor, jobStore, schedulerPluginMap,
            rmiRegistryHost, rmiRegistryPort, idleWaitTime,
            dbFailureRetryInterval, jmxExport, jmxObjectName,
            DEFAULT_BATCH_MAX_SIZE, DEFAULT_BATCH_TIME_WINDOW);
}
/**
 * <p>
 * Assign the <code>{@link JobStore}</code> that the
 * <code>{@link QuartzScheduler}</code> should use.
 * </p>
 *
 * @param jobStore
 *          the job store to use; must not be <code>null</code>
 * @exception IllegalArgumentException
 *              if jobStore is null.
 */
public void setJobStore(JobStore jobStore) {
    if (jobStore != null) {
        this.jobStore = jobStore;
        return;
    }

    throw new IllegalArgumentException("JobStore cannot be null.");
}
/**
 * Creates a scheduler from the given thread pool and job store, registered
 * under <code>DEFAULT_SCHEDULER_NAME</code> and <code>DEFAULT_INSTANCE_ID</code>.
 * The resulting scheduler can be retrieved via
 * {@link DirectSchedulerFactory#getScheduler()}.
 *
 * <p>
 * The factory is flagged as initialized only after the delegate call
 * returns without throwing.
 * </p>
 *
 * @param threadPool
 *          The thread pool for executing jobs
 * @param jobStore
 *          The type of job store
 * @throws SchedulerException
 *           if initialization failed
 */
public void createScheduler(ThreadPool threadPool, JobStore jobStore)
    throws SchedulerException {
    createScheduler(
            DEFAULT_SCHEDULER_NAME,
            DEFAULT_INSTANCE_ID,
            threadPool,
            jobStore);

    // Reached only when creation succeeded.
    initialized = true;
}
/**
 * Creates a scheduler from the given thread pool, job store, and plugins,
 * and binds it to RMI. Job execution threads are supplied by
 * <code>DEFAULT_THREAD_EXECUTOR</code>.
 *
 * @param schedulerName
 *          The name for the scheduler.
 * @param schedulerInstanceId
 *          The instance ID for the scheduler.
 * @param threadPool
 *          The thread pool for executing jobs
 * @param jobStore
 *          The type of job store
 * @param schedulerPluginMap
 *          Map from a <code>String</code> plugin names to
 *          <code>{@link org.quartz.spi.SchedulerPlugin}</code>s. Can use
 *          "null" if no plugins are required.
 * @param rmiRegistryHost
 *          The hostname to register this scheduler with for RMI. Can use
 *          "null" if no RMI is required.
 * @param rmiRegistryPort
 *          The port for RMI. Typically 1099.
 * @param idleWaitTime
 *          The idle wait time in milliseconds. You can specify "-1" for
 *          the default value, which is currently 30000 ms.
 * @param dbFailureRetryInterval
 *          Forwarded unchanged to the delegate overload.
 * @param jmxExport
 *          Forwarded unchanged to the delegate overload.
 * @param jmxObjectName
 *          Forwarded unchanged to the delegate overload.
 * @throws SchedulerException
 *           if initialization failed
 */
public void createScheduler(String schedulerName,
        String schedulerInstanceId, ThreadPool threadPool,
        JobStore jobStore, Map schedulerPluginMap,
        String rmiRegistryHost, int rmiRegistryPort,
        long idleWaitTime, long dbFailureRetryInterval,
        boolean jmxExport, String jmxObjectName)
    throws SchedulerException {
    // Same call as the executor-aware overload, with the default executor filled in.
    createScheduler(
            schedulerName,
            schedulerInstanceId,
            threadPool,
            DEFAULT_THREAD_EXECUTOR,
            jobStore,
            schedulerPluginMap,
            rmiRegistryHost,
            rmiRegistryPort,
            idleWaitTime,
            dbFailureRetryInterval,
            jmxExport,
            jmxObjectName);
}
/**
 * Creates a scheduler from the given thread pool and job store, registered
 * under <code>DEFAULT_SCHEDULER_NAME</code> and <code>DEFAULT_INSTANCE_ID</code>.
 * The resulting scheduler can be retrieved via
 * {@link DirectSchedulerFactory#getScheduler()}.
 *
 * @param threadPool
 *          The thread pool for executing jobs
 * @param jobStore
 *          The type of job store
 * @throws SchedulerException
 *           if initialization failed
 */
public void createScheduler(ThreadPool threadPool, JobStore jobStore)
    throws SchedulerException {
    createScheduler(
            DEFAULT_SCHEDULER_NAME,
            DEFAULT_INSTANCE_ID,
            threadPool,
            jobStore);
}
/**
 * <p>
 * Return the <code>{@link JobStore}</code> that the
 * <code>{@link QuartzScheduler}</code> is configured to use.
 * </p>
 *
 * @return the currently held job store
 */
public JobStore getJobStore() {
    return this.jobStore;
}