/** * thread pool of consumer */ @Bean(name="defaultThreadPoolProfile") ThreadPoolProfile threadPoolProfile(){ ThreadPoolProfile defaultThreadPoolProfile = new ThreadPoolProfile(); defaultThreadPoolProfile.setDefaultProfile(true); defaultThreadPoolProfile.setId("defaultThreadPoolProfile"); defaultThreadPoolProfile.setPoolSize(threadPoolSize); defaultThreadPoolProfile.setMaxPoolSize(threadMaxPoolSize); defaultThreadPoolProfile.setMaxQueueSize(threadMaxQueueSize); // 队列最大程度1000万 defaultThreadPoolProfile.setTimeUnit(TimeUnit.SECONDS); defaultThreadPoolProfile.setKeepAliveTime(60 * 5L); defaultThreadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns); // camelContext().getExecutorServiceManager().registerThreadPoolProfile(defaultThreadPoolProfile); // setDefaultThreadPoolProfile(defaultThreadPoolProfile); return defaultThreadPoolProfile; }
public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler, boolean daemon) { // use a profile with the settings ThreadPoolProfile profile = new ThreadPoolProfile(); profile.setPoolSize(corePoolSize); profile.setMaxPoolSize(maxPoolSize); profile.setMaxQueueSize(maxQueueSize); profile.setKeepAliveTime(keepAliveTime); profile.setTimeUnit(timeUnit); // must cast to ThreadPoolExecutor to be able to set the rejected execution handler ThreadPoolExecutor answer = (ThreadPoolExecutor) camelContext.getExecutorServiceManager().newThreadPool(source, name, profile); answer.setRejectedExecutionHandler(rejectedExecutionHandler); return answer; }
@Override public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) { String sanitizedName = URISupport.sanitizeUri(name); ObjectHelper.notNull(profile, "ThreadPoolProfile"); ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile(); profile.addDefaults(defaultProfile); ThreadFactory threadFactory = createThreadFactory(sanitizedName, true); ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory); onThreadPoolCreated(executorService, source, profile.getId()); if (LOG.isDebugEnabled()) { LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", source, sanitizedName, executorService); } return executorService; }
@Override public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); if (rejectedExecutionHandler == null) { rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); } ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler); answer.setRemoveOnCancelPolicy(true); // need to wrap the thread pool in a sized to guard against the problem that the // JDK created thread pool has an unbounded queue (see class javadoc), which mean // we could potentially keep adding tasks, and run out of memory. if (profile.getMaxPoolSize() > 0) { return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); } else { return answer; } }
protected synchronized ScheduledExecutorService getExecutorService(CamelContext camelContext) { if (executorService == null || executorService.isShutdown()) { // camel context will shutdown the executor when it shutdown so no need to shut it down when stopping if (executorServiceRef != null) { executorService = camelContext.getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); if (executorService == null) { ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); } if (executorService == null) { throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry."); } } else { // no explicit configured thread pool, so leave it up to the error handler to decide if it need // a default thread pool from CamelContext#getErrorHandlerExecutorService executorService = null; } } return executorService; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // create and register thread pool profile ThreadPoolProfile profile = new ThreadPoolProfile("myProfile"); profile.setPoolSize(2); profile.setMaxPoolSize(8); profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort); context.getExecutorServiceManager().registerThreadPoolProfile(profile); from("direct:start") .aggregate(header("id"), new BodyInAggregatingStrategy()) // use our custom thread pool profile .completionSize(3).executorServiceRef("myProfile") .to("log:foo") .to("mock:aggregated"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { ThreadPoolProfile profile = new ThreadPoolProfile("custom"); profile.setPoolSize(5); profile.setMaxPoolSize(15); profile.setKeepAliveTime(25L); profile.setMaxQueueSize(250); profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort); context.getExecutorServiceManager().registerThreadPoolProfile(profile); from("direct:start").threads().executorServiceRef("custom").to("mock:result"); from("direct:foo").threads().executorServiceRef("custom").to("mock:foo"); } }; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { ThreadPoolProfile profile = new ThreadPoolProfile("custom"); profile.setPoolSize(5); profile.setMaxPoolSize(15); profile.setKeepAliveTime(25L); profile.setMaxQueueSize(250); profile.setAllowCoreThreadTimeOut(true); profile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort); context.getExecutorServiceManager().registerThreadPoolProfile(profile); from("direct:start").threads().executorServiceRef("custom").to("mock:result"); } }; }
public void testDefaultUnboundedQueueThreadPool() throws Exception { ThreadPoolProfile custom = new ThreadPoolProfile("custom"); custom.setPoolSize(10); custom.setMaxPoolSize(30); custom.setKeepAliveTime(50L); custom.setMaxQueueSize(Integer.MAX_VALUE); context.getExecutorServiceManager().setDefaultThreadPoolProfile(custom); assertEquals(true, custom.isDefaultProfile().booleanValue()); ExecutorService myPool = context.getExecutorServiceManager().newDefaultThreadPool(this, "myPool"); assertEquals(false, myPool.isShutdown()); // should use default settings ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool; assertEquals(10, executor.getCorePoolSize()); assertEquals(30, executor.getMaximumPoolSize()); assertEquals(50, executor.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(Integer.MAX_VALUE, executor.getQueue().remainingCapacity()); context.stop(); assertEquals(true, myPool.isShutdown()); }
public void testDefaultNoMaxQueueThreadPool() throws Exception { ThreadPoolProfile custom = new ThreadPoolProfile("custom"); custom.setPoolSize(10); custom.setMaxPoolSize(30); custom.setKeepAliveTime(50L); custom.setMaxQueueSize(0); context.getExecutorServiceManager().setDefaultThreadPoolProfile(custom); assertEquals(true, custom.isDefaultProfile().booleanValue()); ExecutorService myPool = context.getExecutorServiceManager().newDefaultThreadPool(this, "myPool"); assertEquals(false, myPool.isShutdown()); // should use default settings ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool; assertEquals(10, executor.getCorePoolSize()); assertEquals(30, executor.getMaximumPoolSize()); assertEquals(50, executor.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(0, executor.getQueue().remainingCapacity()); context.stop(); assertEquals(true, myPool.isShutdown()); }
public void testCustomDefaultThreadPool() throws Exception { ThreadPoolProfile custom = new ThreadPoolProfile("custom"); custom.setKeepAliveTime(20L); custom.setMaxPoolSize(40); custom.setPoolSize(5); custom.setMaxQueueSize(2000); context.getExecutorServiceManager().setDefaultThreadPoolProfile(custom); assertEquals(true, custom.isDefaultProfile().booleanValue()); ExecutorService myPool = context.getExecutorServiceManager().newDefaultThreadPool(this, "myPool"); assertEquals(false, myPool.isShutdown()); // should use default settings ThreadPoolExecutor executor = (ThreadPoolExecutor) myPool; assertEquals(5, executor.getCorePoolSize()); assertEquals(40, executor.getMaximumPoolSize()); assertEquals(20, executor.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(2000, executor.getQueue().remainingCapacity()); context.stop(); assertEquals(true, myPool.isShutdown()); }
public void testTwoGetThreadPoolProfile() throws Exception { assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo")); ThreadPoolProfile foo = new ThreadPoolProfile("foo"); foo.setKeepAliveTime(20L); foo.setMaxPoolSize(40); foo.setPoolSize(5); foo.setMaxQueueSize(2000); context.getExecutorServiceManager().registerThreadPoolProfile(foo); ThreadPoolProfile bar = new ThreadPoolProfile("bar"); bar.setKeepAliveTime(40L); bar.setMaxPoolSize(5); bar.setPoolSize(1); bar.setMaxQueueSize(100); context.getExecutorServiceManager().registerThreadPoolProfile(bar); assertSame(foo, context.getExecutorServiceManager().getThreadPoolProfile("foo")); assertSame(bar, context.getExecutorServiceManager().getThreadPoolProfile("bar")); assertNotSame(foo, bar); assertFalse(context.getExecutorServiceManager().getThreadPoolProfile("foo").isDefaultProfile()); assertFalse(context.getExecutorServiceManager().getThreadPoolProfile("bar").isDefaultProfile()); }
public void testGetThreadPoolProfileInheritCustomDefaultValues() throws Exception { ThreadPoolProfile newDefault = new ThreadPoolProfile("newDefault"); newDefault.setKeepAliveTime(30L); newDefault.setMaxPoolSize(50); newDefault.setPoolSize(5); newDefault.setMaxQueueSize(2000); newDefault.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort); context.getExecutorServiceManager().setDefaultThreadPoolProfile(newDefault); assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo")); ThreadPoolProfile foo = new ThreadPoolProfile("foo"); foo.setMaxPoolSize(25); foo.setPoolSize(1); context.getExecutorServiceManager().registerThreadPoolProfile(foo); assertSame(foo, context.getExecutorServiceManager().getThreadPoolProfile("foo")); ExecutorService executor = context.getExecutorServiceManager().newThreadPool(this, "MyPool", "foo"); ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor); assertEquals(25, tp.getMaximumPoolSize()); // should inherit the default values assertEquals(1, tp.getCorePoolSize()); assertEquals(30, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals("Abort", tp.getRejectedExecutionHandler().toString()); }
public void testGetThreadPoolProfileInheritCustomDefaultValues2() throws Exception { ThreadPoolProfile newDefault = new ThreadPoolProfile("newDefault"); // just change the max pool as the default profile should then inherit the old default profile newDefault.setMaxPoolSize(50); context.getExecutorServiceManager().setDefaultThreadPoolProfile(newDefault); assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo")); ThreadPoolProfile foo = new ThreadPoolProfile("foo"); foo.setPoolSize(1); context.getExecutorServiceManager().registerThreadPoolProfile(foo); assertSame(foo, context.getExecutorServiceManager().getThreadPoolProfile("foo")); ExecutorService executor = context.getExecutorServiceManager().newThreadPool(this, "MyPool", "foo"); ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor); assertEquals(1, tp.getCorePoolSize()); // should inherit the default values assertEquals(50, tp.getMaximumPoolSize()); assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString()); }
public void testNewThreadPoolProfile() throws Exception { assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo")); ThreadPoolProfile foo = new ThreadPoolProfile("foo"); foo.setKeepAliveTime(20L); foo.setMaxPoolSize(40); foo.setPoolSize(5); foo.setMaxQueueSize(2000); ExecutorService pool = context.getExecutorServiceManager().newThreadPool(this, "Cool", foo); assertNotNull(pool); ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool); assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(40, tp.getMaximumPoolSize()); assertEquals(5, tp.getCorePoolSize()); assertFalse(tp.isShutdown()); context.stop(); assertTrue(tp.isShutdown()); }
public void testNewThreadPoolProfileById() throws Exception { assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo")); ThreadPoolProfile foo = new ThreadPoolProfile("foo"); foo.setKeepAliveTime(20L); foo.setMaxPoolSize(40); foo.setPoolSize(5); foo.setMaxQueueSize(2000); context.getExecutorServiceManager().registerThreadPoolProfile(foo); ExecutorService pool = context.getExecutorServiceManager().newThreadPool(this, "Cool", "foo"); assertNotNull(pool); ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, pool); assertEquals(20, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(40, tp.getMaximumPoolSize()); assertEquals(5, tp.getCorePoolSize()); assertFalse(tp.isShutdown()); context.stop(); assertTrue(tp.isShutdown()); }
public void testNewScheduledThreadPoolProfileById() throws Exception { assertNull(context.getExecutorServiceManager().getThreadPoolProfile("foo")); ThreadPoolProfile foo = new ThreadPoolProfile("foo"); foo.setKeepAliveTime(20L); foo.setMaxPoolSize(40); foo.setPoolSize(5); foo.setMaxQueueSize(2000); context.getExecutorServiceManager().registerThreadPoolProfile(foo); ExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(this, "Cool", "foo"); assertNotNull(pool); SizedScheduledExecutorService tp = assertIsInstanceOf(SizedScheduledExecutorService.class, pool); // a scheduled dont use keep alive assertEquals(0, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals(Integer.MAX_VALUE, tp.getMaximumPoolSize()); assertEquals(5, tp.getCorePoolSize()); assertFalse(tp.isShutdown()); context.stop(); assertTrue(tp.isShutdown()); }
public void testLowProfile() throws Exception { CamelContext context = getMandatoryBean(CamelContext.class, "camel-C"); ThreadPoolProfile profile = context.getExecutorServiceManager().getThreadPoolProfile("low"); assertEquals(1, profile.getPoolSize().intValue()); assertEquals(5, profile.getMaxPoolSize().intValue()); assertEquals(null, profile.getKeepAliveTime()); assertEquals(null, profile.getMaxQueueSize()); assertEquals(null, profile.getRejectedPolicy()); // create a thread pool from low ExecutorService executor = context.getExecutorServiceManager().newThreadPool(this, "MyLow", "low"); ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor); assertEquals(1, tp.getCorePoolSize()); assertEquals(5, tp.getMaximumPoolSize()); // should inherit default options assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals("CallerRuns", tp.getRejectedExecutionHandler().toString()); }
public void testBigProfile() throws Exception { CamelContext context = getMandatoryBean(CamelContext.class, "camel-C"); ThreadPoolProfile profile = context.getExecutorServiceManager().getThreadPoolProfile("big"); assertEquals(50, profile.getPoolSize().intValue()); assertEquals(100, profile.getMaxPoolSize().intValue()); assertEquals(ThreadPoolRejectedPolicy.DiscardOldest, profile.getRejectedPolicy()); assertEquals(null, profile.getKeepAliveTime()); assertEquals(null, profile.getMaxQueueSize()); // create a thread pool from big ExecutorService executor = context.getExecutorServiceManager().newThreadPool(this, "MyBig", "big"); ThreadPoolExecutor tp = assertIsInstanceOf(ThreadPoolExecutor.class, executor); assertEquals(50, tp.getCorePoolSize()); assertEquals(100, tp.getMaximumPoolSize()); // should inherit default options assertEquals(60, tp.getKeepAliveTime(TimeUnit.SECONDS)); assertEquals("DiscardOldest", tp.getRejectedExecutionHandler().toString()); }
protected static synchronized ExecutorService getExecutorService(CamelContext context) { // CamelContext will shutdown thread pool when it shutdown so we can // lazy create it on demand // but in case of hot-deploy or the likes we need to be able to // re-create it (its a shared static instance) if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) { final ExecutorServiceManager manager = context.getExecutorServiceManager(); // try to lookup a pool first based on profile ThreadPoolProfile poolProfile = manager.getThreadPoolProfile( FacebookConstants.FACEBOOK_THREAD_PROFILE_NAME); if (poolProfile == null) { poolProfile = manager.getDefaultThreadPoolProfile(); } // create a new pool using the custom or default profile executorService = manager.newScheduledThreadPool(FacebookProducer.class, FacebookConstants.FACEBOOK_THREAD_PROFILE_NAME, poolProfile); } return executorService; }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // register a custom thread pool profile ThreadPoolProfile custom = createCustomProfile(); context.getExecutorServiceManager().registerThreadPoolProfile(custom); from("direct:start") // use the bigPool profile for creating the thread pool to be used .threads().executorServiceRef("bigPool") .to("log:foo") .to("mock:result"); } }; }
@Override public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); if (rejectedExecutionHandler == null) { rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); } ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor( profile.getPoolSize(), managedThreadFactory, rejectedExecutionHandler); if (profile.getMaxPoolSize() > 0) { return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); } else { return answer; } }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // register a custom thread pool profile ThreadPoolProfile custom = createCustomProfile(); context.getExecutorServiceStrategy().registerThreadPoolProfile(custom); from("direct:start") // use the bigPool profile for creating the thread pool to be used .threads().executorServiceRef("bigPool") .to("log:foo") .to("mock:result"); } }; }
private ThreadPoolProfile threadPoolProfileRemote() { //Define custom thread pool profile ThreadPoolProfile threadPoolProfile = new ThreadPoolProfile("openex-remote-thread-profile"); threadPoolProfile.setPoolSize(10); threadPoolProfile.setMaxPoolSize(20); threadPoolProfile.setMaxQueueSize(500); threadPoolProfile.setAllowCoreThreadTimeOut(false); threadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Discard); return threadPoolProfile; }
private ThreadPoolProfile threadPoolProfileExecutor() { //Define custom thread pool profile ThreadPoolProfile threadPoolProfile = new ThreadPoolProfile("openex-worker-thread-profile"); threadPoolProfile.setPoolSize(20); threadPoolProfile.setMaxPoolSize(40); threadPoolProfile.setMaxQueueSize(1000); threadPoolProfile.setAllowCoreThreadTimeOut(false); threadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns); return threadPoolProfile; }
protected ThreadPoolRejectedPolicy resolveRejectedPolicy(RouteContext routeContext) { if (getExecutorServiceRef() != null && getRejectedPolicy() == null) { ThreadPoolProfile threadPoolProfile = routeContext.getCamelContext().getExecutorServiceManager().getThreadPoolProfile(getExecutorServiceRef()); if (threadPoolProfile != null) { return threadPoolProfile.getRejectedPolicy(); } } return getRejectedPolicy(); }
private static ExecutorService getExecutorService( Class<? extends AbstractApiEndpoint> endpointClass, CamelContext context, String threadProfileName) { // lookup executorService for extending class name final String endpointClassName = endpointClass.getName(); ExecutorService executorService = executorServiceMap.get(endpointClassName); // CamelContext will shutdown thread pool when it shutdown so we can // lazy create it on demand // but in case of hot-deploy or the likes we need to be able to // re-create it (its a shared static instance) if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) { final ExecutorServiceManager manager = context.getExecutorServiceManager(); // try to lookup a pool first based on profile ThreadPoolProfile poolProfile = manager.getThreadPoolProfile( threadProfileName); if (poolProfile == null) { poolProfile = manager.getDefaultThreadPoolProfile(); } // create a new pool using the custom or default profile executorService = manager.newScheduledThreadPool(endpointClass, threadProfileName, poolProfile); executorServiceMap.put(endpointClassName, executorService); } return executorService; }
public ScheduledExecutorService lookupScheduled(Object source, String name, String executorServiceRef) { ScheduledExecutorService answer = camelContext.getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); if (answer == null) { ThreadPoolProfile profile = getThreadPoolProfile(executorServiceRef); if (profile != null) { Integer poolSize = profile.getPoolSize(); if (poolSize == null) { poolSize = getDefaultThreadPoolProfile().getPoolSize(); } answer = newScheduledThreadPool(source, name, poolSize); } } return answer; }
public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize) { // use a profile with the settings ThreadPoolProfile profile = new ThreadPoolProfile(); profile.setPoolSize(corePoolSize); profile.setMaxPoolSize(maxPoolSize); profile.setMaxQueueSize(maxQueueSize); return camelContext.getExecutorServiceManager().newThreadPool(source, name, profile); }
public DefaultExecutorServiceManager(CamelContext camelContext) { this.camelContext = camelContext; defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId); defaultProfile.setDefaultProfile(true); defaultProfile.setPoolSize(10); defaultProfile.setMaxPoolSize(20); defaultProfile.setKeepAliveTime(60L); defaultProfile.setTimeUnit(TimeUnit.SECONDS); defaultProfile.setMaxQueueSize(1000); defaultProfile.setAllowCoreThreadTimeOut(false); defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns); registerThreadPoolProfile(defaultProfile); }
@Override public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) { threadPoolProfiles.remove(defaultThreadPoolProfileId); defaultThreadPoolProfile.addDefaults(defaultProfile); LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile); this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId(); defaultThreadPoolProfile.setDefaultProfile(true); registerThreadPoolProfile(defaultThreadPoolProfile); }
@Override public ExecutorService newThreadPool(Object source, String name, String profileId) { ThreadPoolProfile profile = getThreadPoolProfile(profileId); if (profile != null) { return newThreadPool(source, name, profile); } else { // no profile with that id return null; } }
@Override public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) { ThreadPoolProfile profile = new ThreadPoolProfile(name); profile.setPoolSize(poolSize); profile.setMaxPoolSize(maxPoolSize); return newThreadPool(source, name, profile); }
@Override public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) { ThreadPoolProfile profile = new ThreadPoolProfile(name); profile.setPoolSize(poolSize); profile.setMaxPoolSize(poolSize); profile.setKeepAliveTime(0L); return newThreadPool(source, name, profile); }
@Override public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) { String sanitizedName = URISupport.sanitizeUri(name); profile.addDefaults(getDefaultThreadPoolProfile()); ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true)); onThreadPoolCreated(answer, source, null); if (LOG.isDebugEnabled()) { LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", source, sanitizedName, answer); } return answer; }
@Override public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) { ThreadPoolProfile profile = getThreadPoolProfile(profileId); if (profile != null) { return newScheduledThreadPool(source, name, profile); } else { // no profile with that id return null; } }
@Override public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) { // allow core thread timeout is default false if not configured boolean allow = profile.getAllowCoreThreadTimeOut() != null ? profile.getAllowCoreThreadTimeOut() : false; return newThreadPool(profile.getPoolSize(), profile.getMaxPoolSize(), profile.getKeepAliveTime(), profile.getTimeUnit(), profile.getMaxQueueSize(), allow, profile.getRejectedExecutionHandler(), factory); }
@Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override public void configure() throws Exception { // register thread pool profile ThreadPoolProfile profile = new ThreadPoolProfileBuilder("myProfile").poolSize(5).maxPoolSize(10).maxQueueSize(20).build(); context.getExecutorServiceManager().registerThreadPoolProfile(profile); from("direct:start") .multicast(new AggregationStrategy() { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); return oldExchange; } }) // and refer to the profile here .parallelProcessing().executorServiceRef("myProfile").to("direct:a", "direct:b") // use end to indicate end of multicast route .end() .to("mock:result"); from("direct:a").delay(100).setBody(constant("A")); from("direct:b").setBody(constant("B")); } }; }
public void testAsyncErrorHandlerWait() throws Exception { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { ThreadPoolProfile profile = new ThreadPoolProfile("myAsyncPool"); profile.setPoolSize(5); context.getExecutorServiceManager().registerThreadPoolProfile(profile); errorHandler(deadLetterChannel("mock:dead") .maximumRedeliveries(2) .redeliveryDelay(0) .logStackTrace(false) .executorServiceRef("myAsyncPool")); from("direct:in") .threads(2) .to("mock:foo") .process(new Processor() { public void process(Exchange exchange) throws Exception { throw new Exception("Forced exception by unit test"); } }); } }); context.start(); getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); getMockEndpoint("mock:dead").expectedMessageCount(1); template.requestBody("direct:in", "Hello World"); assertMockEndpointsSatisfied(); }