@Override public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); if (rejectedExecutionHandler == null) { rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); } ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler); answer.setRemoveOnCancelPolicy(true); // need to wrap the thread pool in a sized to guard against the problem that the // JDK created thread pool has an unbounded queue (see class javadoc), which mean // we could potentially keep adding tasks, and run out of memory. if (profile.getMaxPoolSize() > 0) { return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); } else { return answer; } }
public void testNewScheduledThreadPoolProfileById() throws Exception { assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo")); ThreadPoolProfile foo = new ThreadPoolProfile("foo"); foo.setKeepAliveTime(20L); foo.setMaxPoolSize(40); foo.setPoolSize(5); foo.setMaxQueueSize(2000); context.getExecutorServiceManager().registerThreadPoolProfile(foo); ExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(this, "Cool", "foo"); assertNotNull(pool); SizedScheduledExecutorService tp = assertIsInstanceOf(SizedScheduledExecutorService.class, pool); // a scheduled dont use keep alive assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(Integer.MAX_VALUE, tp.getMaximumPoolSize()); assertEquals(5, tp.getCorePoolSize()); assertFalse(tp.isShutdown()); context.stop(); assertTrue(tp.isShutdown()); }
@Override public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); if (rejectedExecutionHandler == null) { rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); } ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor( profile.getPoolSize(), managedThreadFactory, rejectedExecutionHandler); if (profile.getMaxPoolSize() > 0) { return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); } else { return answer; } }
private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) { ObjectHelper.notNull(executorService, "executorService"); List<Runnable> answer = null; if (!executorService.isShutdown()) { if (failSafe) { // log as warn, as we shutdown as fail-safe, so end user should see more details in the log. LOG.warn("Forcing shutdown of ExecutorService: {}", executorService); } else { LOG.debug("Forcing shutdown of ExecutorService: {}", executorService); } answer = executorService.shutdownNow(); if (LOG.isTraceEnabled()) { LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", executorService, executorService.isShutdown(), executorService.isTerminated()); } } // let lifecycle strategy be notified as well which can let it be managed in JMX as well ThreadPoolExecutor threadPool = null; if (executorService instanceof ThreadPoolExecutor) { threadPool = (ThreadPoolExecutor) executorService; } else if (executorService instanceof SizedScheduledExecutorService) { threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); } if (threadPool != null) { for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { lifecycle.onThreadPoolRemove(camelContext, threadPool); } } // remove reference as its shutdown (do not remove if fail-safe) if (!failSafe) { executorServices.remove(executorService); } return answer; }
public void testNewScheduledThreadPool() throws Exception { ExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(this, "Cool", 5); assertNotNull(pool); SizedScheduledExecutorService tp = assertIsInstanceOf(SizedScheduledExecutorService.class, pool); // a scheduled dont use keep alive assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(Integer.MAX_VALUE, tp.getMaximumPoolSize()); assertEquals(5, tp.getCorePoolSize()); assertFalse(tp.isShutdown()); context.stop(); assertTrue(tp.isShutdown()); }
public void testNewSingleThreadScheduledExecutor() throws Exception { ExecutorService pool = context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "Cool"); assertNotNull(pool); SizedScheduledExecutorService tp = assertIsInstanceOf(SizedScheduledExecutorService.class, pool); // a scheduled dont use keep alive assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(Integer.MAX_VALUE, tp.getMaximumPoolSize()); assertEquals(1, tp.getCorePoolSize()); assertFalse(tp.isShutdown()); context.stop(); assertTrue(tp.isShutdown()); }
public void testScheduledThreadPool() throws Exception { SizedScheduledExecutorService pool = context.getRegistry().lookupByNameAndType("myPool", SizedScheduledExecutorService.class); assertNotNull(pool); assertFalse("Should be started", pool.isShutdown()); assertEquals(5, pool.getCorePoolSize()); }
private boolean doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean failSafe) { if (executorService == null) { return false; } boolean warned = false; // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively // and try shutting down again. In both cases we wait at most the given shutdown timeout value given // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus // we ought to shutdown much faster) if (!executorService.isShutdown()) { StopWatch watch = new StopWatch(); LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination); executorService.shutdown(); if (shutdownAwaitTermination > 0) { try { if (!awaitTermination(executorService, shutdownAwaitTermination)) { warned = true; LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService); executorService.shutdownNow(); // we are now shutting down aggressively, so wait to see if we can completely shutdown or not if (!awaitTermination(executorService, shutdownAwaitTermination)) { LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); } } } catch (InterruptedException e) { warned = true; LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); // we were interrupted during shutdown, so force shutdown executorService.shutdownNow(); } } // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log if (warned) { LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.", executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())); } else if (LOG.isDebugEnabled()) { LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.", executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())); } } // let lifecycle strategy be notified as well which can let it be managed in JMX as well ThreadPoolExecutor threadPool = null; if (executorService instanceof ThreadPoolExecutor) { threadPool = (ThreadPoolExecutor) executorService; } else if (executorService instanceof SizedScheduledExecutorService) { threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); } if (threadPool != null) { for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { lifecycle.onThreadPoolRemove(camelContext, threadPool); } } // remove reference as its shutdown (do not remove if fail-safe) if (!failSafe) { executorServices.remove(executorService); } return warned; }
/** * Invoked when a new thread pool is created. * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext, * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method, * which for example will enlist the thread pool in JMX management. * * @param executorService the thread pool * @param source the source to use the thread pool * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile */ private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) { // add to internal list of thread pools executorServices.add(executorService); String id; String sourceId = null; String routeId = null; // extract id from source if (source instanceof NamedNode) { id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory()); // and let source be the short name of the pattern sourceId = ((NamedNode) source).getShortName(); } else if (source instanceof String) { id = (String) source; } else if (source != null) { if (source instanceof StaticService) { // the source is static service so its name would be unique id = source.getClass().getSimpleName(); } else { // fallback and use the simple class name with hashcode for the id so its unique for this given source id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")"; } } else { // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")"; } // id is mandatory ObjectHelper.notEmpty(id, "id for thread pool " + executorService); // extract route id if possible if (source instanceof ProcessorDefinition) { RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source); if (route != null) { routeId = route.idOrCreate(this.camelContext.getNodeIdFactory()); } } // let lifecycle strategy be notified as well which can let it be managed in JMX as well ThreadPoolExecutor threadPool = null; if (executorService instanceof ThreadPoolExecutor) { threadPool = (ThreadPoolExecutor) executorService; } else if (executorService instanceof SizedScheduledExecutorService) { threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); } if (threadPool != null) { for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); } } // now call strategy to allow custom logic onNewExecutorService(executorService); }