@Test public void forkJoin() throws Exception { int n = Runtime.getRuntime().availableProcessors(); Config conf = ConfigFactory.empty() .withValue("executors", ConfigValueFactory.fromAnyRef("forkjoin, asyncMode")); new MockUnit(Env.class, Binder.class) .expect(executors) .expect(unit -> { ForkJoinPool pool = unit.constructor(ForkJoinPool.class) .args(int.class, ForkJoinWorkerThreadFactory.class, UncaughtExceptionHandler.class, boolean.class) .build(eq(n), isA(ForkJoinWorkerThreadFactory.class), eq(null), eq(false)); unit.registerMock(ExecutorService.class, pool); }) .expect(bind("default", true, ExecutorService.class, Executor.class, ForkJoinPool.class)) .expect(onStop) .run(unit -> { new Exec().configure(unit.get(Env.class), conf, unit.get(Binder.class)); }); }
@Test public void forkJoinAlternative() throws Exception { int n = Runtime.getRuntime().availableProcessors(); Config conf = ConfigFactory.empty() .withValue("executors.default.type", ConfigValueFactory.fromAnyRef("forkjoin")) .withValue("executors.default.asyncMode", ConfigValueFactory.fromAnyRef(false)); new MockUnit(Env.class, Binder.class) .expect(executors) .expect(unit -> { ForkJoinPool pool = unit.constructor(ForkJoinPool.class) .args(int.class, ForkJoinWorkerThreadFactory.class, UncaughtExceptionHandler.class, boolean.class) .build(eq(n), isA(ForkJoinWorkerThreadFactory.class), eq(null), eq(false)); unit.registerMock(ExecutorService.class, pool); }) .expect(bind("default", true, ExecutorService.class, Executor.class, ForkJoinPool.class)) .expect(onStop) .run(unit -> { new Exec().configure(unit.get(Env.class), conf, unit.get(Binder.class)); }); }
@Test public void forkJoinAsync() throws Exception { int n = 1; Config conf = ConfigFactory.empty() .withValue("executors", ConfigValueFactory.fromAnyRef("forkjoin=1, asyncMode=true")); new MockUnit(Env.class, Binder.class) .expect(executors) .expect(unit -> { ForkJoinPool pool = unit.constructor(ForkJoinPool.class) .args(int.class, ForkJoinWorkerThreadFactory.class, UncaughtExceptionHandler.class, boolean.class) .build(eq(n), isA(ForkJoinWorkerThreadFactory.class), eq(null), eq(true)); unit.registerMock(ExecutorService.class, pool); }) .expect(bind("default", true, ExecutorService.class, Executor.class, ForkJoinPool.class)) .expect(onStop) .run(unit -> { new Exec().configure(unit.get(Env.class), conf, unit.get(Binder.class)); }); }
@Test public void forkJoinAsyncAlternative() throws Exception { int n = 1; Config conf = ConfigFactory.empty() .withValue("executors.default.type", ConfigValueFactory.fromAnyRef("forkjoin")) .withValue("executors.default.size", ConfigValueFactory.fromAnyRef(1)) .withValue("executors.default.asyncMode", ConfigValueFactory.fromAnyRef(true)); new MockUnit(Env.class, Binder.class) .expect(executors) .expect(unit -> { ForkJoinPool pool = unit.constructor(ForkJoinPool.class) .args(int.class, ForkJoinWorkerThreadFactory.class, UncaughtExceptionHandler.class, boolean.class) .build(eq(n), isA(ForkJoinWorkerThreadFactory.class), eq(null), eq(true)); unit.registerMock(ExecutorService.class, pool); }) .expect(bind("default", true, ExecutorService.class, Executor.class, ForkJoinPool.class)) .expect(onStop) .run(unit -> { new Exec().configure(unit.get(Env.class), conf, unit.get(Binder.class)); }); }
ExecutorService getExecutor(int asyncThreads) { // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be // put. Move it somewhere else, or remove it if no longer necessary. // See: https://github.com/grpc/grpc-java/issues/2119 return new ForkJoinPool(asyncThreads, new ForkJoinWorkerThreadFactory() { final AtomicInteger num = new AtomicInteger(); @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool); thread.setDaemon(true); thread.setName("server-worker-" + "-" + num.getAndIncrement()); return thread; } }, UncaughtExceptionHandlers.systemExit(), true /* async */); }
private static synchronized ExecutorService getExecutor() { if (clientExecutor == null) { clientExecutor = new ForkJoinPool( Runtime.getRuntime().availableProcessors(), new ForkJoinWorkerThreadFactory() { final AtomicInteger num = new AtomicInteger(); @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool); thread.setDaemon(true); thread.setName("grpc-client-app-" + "-" + num.getAndIncrement()); return thread; } }, UncaughtExceptionHandlers.systemExit(), true /* async */); } return clientExecutor; }
private static ForkJoinWorkerThreadFactory fjwtf(final String name) { AtomicLong id = new AtomicLong(); return pool -> { ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); thread.setName(name + "-" + id.incrementAndGet()); return thread; }; }
public static TestSuite newForkJoinPoolTestSuiteWithParallelism(final int parallelism) throws Exception { ExecutorTestSuiteBuilder<ExecutorService> fjpSuite = ExecutorTestSuiteBuilder.using(new ExecutorTestSubjectGenerator<ExecutorService>() { @Override protected ExecutorService createExecutor(final ThreadFactory threadFactory) { ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() { @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) { @Override public void interrupt() { notifyThreadInterrupted(this); super.interrupt(); } }; notifyNewThread(threadFactory, thread); thread.setDaemon(true); return thread; } }; return new ForkJoinPool(parallelism, factory, null, false); } }).named("ForkJoinPool[parallelism=" + parallelism + "]") .withFeatures(EXECUTOR_SERVICE, IGNORES_INTERRUPTS) .withConcurrencyLevel(parallelism); /* * This test fails sporadically, possibly more consistently with parallelism=2 than 3. ForkJoinPool#invokeAll cancels tasks when it * sees an exception. Whether this cancellation makes it into the returned future depends on a race condition in (parallel) * execution of the tasks. * * It's not completely clear whether this complies with the spec. The spec doesn't explicitly state that the tasks run * independently, but it feels odd for behaviour of later tasks to depend on earlier ones that threw an exception. That said, * perhaps fork-join should expect this sort of coupling between tasks, in which case cancellation of subsequent tasks may be * reasonable. */ fjpSuite.suppressing(InvokeAllTester.class.getMethod("testInvokeAllMixedCompletesAllTasks_NoTimeout"), InvokeAllTester.class.getMethod("testInterruptedWhileWaiting_NoTimeout"), InvokeAllTester.class.getMethod("testInterruptedWhileWaiting_Timeout")); return fjpSuite.createTestSuite(); }