/**
 * Creates a worker thread for {@code pool} whose name, once the worker starts, is composed of the pool id,
 * a running thread number, the configured name and the name of the thread that requested the worker.
 *
 * @param pool the pool the new worker belongs to
 * @return the new, not yet started worker thread
 * @throws IllegalArgumentException if the worker's uncaught exception handler is not the default handler
 */
@Override
public ForkJoinWorkerThread newThread(final ForkJoinPool pool) {
    // Captured here because onStart() runs later on the worker itself, not on the requesting thread.
    final String requesterName = Thread.currentThread().getName();
    final ForkJoinWorkerThread worker = new ForkJoinWorkerThread(pool) {
        /**
         * http://jsr166-concurrency.10961.n7.nabble.com/How-to-set-the-thread-group-of-the-ForkJoinPool-td1590.html
         */
        @Override
        protected void onStart() {
            super.onStart();
            // The thread number is drawn when the worker starts, not when it is created.
            setName(threadpoolId + "-" + threadIds.incrementAndGet() + ":" + name
                    + Threads.NESTED_THREAD_NAME_SEPARATOR + requesterName);
        }
    };
    /*
     * Exceptions must still be logged when runnables are handed to executors and their futures are never
     * checked, which is the default behaviour expected from normal threads. Hence the worker has to report
     * the default handler.
     */
    final UncaughtExceptionHandler actualHandler = worker.getUncaughtExceptionHandler();
    final UncaughtExceptionHandler defaultHandler = Thread.getDefaultUncaughtExceptionHandler();
    if (actualHandler != defaultHandler) {
        throw new IllegalArgumentException(
                UncaughtExceptionHandler.class.getSimpleName() + " is not already set properly!");
    }
    return worker;
}
ExecutorService getExecutor(int asyncThreads) { // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be // put. Move it somewhere else, or remove it if no longer necessary. // See: https://github.com/grpc/grpc-java/issues/2119 return new ForkJoinPool(asyncThreads, new ForkJoinWorkerThreadFactory() { final AtomicInteger num = new AtomicInteger(); @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool); thread.setDaemon(true); thread.setName("server-worker-" + "-" + num.getAndIncrement()); return thread; } }, UncaughtExceptionHandlers.systemExit(), true /* async */); }
/**
 * Returns the shared client executor, creating it on first use.
 *
 * <p>The executor is an async-mode {@link ForkJoinPool} sized to the number of available processors; its
 * workers are daemon threads carrying a numbered "grpc-client-app" name. The method is synchronized, so
 * the pool is created at most once.
 *
 * @return the shared client executor
 */
private static synchronized ExecutorService getExecutor() {
    if (clientExecutor != null) {
        return clientExecutor;
    }
    int parallelism = Runtime.getRuntime().availableProcessors();
    ForkJoinWorkerThreadFactory namingFactory = new ForkJoinWorkerThreadFactory() {
        final AtomicInteger nextWorkerId = new AtomicInteger();

        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            ForkJoinWorkerThread worker = defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setDaemon(true);
            // NOTE(review): the two literals produce a double hyphen ("grpc-client-app--0"); kept as is.
            worker.setName("grpc-client-app-" + "-" + nextWorkerId.getAndIncrement());
            return worker;
        }
    };
    clientExecutor = new ForkJoinPool(
            parallelism,
            namingFactory,
            UncaughtExceptionHandlers.systemExit(),
            true /* async */);
    return clientExecutor;
}
/**
 * Construct a ForkJoinPool with a stricter thread limit.
 *
 * <p>ForkJoinPool by default will create a new thread to handle pending work whenever an existing
 * thread becomes blocked on a task and cannot work steal. In cases when many tasks would block on
 * a slow running dependency, it can trigger thread creation for all those tasks.
 *
 * <p>Note that limiting the maximum threads will impact the ability for ManagedBlockers to cause
 * the pool to create new worker threads, leading to potential deadlock if many ManagedBlockers
 * are used.
 *
 * <p>The thread factory refuses to create a worker (returns {@code null}) once
 * {@code parallelism + spares} workers have started and not yet terminated. Workers are counted
 * when they start rather than when they are created, so the limit is best-effort: workers that
 * were created but have not started yet are not counted.
 *
 * @param parallelism the parallelism level passed to the pool
 * @param spares the number of additional worker threads allowed beyond {@code parallelism}
 * @return a new pool in non-async mode, without an uncaught exception handler
 */
public static ForkJoinPool forkJoinPoolWithThreadLimit(int parallelism, int spares) {
    // Computed as long so that a very large spares value cannot overflow into a negative limit,
    // which would make the factory refuse every thread.
    final long maxThreads = (long) parallelism + spares;
    AtomicInteger activeThreads = new AtomicInteger(0);
    return new ForkJoinPool(
        parallelism,
        pool -> {
            // ">=" (not ">"): with maxThreads workers already running, one more would exceed the limit.
            if (activeThreads.get() >= maxThreads) {
                return null;
            }
            return new ForkJoinWorkerThread(pool) {
                @Override
                protected void onStart() {
                    super.onStart();
                    activeThreads.incrementAndGet();
                }

                @Override
                protected void onTermination(Throwable exception) {
                    activeThreads.decrementAndGet();
                    super.onTermination(exception);
                }
            };
        },
        /* handler */ null,
        /* asyncMode */ false);
}
/** * Gets a value indicating whether no yield is necessary. * * @return {@code true} if the caller is already running on that {@link Executor}. */ @Override public boolean isDone() { if (this.alwaysYield) { return false; } if (executor instanceof ForkJoinPool) { Thread currentThread = Thread.currentThread(); if (currentThread instanceof ForkJoinWorkerThread) { ForkJoinWorkerThread forkJoinWorkerThread = (ForkJoinWorkerThread)currentThread; return forkJoinWorkerThread.getPool() == executor; } } return false; // // We special case the TaskScheduler.Default since that is semantically equivalent to being // // on a ThreadPool thread, and there are various ways to get on those threads. // // TaskScheduler.Current is never null. Even if no scheduler is really active and the current // // thread is not a threadpool thread, TaskScheduler.Current == TaskScheduler.Default, so we have // // to protect against that case too. //#if DESKTOP // bool isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread; //#else // // An approximation of whether we're on a threadpool thread is whether // // there is a SynchronizationContext applied. So use that, since it's // // available to portable libraries. // bool isThreadPoolThread = SynchronizationContext.Current == null; //#endif // return (this.scheduler == TaskScheduler.Default && isThreadPoolThread) // || (this.scheduler == TaskScheduler.Current && TaskScheduler.Current != TaskScheduler.Default); }
/**
 * pollSubmission returns unexecuted submitted task, if present
 */
public void testPollSubmission() {
    final CountDownLatch release = new CountDownLatch(1);
    final ForkJoinTask<?> first = ForkJoinTask.adapt(awaiter(release));
    final ForkJoinTask<?> second = ForkJoinTask.adapt(awaiter(release));
    final ForkJoinTask<?> third = ForkJoinTask.adapt(awaiter(release));
    final ForkJoinPool pool = singletonPool();
    try (PoolCleaner cleaner = cleaner(pool, release)) {
        // Submits the three tasks from a thread that is not a pool worker.
        Thread submitter = new Thread(new CheckedRunnable() {
            public void realRun() {
                pool.execute(first);
                pool.execute(second);
                pool.execute(third);
            }});
        RecursiveAction poller = new CheckedRecursiveAction() {
            protected void realCompute() {
                submitter.start();
                try {
                    submitter.join();
                } catch (Exception ex) {
                    threadUnexpectedException(ex);
                }
                // This action occupies the pool's worker, so the submissions are still queued.
                assertTrue(pool.hasQueuedSubmissions());
                assertTrue(Thread.currentThread() instanceof ForkJoinWorkerThread);
                ForkJoinTask<?> polled = ForkJoinTask.pollSubmission();
                assertTrue(polled == first || polled == second || polled == third);
                assertFalse(polled.isDone());
            }};
        pool.invoke(poller);
    }
}
/**
 * getPool of current thread in pool returns its pool
 */
public void testWorkerGetPool() {
    final ForkJoinPool mainPool = mainPool();
    RecursiveAction poolCheck = new CheckedRecursiveAction() {
        protected void realCompute() {
            Thread current = Thread.currentThread();
            ForkJoinPool owner = ((ForkJoinWorkerThread) current).getPool();
            assertSame(mainPool, owner);
        }};
    testInvokeOnPool(mainPool, poolCheck);
}
/** * getPoolIndex of current thread in pool returns 0 <= value < poolSize */ public void testWorkerGetPoolIndex() { final ForkJoinPool mainPool = mainPool(); RecursiveAction a = new CheckedRecursiveAction() { protected void realCompute() { ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread(); assertTrue(w.getPoolIndex() >= 0); // pool size can shrink after assigning index, so cannot check // assertTrue(w.getPoolIndex() < mainPool.getPoolSize()); }}; testInvokeOnPool(mainPool, a); }
/**
 * Prepares the fixture for every test: checks that the test itself is not running on a ForkJoinPool
 * worker, creates the mocks returned by mapping operations, stubs the delegate mock and finally builds
 * the stream support under test on top of a real parallel stream.
 */
@Before
@SuppressWarnings({"unchecked", "rawtypes"})
public void init() {
    // Precondition for all tests
    assertFalse("This test must not run in a ForkJoinPool", currentThread() instanceof ForkJoinWorkerThread);

    // Canned results handed out by the stubs below.
    mappedDelegateMock = mock(Stream.class);
    mappedIntDelegateMock = mock(IntStream.class);
    mappedLongDelegateMock = mock(LongStream.class);
    mappedDoubleDelegateMock = mock(DoubleStream.class);
    toArrayResult = new String[0];

    // Mapping operations.
    when(delegateMock.map(anyObject())).thenReturn((Stream) mappedDelegateMock);
    when(delegateMock.mapToInt(anyObject())).thenReturn(mappedIntDelegateMock);
    when(delegateMock.mapToLong(anyObject())).thenReturn(mappedLongDelegateMock);
    when(delegateMock.mapToDouble(anyObject())).thenReturn(mappedDoubleDelegateMock);
    when(delegateMock.flatMap(anyObject())).thenReturn((Stream) mappedDelegateMock);
    when(delegateMock.flatMapToInt(anyObject())).thenReturn(mappedIntDelegateMock);
    when(delegateMock.flatMapToLong(anyObject())).thenReturn(mappedLongDelegateMock);
    when(delegateMock.flatMapToDouble(anyObject())).thenReturn(mappedDoubleDelegateMock);

    // Stream state and terminal operations.
    when(delegateMock.isParallel()).thenReturn(false);
    when(delegateMock.toArray()).thenReturn(toArrayResult);
    when(delegateMock.toArray(anyObject())).thenReturn(toArrayResult);
    when(delegateMock.reduce(anyString(), anyObject())).thenReturn("reduce");
    when(delegateMock.reduce(anyObject())).thenReturn(Optional.of("reduce"));
    when(delegateMock.reduce(anyObject(), anyObject(), anyObject())).thenReturn(42);
    when(delegateMock.collect(anyObject(), anyObject(), anyObject())).thenReturn(42);
    when(delegateMock.collect(anyObject())).thenReturn(singletonList("collect"));
    when(delegateMock.min(anyObject())).thenReturn(Optional.of("min"));
    when(delegateMock.max(anyObject())).thenReturn(Optional.of("max"));
    when(delegateMock.count()).thenReturn(42L);
    when(delegateMock.anyMatch(anyObject())).thenReturn(true);
    when(delegateMock.allMatch(anyObject())).thenReturn(true);
    when(delegateMock.noneMatch(anyObject())).thenReturn(true);
    when(delegateMock.findFirst()).thenReturn(Optional.of("findFirst"));
    when(delegateMock.findAny()).thenReturn(Optional.of("findAny"));

    // Object under test, backed by a real single-element parallel stream.
    delegate = singletonList("x").parallelStream();
    parallelStreamSupport = new ParallelStreamSupport<>(delegate, workerPool);
}
/** In parallel mode, the forEach action must run on a ForkJoinPool worker thread. */
@Test
public void forEachParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport.forEach(element -> executingThread.set(currentThread()));

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the forEachOrdered action must run on a ForkJoinPool worker thread. */
@Test
public void forEachOrderedParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport.forEachOrdered(element -> executingThread.set(currentThread()));

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, toArray() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void toArrayParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .toArray();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, toArray(generator) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void toArrayWithGeneratorParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    IntFunction<String[]> arrayFactory = size -> new String[size];
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .toArray(arrayFactory);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, reduce(identity, accumulator) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void reduceWithIdentityAndAccumulatorParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    BinaryOperator<String> keepRight = (left, right) -> right;
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .reduce("a", keepRight);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, reduce(accumulator) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void reduceWithAccumulatorParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    BinaryOperator<String> keepRight = (left, right) -> right;
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .reduce(keepRight);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/**
 * In parallel mode, reduce(identity, accumulator, combiner) must run the accumulator on a ForkJoinPool
 * worker thread. The accumulator reports the thread it runs on, which becomes the reduction result.
 */
@Test
public void reduceWithIdentityAndAccumulatorAndCombinerParallel() {
    BinaryOperator<Thread> keepRight = (left, right) -> right;
    BiFunction<Thread, String, Thread> reportExecutingThread = (partial, element) -> currentThread();
    this.parallelStreamSupport.parallel();

    Thread reduced = this.parallelStreamSupport.reduce(currentThread(), reportExecutingThread, keepRight);

    assertThat(reduced, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, collect(supplier, accumulator, combiner) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void collectWithSupplierAndAccumulatorAndCombinerParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, collect(collector) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void collectWithCollectorParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .collect(toList());

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, min(comparator) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void minParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    Comparator<String> allEqual = (left, right) -> 0;
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .min(allEqual);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, max(comparator) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void maxParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    Comparator<String> allEqual = (left, right) -> 0;
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .max(allEqual);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, count() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void countParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .count();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, anyMatch must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void anyMatchParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .anyMatch(element -> true);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, allMatch must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void allMatchParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .allMatch(element -> true);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, noneMatch must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void noneMatchParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .noneMatch(element -> true);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, findFirst() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void findFirstParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .findFirst();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, findAny() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void findAnyParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelStreamSupport.parallel();

    this.parallelStreamSupport
        .peek(element -> executingThread.set(currentThread()))
        .findAny();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's forEach action must run on a ForkJoinPool worker thread. */
@Test
public void forEachParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport.forEach(value -> executingThread.set(currentThread()));

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's forEachOrdered action must run on a ForkJoinPool worker thread. */
@Test
public void forEachOrderedParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport.forEachOrdered(value -> executingThread.set(currentThread()));

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's toArray() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void toArrayParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .toArray();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's reduce(identity, op) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void reduceWithIdentityAndAccumulatorParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    LongBinaryOperator keepRight = (left, right) -> right;
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .reduce(0, keepRight);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's reduce(op) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void reduceWithAccumulatorParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    LongBinaryOperator keepRight = (left, right) -> right;
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .reduce(keepRight);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's collect(supplier, accumulator, combiner) must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void collectWithSupplierAndAccumulatorAndCombinerParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's sum() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void sumParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .sum();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's min() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void minParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .min();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's max() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void maxParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .max();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's count() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void countParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .count();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}
/** In parallel mode, the long stream's average() must evaluate the pipeline on a ForkJoinPool worker thread. */
@Test
public void averageParallel() {
    AtomicReference<Thread> executingThread = new AtomicReference<>();
    this.parallelLongStreamSupport.parallel();

    this.parallelLongStreamSupport
        .peek(value -> executingThread.set(currentThread()))
        .average();

    Thread observed = executingThread.get();
    assertThat(observed, instanceOf(ForkJoinWorkerThread.class));
}