Java 类java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy 实例源码
项目:openjdk-jdk10
文件:ThreadPoolExecutorTest.java
/**
 * Directly test simple ThreadPoolExecutor RejectedExecutionHandlers.
 *
 * Each standard handler is invoked by hand with a task that records the
 * thread it runs on, so the test can tell whether (and where) the task ran.
 */
public void testStandardRejectedExecutionHandlers() {
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 1, SECONDS,
                               new ArrayBlockingQueue<Runnable>(1));
    final AtomicReference<Thread> ranOn = new AtomicReference<>();
    // Stores the executing thread; stays null for as long as the task has not run.
    final Runnable recorder = () -> ranOn.set(Thread.currentThread());

    // AbortPolicy: must throw, and the task must not have run.
    try {
        new AbortPolicy().rejectedExecution(recorder, pool);
        shouldThrow();
    } catch (RejectedExecutionException success) {}
    assertNull(ranOn.get());

    // DiscardPolicy: the task must still not have run.
    new DiscardPolicy().rejectedExecution(recorder, pool);
    assertNull(ranOn.get());

    // CallerRunsPolicy: the task must have run on the calling thread.
    new CallerRunsPolicy().rejectedExecution(recorder, pool);
    assertSame(Thread.currentThread(), ranOn.get());

    // check that pool was not perturbed by handlers
    assertTrue(pool.getRejectedExecutionHandler() instanceof AbortPolicy);
    assertEquals(0, pool.getTaskCount());
    assertTrue(pool.getQueue().isEmpty());
}
项目:RekognitionS3Batch
文件:Processor.java
/**
 * Builds a processor from the given configuration: credentials, the
 * Rekognition and SQS clients, the optional result processors and the
 * worker executor.
 *
 * @param config source of every setting read below
 */
public Processor(ProcessorConfig config) {
    ProfileCredentialsProvider credentials = new ProfileCredentialsProvider(config.profile());
    // Resolve the credentials once up front; the returned value is not used here.
    credentials.getCredentials();

    if (config.disableCerts()) {
        System.setProperty("com.amazonaws.sdk.disableCertChecking", "true");
    }

    // Rekognition client, optionally pointed at an overridden endpoint.
    rek = new AmazonRekognitionClient(credentials);
    if (config.endpointOverride()) {
        rek.setEndpoint(config.endpoint());
    }
    minConfidence = Integer.parseInt(config.confidence());

    // SQS client and the URL of the queue that jobs are read from.
    sqs = new AmazonSQSClient(credentials);
    queueUrl = sqs.createQueue(config.queue()).getQueueUrl();

    // Result processors, each enabled by its own configuration flag.
    if (config.wantCloudSearch()) {
        processors.add(new CloudSearchIndexer(credentials, config.cloudSearch()));
    }
    if (config.wantDynamo()) {
        processors.add(new DynamoWriter(credentials, config.dynamo()));
    }
    if (config.wantTags3()) {
        processors.add(new S3ObjectTagger(credentials, config.tagPrefix()));
    }

    // Worker pool: 1..workerLimit threads, a bounded queue of twice that size,
    // and CallerRunsPolicy so that too many jobs cannot back up.
    int workerLimit = Integer.parseInt(config.concurrency());
    executor = new ThreadPoolExecutor(
        1, workerLimit, 30, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(workerLimit * 2, false),
        new CallerRunsPolicy()
    );

    maxImagesToProcess = Long.parseLong(config.max());
}
项目:BJAF3.x
文件:ThroughputPipe.java
/**
 * Throughput pipe (flow controller).
 *
 * @param pipeSize
 *            size of the pipe
 * @param poolSize
 *            size of the thread pool that consumes the flow
 * @param handler
 *            implementation of the consume interface
 * @param stepOfDispatcher
 *            dispatch-rate parameter for consumption, in milliseconds. For
 *            example, to consume one request per second the value is 1000.<br>
 *            As a special case, 0 means the dispatch rate is unlimited and the
 *            consumption speed depends only on the pool size and on how fast
 *            the handler computes.
 */
public ThroughputPipe(int pipeSize, int poolSize,
        IConsumeHandler<T> handler, long stepOfDispatcher) {
    // Both sizes must be strictly positive.
    if (!(pipeSize > 0 && poolSize > 0)) {
        throw new RuntimeException("size's value must >0!");
    }
    if (handler == null) {
        throw new RuntimeException("handle can't be null!");
    }

    this.handler = handler;
    this.stepOfDispatcher = stepOfDispatcher;
    this.queue = new LinkedBlockingQueue<T>(pipeSize);
    // Fixed-size pool with direct hand-off; rejected tasks run on the submitting thread.
    tpe = new ThreadPoolExecutor(
            poolSize, poolSize,
            livetime, TimeUnit.MILLISECONDS,
            new SynchronousQueue<Runnable>(),
            new CallerRunsPolicy());
}
项目:async-event
文件:Workbench.java
/**
 * Initializes the workbench once: loads the stress configuration, opens the
 * optional result file, then creates the worker pool, the completion latch
 * and the periodic statistics task. Calls made after a successful
 * initialization return immediately.
 *
 * @param configFile passed to {@code getStressStrategy} to obtain the strategy
 * @throws java.io.IOException if the result file cannot be opened
 * @throws NullPointerException if the strategy provides no stress configuration
 */
private void initWorkbench(String configFile) throws IOException {
    if (initialized) {
        return;
    }
    this.stressStrategy = getStressStrategy(configFile);
    this.config = stressStrategy.getStressConfig();
    if (this.config == null) {
        throw new NullPointerException("未配置压力测试参数!");
    }

    // Open the result file before any thread is started: if this throws, no
    // scheduler thread is left running behind a workbench that never became
    // initialized. A null name is treated like an empty one, because
    // new FileWriter(null) would throw a NullPointerException.
    if (config.getOutputFileName() != null && !"".equals(config.getOutputFileName())) {
        resultWriter = new FileWriter(config.getOutputFileName());
    }

    servicePool = new ThreadPoolExecutor(config.getThreadNum(), config.getThreadNum(), 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000), new CallerRunsPolicy());
    finish = new CountDownLatch(config.getThreadNum());

    // Periodic statistics, first run after one full period.
    scheduledThreadPool = Executors.newScheduledThreadPool(1);
    scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
        public void run() {
            statistic();
        }
    }, config.getStatPeriod(), config.getStatPeriod(), TimeUnit.MILLISECONDS);

    this.initialized = true; // initialization complete
}
项目:nexus-public
文件:EventExecutor.java
/**
 * Move from direct to asynchronous subscriber processing.
 *
 * Builds the host thread pool and installs an executor wrapping it as the
 * delegate.
 */
@Override
protected void doStart() throws Exception {
    // Direct hand-off (SynchronousQueue) with 0..HOST_THREAD_POOL_SIZE threads idling out after 60s.
    // When the pool is full, CallerRunsPolicy makes the caller thread execute the async subscriber.
    ThreadPoolExecutor hostPool = new ThreadPoolExecutor(
        0, HOST_THREAD_POOL_SIZE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        new NexusThreadFactory("event", "event-manager"),
        new CallerRunsPolicy());

    // From here on events are distributed asynchronously through the pool.
    delegate = NexusExecutorService.forCurrentSubject(hostPool);
}
项目:coco
文件:ThreadPoolHolder.java
/**
 * Returns the executor registered under {@code poolName}, creating and
 * registering it on first use.
 *
 * <p>When a pool is created, {@code min} is raised to at least
 * {@code availableProcessors()} and {@code max} to at least
 * {@code 2 * availableProcessors() + 2}. The pool uses a bounded
 * {@link LinkedBlockingQueue}, threads named {@code poolName-N} and
 * {@link CallerRunsPolicy} for rejected tasks.
 *
 * @param poolName          registry key and thread-name prefix
 * @param min               requested core size, or {@code null} for the floor
 * @param max               requested maximum size, or {@code null} for the floor
 * @param queueSize         queue capacity, or {@code null} for 100
 * @param aliveTimeInsecond keep-alive in seconds, or {@code null} for 100
 * @return the executor registered for {@code poolName}
 */
public static ExecutorService getFixedPool(String poolName, Integer min, Integer max, Integer queueSize,
        Long aliveTimeInsecond) {
    if (threadPoolHolder == null) {
        synchronized (ThreadPoolHolder.class) {
            if (threadPoolHolder == null) {
                // Assign map and lock before the holder, so the holder is never
                // set while they are still unset in program order.
                // NOTE(review): fully safe publication also needs these fields to
                // be volatile - confirm against their declarations.
                map = Maps.newHashMap();
                lock = new ReentrantLock();
                threadPoolHolder = new ThreadPoolHolder();
            }
        }
    }
    ExecutorService service = map.get(poolName);
    if (service == null) {
        lock.lock();
        try {
            // Re-read under the lock: another thread may have registered the pool
            // between the unlocked lookup above and acquiring the lock. Testing
            // the stale local instead would create a second pool and overwrite
            // (and leak) the first one.
            service = map.get(poolName);
            if (service == null) {
                int min2 = Runtime.getRuntime().availableProcessors();
                int max2 = min2 * 2 + 2;
                if (min == null || min < min2) {
                    min = min2;
                }
                if (max == null || max < max2) {
                    max = max2;
                }
                queueSize = queueSize == null ? 100 : queueSize;
                aliveTimeInsecond = aliveTimeInsecond == null ? 100 : aliveTimeInsecond;
                ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").build();
                service = new ThreadPoolExecutor(min, max, aliveTimeInsecond, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(queueSize), threadFactory, new CallerRunsPolicy());
                map.put(poolName, service);
            }
        } finally {
            lock.unlock();
        }
    }
    return service;
}
项目:coco
文件:ThreadPoolHolder.java
/**
 * Returns the executor registered under {@code poolName}, creating and
 * registering it on first use.
 *
 * <p>When a pool is created, {@code min} is raised to at least
 * {@code availableProcessors()} and {@code max} to at least
 * {@code 2 * availableProcessors() + 1}. The pool uses a 100-second
 * keep-alive, a {@link LinkedBlockingQueue} of capacity 100, threads named
 * {@code poolName-N} and {@link CallerRunsPolicy} for rejected tasks.
 *
 * @param poolName registry key and thread-name prefix
 * @param min      requested core size, or {@code null} for the floor
 * @param max      requested maximum size, or {@code null} for the floor
 * @return the executor registered for {@code poolName}
 */
public static ExecutorService getFixedPool(String poolName, Integer min, Integer max) {
    if (threadPoolHolder == null) {
        synchronized (ThreadPoolHolder.class) {
            if (threadPoolHolder == null) {
                // Assign map and lock before the holder, so the holder is never
                // set while they are still unset in program order.
                // NOTE(review): fully safe publication also needs these fields to
                // be volatile - confirm against their declarations.
                map = Maps.newHashMap();
                lock = new ReentrantLock();
                threadPoolHolder = new ThreadPoolHolder();
            }
        }
    }
    ExecutorService service = map.get(poolName);
    if (service == null) {
        lock.lock();
        try {
            // Re-read under the lock: another thread may have registered the pool
            // between the unlocked lookup above and acquiring the lock. Testing
            // the stale local instead would create a second pool and overwrite
            // (and leak) the first one.
            service = map.get(poolName);
            if (service == null) {
                int min2 = Runtime.getRuntime().availableProcessors();
                int max2 = min2 * 2 + 1;
                if (min == null || min < min2) {
                    min = min2;
                }
                if (max == null || max < max2) {
                    max = max2;
                }
                ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").build();
                service = new ThreadPoolExecutor(min, max, 100, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(100), threadFactory, new CallerRunsPolicy());
                map.put(poolName, service);
            }
        } finally {
            lock.unlock();
        }
    }
    return service;
}
项目:BJAF3.x
文件:ServiceServer.java
/**
 * Creates the server for the given port: channel group, NIO server
 * bootstrap with its boss/worker executors, socket options read from
 * {@code AppProperties}, and the task thread pool.
 *
 * @param port stored in {@code this.port}
 */
public ServiceServer(int port) {
    this.port = port;
    this.channelGroup = new DefaultChannelGroup();
    bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
            Executors.newCachedThreadPool(new NamedThreadFactory(
                    "ServiceServer-bossExecutor-", false)),
            Executors.newCachedThreadPool(new NamedThreadFactory(
                    "ServiceServer-workerExecutor-", true))));
    bootstrap.setOption("tcpNoDelay", Boolean.parseBoolean(AppProperties
            .get("rpc_server_tcpNoDelay", "true")));
    bootstrap.setOption("reuseAddress", Boolean.parseBoolean(AppProperties
            .get("rpc_server_reuseAddress", "true")));

    // Optional child options: applied only when the property is non-blank.
    // The value is trimmed before parsing as well as before the emptiness
    // check; otherwise a padded value such as " true " parses as false and
    // " 8192 " makes Integer.parseInt throw NumberFormatException.
    String c1 = AppProperties.get("rpc_server_child_tcpNoDelay");
    if (c1 != null && c1.trim().length() > 0) {
        bootstrap.setOption("child.tcpNoDelay", Boolean.parseBoolean(c1.trim()));
    }
    c1 = AppProperties.get("rpc_server_child_receiveBufferSize");
    if (c1 != null && c1.trim().length() > 0) {
        bootstrap
                .setOption("child.receiveBufferSize", Integer.parseInt(c1.trim()));
    }

    // Work pool settings come from properties, with defaults 50 / 200 / 5 * 60 * 1000.
    this.taskThreadPool = new TaskThreadPool(AppProperties.getAsInt(
            "rpc_server_workThreadPool_coreSize", 50),
            AppProperties
                    .getAsInt("rpc_server_workThreadPool_MaxSize", 200),
            AppProperties.getAsInt(
                    "rpc_server_workThreadPool_keepAliveTime",
                    60 * 1000 * 5), true, new CallerRunsPolicy());
}
项目:BJAF3.x
文件:AsynchroCallImp.java
/**
 * Creates the asynchronous-call worker: a task thread pool whose size comes
 * from the {@code command_asynchro_call_queue_size} property (10 when the
 * property is missing or blank), the command queue, and a started consumer.
 *
 * @throws NumberFormatException if the property is set but is not an integer
 */
private AsynchroCallImp() {
    String size = AppProperties.get("command_asynchro_call_queue_size");
    // Test the trimmed value: a whitespace-only property would otherwise skip
    // the default and reach Integer.parseInt(""), which throws.
    if (size == null || size.trim().length() == 0) {
        size = "10";
    }
    int size2 = Integer.parseInt(size.trim());
    wtp = new TaskThreadPool(size2, size2, 5 * 1000 * 60, true,
            new CallerRunsPolicy());
    cmdQueue = new BlockQueue();
    consumer = new Consumer("AsynchroCallConsumer");
    consumer.start();
}
项目:BJAF3.x
文件:Publisher.java
private Publisher(int count, Map <String, SubWorker> docSubWorkerMap) {
this.pubWorkerAmount = 0;
this.docQueue = new BlockQueue();
this.docSubWorkerMap = docSubWorkerMap;
this.re = new TaskExecutor(new TaskThreadPool(count, count,
5 * 1000 * 60, false, new CallerRunsPolicy()));
}
项目:BJAF3.x
文件:Subscriber.java
/**
 * Creates a subscriber with a zeroed concurrency counter, a worker pool of
 * {@code count} threads and a dispatcher wired to that pool, the counter,
 * the publisher's document queue and the sub-worker map.
 *
 * @param count             used as both size arguments of the task thread pool
 * @param publisherDocqueue queue handed to the dispatcher
 * @param docSubWorkerMap   stored in {@code this.docSubWorkerMap} and handed to the dispatcher
 */
private Subscriber(int count, IQueue publisherDocqueue, Map<String, SubWorker> docSubWorkerMap) {
    this.docSubWorkerMap = docSubWorkerMap;
    this.concurrentCounter = new Counter(0);
    // Rejected tasks run on the submitting thread (CallerRunsPolicy).
    this.wkpool = new TaskThreadPool(
            count, count, 5 * 1000 * 60, false, new CallerRunsPolicy());
    dp = new Dispatcher(
            this.wkpool,
            this.concurrentCounter,
            publisherDocqueue,
            this.docSubWorkerMap);
}
项目:hawkbit
文件:ExecutorAutoConfiguration.java
/**
 * Creates the executor for UI background processes: 1 to 20 threads named
 * {@code ui-executor-pool-N}, a bounded queue of 20 tasks and
 * {@link ThreadPoolExecutor.CallerRunsPolicy} for rejected tasks. The pool
 * is returned wrapped in a {@link DelegatingSecurityContextExecutor}.
 *
 * @return the executor for UI background processes.
 */
@Bean(name = "uiExecutor")
@ConditionalOnMissingBean(name = "uiExecutor")
public Executor uiExecutor() {
    final BlockingQueue<Runnable> pendingTasks = new ArrayBlockingQueue<>(20);
    // The rejection handler is supplied at construction time rather than through the setter.
    final ThreadPoolExecutor uiPool = new ThreadPoolExecutor(
            1, 20,
            10000, TimeUnit.MILLISECONDS,
            pendingTasks,
            new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    return new DelegatingSecurityContextExecutor(uiPool);
}
项目:entity-essentials
文件:EventBus.java
public EventBus(Context context)
{
myContext = context;
myEventDispatcherFactory = new EventDispatcherFactoryImpl();
mySubscriptionFactory = new SubscriptionFactoryImpl();
myBookkeeper = new SubscriptionBookkeeperImpl();
myWorkerPool = new ThreadPoolExecutor(4, 4, 0L, SECONDS, new ArrayBlockingQueue<Runnable>(256), new EventBusThreadFactory(), new CallerRunsPolicy());
}
项目:asteria-3.0
文件:ServiceQueue.java
/**
 * Creates and configures a new single-threaded
 * {@link ScheduledExecutorService} with a timeout value of {@code seconds}.
 * A value of zero leaves the keep-alive settings untouched; a positive value
 * is applied as the keep-alive time and core-thread timeout is enabled. A
 * negative value is rejected by the precondition check.
 *
 * @param seconds
 *            the timeout value in seconds, must be zero or greater.
 * @return the newly created and configured executor service.
 */
private static ScheduledExecutorService createServiceExecutor(long seconds) {
    Preconditions.checkArgument(seconds >= 0, "The timeout value must be equal to or greater than 0!");

    ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1);
    service.setRejectedExecutionHandler(new CallerRunsPolicy());
    service.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("ServiceQueueThread").build());

    // Zero means "no timeout configured": nothing more to set up.
    if (seconds == 0) {
        return service;
    }
    service.setKeepAliveTime(seconds, TimeUnit.SECONDS);
    service.allowCoreThreadTimeOut(true);
    return service;
}
项目:killbill-analytics-plugin
文件:BusinessExecutor.java
/**
 * Creates a named cached thread pool of between 0 and {@code nbThreads}
 * threads with a 60-second keep-alive.
 *
 * @param nbThreads passed as the second size argument of the pool
 * @param name      name passed to the pool factory
 * @return the pool returned by {@code Executors.newCachedThreadPool}
 */
public static ExecutorService newCachedThreadPool(final int nbThreads, final String name) {
    // Note: the default rejection handler (AbortPolicy) is not used here, as we always want the tasks to be executed
    final CallerRunsPolicy runInCaller = new CallerRunsPolicy();
    return Executors.newCachedThreadPool(0, nbThreads, name, 60L, TimeUnit.SECONDS, runInCaller);
}
项目:logdb
文件:ParallelMergeSorter.java
/**
 * Creates a sorter with an empty buffer, a fresh run indexer and an
 * eight-thread "Sort Worker" pool.
 *
 * @param comparator     stored in {@code this.comparator}
 * @param runLength      stored in {@code this.runLength}
 * @param memoryRunCount initial value of the cache counter
 */
public ParallelMergeSorter(Comparator<Item> comparator, int runLength, int memoryRunCount) {
    this.comparator = comparator;
    this.runLength = runLength;
    this.cacheCount = new AtomicInteger(memoryRunCount);
    this.runIndexer = new AtomicInteger();
    this.buffer = new LinkedList<Item>();
    // 8 fixed threads over a LimitedQueue of 8; rejected tasks run on the submitting thread.
    this.executor = new ThreadPoolExecutor(
            8, 8,
            10, TimeUnit.SECONDS,
            new LimitedQueue<Runnable>(8),
            new NamedThreadFactory("Sort Worker"),
            new CallerRunsPolicy());
}
项目:logdb
文件:StreamingResultEncoder.java
public StreamingResultEncoder(String name, int poolSize) {
if (poolSize < 1)
throw new IllegalArgumentException("pool size should be positive");
this.poolSize = poolSize;
this.executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
poolSize), new NamedThreadFactory(name), new CallerRunsPolicy());
slog.info("araqne logdb: created encoder thread pool [{}]", poolSize);
}
项目:logdb
文件:StreamingResultEncoder.java
public StreamingResultEncoder(String name, int poolSize) {
if (poolSize < 1)
throw new IllegalArgumentException("pool size should be positive");
this.poolSize = poolSize;
this.executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
poolSize), new NamedThreadFactory(name), new CallerRunsPolicy());
slog.debug("araqne logdb: created encoder thread pool [{}]", poolSize);
}
项目:adaptive-executor-java
文件:ExecutorFactory.java
/**
 * Creates an executor service from the given pool settings.
 *
 * NOTE(review): only the declaration is visible here; the parameter
 * descriptions below are inferred from the names and should be confirmed
 * against the implementations.
 *
 * @param corePoolSize  presumably the core number of threads
 * @param maxPoolThread presumably the maximum number of threads
 * @param keepAliveTime presumably how long idle threads are kept
 * @param workQueue     queue of {@link Runnable} tasks for the executor
 * @param policy        the {@link CallerRunsPolicy} instance to use
 * @return the created executor service
 */
ExecutorService create(int corePoolSize, int maxPoolThread, Duration keepAliveTime,
BlockingQueue<Runnable> workQueue, CallerRunsPolicy policy);
项目:invesdwin-util
文件:Executors.java
/**
 * Creates a fixed pool of {@code nThreads} threads with a queue of
 * {@code nThreads} slots, whose rejected tasks run on the submitting thread
 * ({@link CallerRunsPolicy}), and returns it wrapped under the given name.
 *
 * @param name     used for the thread factory and for the wrapper
 * @param nThreads number of threads and queue capacity
 * @return the wrapped executor service
 */
public static WrappedExecutorService newFixedCallerRunsThreadPool(final String name, final int nThreads) {
    // The JDK class is referenced by its fully qualified name, as in the rest of this method's original form.
    return new WrappedExecutorService(
            new java.util.concurrent.ThreadPoolExecutor(
                    nThreads, nThreads,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(nThreads),
                    newFastThreadLocalThreadFactory(name),
                    new CallerRunsPolicy()),
            name);
}
项目:KeyedThreadPool
文件:KeyedThreadPoolExecutor.java
/**
 * Delegates to the three-argument constructor, passing a new
 * {@link CallerRunsPolicy} as the third argument.
 *
 * @param corePoolSize  forwarded unchanged
 * @param threadFactory forwarded unchanged
 */
public KeyedThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
this(corePoolSize, threadFactory, new CallerRunsPolicy());
}
项目:staging-client-java
文件:IntegrationUtils.java
/**
 * Runs the schema-selection test cases read from {@code is}, one
 * comma-separated record per line. Lines starting with {@code #} are
 * skipped. Every other line must have exactly four fields: the two
 * arguments of the {@code SchemaLookup} constructor, the value set under
 * {@code CsStagingData.SSF25_KEY}, and the expected schema id (empty when
 * no schema should be found). Cases are checked on a thread pool and
 * failures are printed to standard output.
 *
 * @param staging  instance used to look up schemas
 * @param fileName name shown in the start message
 * @param is       stream with the test cases, read as UTF-8
 * @return the number of processed and failed cases
 * @throws IOException if reading from {@code is} fails
 * @throws InterruptedException if interrupted while waiting for the pool to finish
 * @throws IllegalStateException if a record does not have exactly four fields
 */
public static IntegrationResult processSchemaSelection(final Staging staging, String fileName, InputStream is) throws IOException, InterruptedException {
    // initialize the threads pool (don't use more than 9 threads)
    int n = Math.min(9, Runtime.getRuntime().availableProcessors() + 1);
    ExecutorService pool = new ThreadPoolExecutor(n, n, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000), new CallerRunsPolicy());

    Stopwatch stopwatch = Stopwatch.create();
    AtomicInteger processedCases = new AtomicInteger(0);
    final AtomicInteger failedCases = new AtomicInteger(0);

    System.out.println("Starting schema selection tests from " + fileName + " [" + n + " threads]");

    LineNumberReader reader = new LineNumberReader(new InputStreamReader(is, "UTF-8"));

    try {
        // loop over each line in the file
        String line = reader.readLine();
        while (line != null) {
            if (!line.startsWith("#")) {
                processedCases.getAndIncrement();

                // split the string; important to keep empty trailing values in the resulting array
                String[] parts = Arrays.stream(line.split(",", -1)).map(String::trim).toArray(String[]::new);
                if (parts.length != 4)
                    throw new IllegalStateException("Bad record in schema_selection.txt on line number " + reader.getLineNumber());

                final int lineNum = reader.getLineNumber();
                final String fullLine = line;

                pool.submit(() -> {
                    try {
                        SchemaLookup lookup = new SchemaLookup(parts[0], parts[1]);
                        lookup.setInput(CsStagingData.SSF25_KEY, parts[2]);

                        List<StagingSchema> lookups = staging.lookupSchema(lookup);
                        if (parts[3].length() == 0) {
                            if (lookups.size() == 1) {
                                System.out.println("Line #" + lineNum + " [" + fullLine + "] --> The schema selection should not have found any schema but did: " + lookups.get(0).getId());
                                failedCases.getAndIncrement();
                            }
                        }
                        else {
                            if (lookups.size() != 1) {
                                System.out.println("Line #" + lineNum + " [" + fullLine + "] --> The schema selection should have found a schema, " + parts[3] + ", but did not.");
                                failedCases.getAndIncrement();
                            }
                            else if (!Objects.equals(lookups.get(0).getId(), parts[3])) {
                                System.out.println(
                                        "Line #" + lineNum + " [" + fullLine + "] --> The schema selection found schema " + lookups.get(0).getId() + " but it should have found " + parts[3] + ".");
                                failedCases.getAndIncrement();
                            }
                        }
                    }
                    catch (Throwable t) {
                        // only the first failure-by-exception is printed; all of them are counted
                        if (failedCases.get() == 0)
                            System.out.println("Line #" + lineNum + " --> Exception processing schema selection: " + t.getMessage());
                        failedCases.getAndIncrement();
                    }

                    return null;
                });
            }

            line = reader.readLine();
        }
    }
    finally {
        // Always stop accepting work, including when a bad record or a read error
        // aborts the loop; otherwise the pool's worker threads are never released.
        pool.shutdown();
    }

    // The result of the wait is no longer ignored: on timeout some cases are
    // still running and the counts reported below are incomplete.
    if (!pool.awaitTermination(30, TimeUnit.SECONDS))
        System.out.println("Warning: schema selection tests did not finish within 30 seconds; results may be incomplete.");

    stopwatch.stop();
    String perMs = String.format("%.3f", ((float)stopwatch.elapsed(TimeUnit.MILLISECONDS) / processedCases.get()));
    System.out.print("Completed " + NumberFormat.getNumberInstance(Locale.US).format(processedCases.get()) + " cases in " + stopwatch + " (" + perMs + "ms/case).");
    if (failedCases.get() > 0)
        System.out.println("There were " + NumberFormat.getNumberInstance(Locale.US).format(failedCases.get()) + " failures.");
    else
        System.out.println();

    return new IntegrationResult(processedCases.get(), failedCases.get());
}
项目:killbill-analytics-plugin
文件:BusinessExecutor.java
/**
 * Creates a named single-thread scheduled executor that is given a
 * {@link CallerRunsPolicy} as its rejection handler.
 *
 * @param name name passed to the executor factory
 * @return the executor returned by {@code Executors.newSingleThreadScheduledExecutor}
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor(final String name) {
    final CallerRunsPolicy runInCaller = new CallerRunsPolicy();
    return Executors.newSingleThreadScheduledExecutor(name, runInCaller);
}
项目:logdb
文件:StreamingResultDecoder.java
public StreamingResultDecoder(String name, int poolSize) {
executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(poolSize),
new NamedThreadFactory(name), new CallerRunsPolicy());
}
项目:logdb
文件:StreamingResultDecoder.java
public StreamingResultDecoder(String name, int poolSize) {
executor = new ThreadPoolExecutor(poolSize, poolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(poolSize),
new NamedThreadFactory(name), new CallerRunsPolicy());
}
项目:asteria-3.0
文件:GameSyncExecutor.java
/**
 * Creates and configures the update service for this game sync executor.
 * The returned executor is <b>unconfigurable</b>, meaning its configuration
 * can no longer be modified.
 *
 * @param nThreads
 *            the amount of threads to create this service with.
 * @return the newly created and configured service, or {@code null} when
 *         {@code nThreads} is 1 or less.
 */
private ExecutorService create(int nThreads) {
    if (nThreads > 1) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        pool.setRejectedExecutionHandler(new CallerRunsPolicy());
        pool.setThreadFactory(new ThreadFactoryBuilder().setNameFormat("GameSyncThread").build());
        // Wrap the pool so that callers cannot reconfigure it.
        return Executors.unconfigurableExecutorService(pool);
    }
    // One thread or fewer: no service is created.
    return null;
}
项目:components-ness-executors
文件:ThreadPoolConfiguration.java
/**
 * Supplies the rejection handler for this configuration.
 *
 * @return a new {@link ThreadPoolExecutor.CallerRunsPolicy} on every call
 */
@Override
RejectedExecutionHandler getHandler() {
    final RejectedExecutionHandler callerRuns = new ThreadPoolExecutor.CallerRunsPolicy();
    return callerRuns;
}