/** Directly test simple ThreadPoolExecutor RejectedExecutionHandlers. */ public void testStandardRejectedExecutionHandlers() { final ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 1, SECONDS, new ArrayBlockingQueue<Runnable>(1)); final AtomicReference<Thread> thread = new AtomicReference<>(); final Runnable r = new Runnable() { public void run() { thread.set(Thread.currentThread()); }}; try { new AbortPolicy().rejectedExecution(r, p); shouldThrow(); } catch (RejectedExecutionException success) {} assertNull(thread.get()); new DiscardPolicy().rejectedExecution(r, p); assertNull(thread.get()); new CallerRunsPolicy().rejectedExecution(r, p); assertSame(Thread.currentThread(), thread.get()); // check that pool was not perturbed by handlers assertTrue(p.getRejectedExecutionHandler() instanceof AbortPolicy); assertEquals(0, p.getTaskCount()); assertTrue(p.getQueue().isEmpty()); }
/** 执行任务,当线程池处于关闭,将会重新创建新的线程池 */ public synchronized void execute(Runnable run) { if (run == null) { return; } if (mPool == null || mPool.isShutdown()) { //参数说明 //当线程池中的线程小于mCorePoolSize,直接创建新的线程加入线程池执行任务 //当线程池中的线程数目等于mCorePoolSize,将会把任务放入任务队列BlockingQueue中 //当BlockingQueue中的任务放满了,将会创建新的线程去执行, //但是当总线程数大于mMaximumPoolSize时,将会抛出异常,交给RejectedExecutionHandler处理 //mKeepAliveTime是线程执行完任务后,且队列中没有可以执行的任务,存活的时间,后面的参数是时间单位 //ThreadFactory是每次创建新的线程工厂 mPool = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new AbortPolicy()); } mPool.execute(run); }
public QuartzThreadPool(final int poolSize) { checkArgument(poolSize > 0, "Pool size must be greater than zero"); this.threadPoolExecutor = new NexusThreadPoolExecutor( poolSize, // core-size poolSize, // max-size 0L, // keep-alive TimeUnit.MILLISECONDS, new SynchronousQueue<>(), // no queuing new NexusThreadFactory("quartz", "nx-tasks"), new AbortPolicy()); // wrapper for Shiro integration this.nexusExecutorService = NexusExecutorService.forFixedSubject( threadPoolExecutor, FakeAlmightySubject.TASK_SUBJECT); }
/**
 * Connects to the online server.
 *
 * <p>Does nothing if this instance is already marked as connected. Otherwise
 * it installs the message handler (a duplicate-removing one when
 * {@code removeDuplicate} is set), creates the worker pool, connects the
 * underlying client and then issues the first pull request.
 *
 * @throws LinkException if the underlying client fails to connect; in that
 *         case the connected flag is reset and the worker pool created for
 *         this attempt is shut down, so a later retry starts clean
 */
public void connect() throws LinkException {
    // Only the caller that flips the flag from false to true proceeds.
    if (!connected.compareAndSet(false, true)) {
        return;
    }

    if (this.removeDuplicate) {
        this.tmcHandler = new DuplicateRemoverTmcHandler(this);
    } else {
        this.tmcHandler = new TmcHandler(this);
    }
    this.client.setMessageHandler(this.tmcHandler);

    // NOTE(review): the keep-alive is expressed in MICROSECONDS, which looks
    // like a typo for another unit - confirm against the unit of fetchPeriod.
    // With core size == max size it only matters if core-thread timeout is
    // enabled elsewhere.
    final ThreadPoolExecutor workers = new ThreadPoolExecutor(
            threadCount,
            threadCount,
            fetchPeriod * 2,
            TimeUnit.MICROSECONDS,
            new ArrayBlockingQueue<Runnable>(queueSize),
            new NamedThreadFactory("tmc-worker"),
            new AbortPolicy());
    this.threadPool = workers;

    try {
        this.client.connect(uri);
    } catch (LinkException e) {
        // Do not leave the pool of a failed attempt behind: the next
        // connect() call creates a new one and would orphan this instance.
        workers.shutdown();
        connected.set(false);
        throw e;
    }
    this.doPullRequest();
}
/**
 * Runs {@code runnable} on the pool held in field {@code a}, creating a new
 * pool first when the field is {@code null} or the current pool has been
 * shut down.
 *
 * <p>Any exception raised while creating the pool or submitting the task
 * is caught and its stack trace printed; nothing is propagated to the caller.
 *
 * @param runnable the task to run; a {@code null} task is ignored
 */
public final synchronized void a(Runnable runnable) {
    if (runnable == null) {
        return;
    }
    try {
        if (this.a == null || this.a.isShutdown()) {
            // Typed queue instead of the raw LinkedBlockingQueue, which only
            // compiled through an unchecked conversion.
            this.a = new ThreadPoolExecutor(
                    this.b,
                    this.c,
                    this.d,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    Executors.defaultThreadFactory(),
                    new AbortPolicy());
        }
        this.a.execute(runnable);
    } catch (Exception e) {
        // Deliberately best-effort: failures are reported, not rethrown.
        e.printStackTrace();
    }
}
/**
 * A pool constructed without an explicit handler uses AbortPolicy.
 */
public void testDefaultRejectedExecutionHandler() {
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 2,
                               LONG_DELAY_MS, MILLISECONDS,
                               new ArrayBlockingQueue<Runnable>(10));
    // The cleaner resource is closed when the try block exits.
    try (PoolCleaner poolCleaner = cleaner(pool)) {
        final boolean abortsByDefault =
            pool.getRejectedExecutionHandler() instanceof AbortPolicy;
        assertTrue(abortsByDefault);
    }
}
public VSphereAdapterResourceEnumerationService() { this.enumerationThreadPool = new ThreadPoolExecutor(MAX_CONCURRENT_ENUM_PROCESSES, MAX_CONCURRENT_ENUM_PROCESSES, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new AbortPolicy()); this.tagCache = new TagCache(); }
public void execute(Runnable r) { if (executor == null) { executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new AbortPolicy()); // 参1:核心线程数;参2:最大线程数;参3:线程休眠时间;参4:时间单位;参5:线程队列;参6:生产线程的工厂;参7:线程异常处理策略 } // 线程池执行一个Runnable对象, 具体运行时机线程池说了算 executor.execute(r); }
/**
 * Provides the executor bean: a pool of 1 core / 20 maximum threads with a
 * one-minute keep-alive and no task queueing, passed through
 * {@code TracingExecutors.preserve} together with the given tracer.
 *
 * <p>Tasks submitted while all 20 threads are busy are rejected by
 * {@code AbortPolicy} (a {@code RejectedExecutionException} is thrown).
 *
 * @param tracer the tracer handed to {@code TracingExecutors.preserve}
 * @return the wrapped executor service; shut down by the container via
 *         {@code destroyMethod = "shutdown"}
 */
@Bean(destroyMethod = "shutdown")
public ExecutorService executor(final Tracer tracer) {
    return TracingExecutors.preserve(
            new ThreadPoolExecutor(
                    1, 20, 1, MINUTES,
                    // ArrayBlockingQueue rejects a capacity below 1 with
                    // IllegalArgumentException, so a zero-length array queue
                    // can never be built. SynchronousQueue is the
                    // zero-capacity hand-off queue that was intended.
                    new java.util.concurrent.SynchronousQueue<Runnable>(),
                    new CustomizableThreadFactory("http-example-"),
                    new AbortPolicy()),
            tracer);
}
/**
 * Executes a task; when the pool is missing or has been shut down, a new
 * pool is created before the task is submitted.
 *
 * @param run the task to execute; a {@code null} task is ignored
 */
public synchronized void execute(Runnable run) {
    if (run == null) {
        return;
    }

    final boolean needsNewPool = (mPool == null) || mPool.isShutdown();
    if (needsNewPool) {
        final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
        mPool = new ThreadPoolExecutor(
                mCorePoolSize,
                mMaximumPoolSize,
                mKeepAliveTime,
                TimeUnit.MILLISECONDS,
                taskQueue,
                Executors.defaultThreadFactory(),
                new AbortPolicy());
    }

    mPool.execute(run);
}
/**
 * Creates an {@link ObservableExecutorService} backed by a fixed number
 * of threads. The configuration corresponds to
 * {@link Executors#newFixedThreadPool(int)}.
 *
 * @param poolSize The number of threads, used as both core and maximum size
 * @return The {@link ObservableExecutorService}
 */
public static ObservableExecutorService newFixedThreadPool(int poolSize)
{
    // Tasks that arrive while every thread is busy wait in this queue.
    final LinkedBlockingQueue<Runnable> waitingTasks =
        new LinkedBlockingQueue<Runnable>();
    final ObservableExecutorService fixedPool =
        new ObservableExecutorService(
            poolSize, poolSize,
            0L, TimeUnit.MILLISECONDS,
            waitingTasks,
            Executors.defaultThreadFactory(),
            new AbortPolicy());
    return fixedPool;
}
/**
 * Creates an {@link ObservableExecutorService} that grows on demand and
 * lets idle threads expire after 60 seconds. The configuration corresponds
 * to {@link Executors#newCachedThreadPool()}.
 *
 * @return The {@link ObservableExecutorService}
 */
public static ObservableExecutorService newCachedThreadPool()
{
    // Direct hand-off: a task is either taken by a thread or triggers a new one.
    final SynchronousQueue<Runnable> handOff =
        new SynchronousQueue<Runnable>();
    final ObservableExecutorService cachedPool =
        new ObservableExecutorService(
            0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            handOff,
            Executors.defaultThreadFactory(),
            new AbortPolicy());
    return cachedPool;
}
static ThreadPoolExecutor newDefaultThreadPool() { final ThreadPoolExecutor result = new ThreadPoolExecutor( // Bound the pool. Most foreground dependency managers should be called only very rarely, so // keep a minimal core pool around and only grow it on demand. 1, 16, // Threads will be kept alive in an idle state for a minute or two. After that, they may be // garbage-collected, so that we're keeping a larger thread pool only during weird periods of // congestion. (Note: the background manager will typically keep all threads pretty active, since it's // repeatedly launching new pingers. The live manager will spin them up and down based on traffic to // the rather uncommonly used /healthcheck/live uri). 30, TimeUnit.SECONDS, // Use a blocking queue just to keep track of checks when the world is going wrong. This is mostly useful // when we're adding a bunch of checks at the same time, such as during a live healthcheck. Might as well // keep this pretty small, because any nontrivial wait to execute is going to blow up a timeout anyway. new SynchronousQueue<Runnable>(), // Name your threads. new ThreadFactoryBuilder() .setNameFormat("dependency-default-" + DEFAULT_THREAD_POOL_COUNT.getAndIncrement() + "-checker-%d") .setDaemon(true) .setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { Logger.getLogger(AbstractDependencyManager.class) .error("Uncaught throwable in thread " + t.getName() + "/" + t.getId(), e); } }) .build(), // Explicitly restating the default policy here, because healthchecks should Just Not Work if there // are insufficient resources to support them. Given the smallish queue above, this means that // we're going to end up throwing exceptions if we get too blocked up somehow. new AbortPolicy()); result.prestartAllCoreThreads(); return result; }
/**
 * Supplies the rejection handler: a new {@code AbortPolicy} on every call.
 *
 * @return a fresh {@link ThreadPoolExecutor.AbortPolicy} instance
 */
@Override
RejectedExecutionHandler getHandler() {
    final RejectedExecutionHandler abortOnReject = new ThreadPoolExecutor.AbortPolicy();
    return abortOnReject;
}