@Override protected List<WordStatistic> compute() { if (lines.size() <= countOfLinesThreshold) { return lines. stream(). filter(StringUtils::isNotEmpty). map(this::countWordsInLine). reduce(Collections.emptyList(), this::mergeWordStatistics); } RecursiveTask<List<WordStatistic>> recursiveTask = new WordStatisticsInLinesRecursiveTask(lines.subList(0, countOfLinesThreshold), countOfLinesThreshold, wordSeparator); recursiveTask.fork(); return mergeWordStatistics( new WordStatisticsInLinesRecursiveTask(lines.subList(countOfLinesThreshold, lines.size()), countOfLinesThreshold, wordSeparator).compute(), recursiveTask.join() ); }
private static void realMain(String[] args) throws Throwable { if (debug) { String pp = System.getProperty( "java.util.concurrent.ForkJoinPool.common.parallelism"); System.out.println( "java.util.concurrent.ForkJoinPool.common.parallelism:" + pp); String tf = System.getProperty( "java.util.concurrent.ForkJoinPool.common.threadFactory"); System.out.println( "java.util.concurrent.ForkJoinPool.common.threadFactory:" + tf); } long from = 0, to = 50000; RecursiveTask<Long> task = new SumTask(from, to, Thread.currentThread()); long sum = task.invoke(); System.out.printf("%nSum: from [%d] to [%d] = [%d]%n", from, to, sum); task.fork(); sum = task.join(); System.out.printf("%nSum: from [%d] to [%d] = [%d]%n", from, to, sum); sum = ForkJoinPool.commonPool().invoke(task.fork()); System.out.printf("%nSum: from [%d] to [%d] = [%d]%n", from, to, sum); }
/** * join of a forked task throws exception when task completes abnormally */ public void testAbnormalForkJoin() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FailingFibTask f = new FailingFibTask(8); assertSame(f, f.fork()); try { Integer r = f.join(); shouldThrow(); } catch (FJException success) { checkCompletedAbnormally(f, success); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * get of a forked task throws exception when task completes abnormally */ public void testAbnormalForkGet() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() throws Exception { FailingFibTask f = new FailingFibTask(8); assertSame(f, f.fork()); try { Integer r = f.get(); shouldThrow(); } catch (ExecutionException success) { Throwable cause = success.getCause(); assertTrue(cause instanceof FJException); checkCompletedAbnormally(f, cause); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * timed get of a forked task throws exception when task completes abnormally */ public void testAbnormalForkTimedGet() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() throws Exception { FailingFibTask f = new FailingFibTask(8); assertSame(f, f.fork()); try { Integer r = f.get(LONG_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (ExecutionException success) { Throwable cause = success.getCause(); assertTrue(cause instanceof FJException); checkCompletedAbnormally(f, cause); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * invoke task throws exception when task cancelled */ public void testCancelledInvoke() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); assertTrue(f.cancel(true)); try { Integer r = f.invoke(); shouldThrow(); } catch (CancellationException success) { checkCancelled(f); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * join of a forked task throws exception when task cancelled */ public void testCancelledForkJoin() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); assertTrue(f.cancel(true)); assertSame(f, f.fork()); try { Integer r = f.join(); shouldThrow(); } catch (CancellationException success) { checkCancelled(f); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * get of a forked task throws exception when task cancelled */ public void testCancelledForkGet() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() throws Exception { FibTask f = new FibTask(8); assertTrue(f.cancel(true)); assertSame(f, f.fork()); try { Integer r = f.get(); shouldThrow(); } catch (CancellationException success) { checkCancelled(f); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * timed get of a forked task throws exception when task cancelled */ public void testCancelledForkTimedGet() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() throws Exception { FibTask f = new FibTask(8); assertTrue(f.cancel(true)); assertSame(f, f.fork()); try { Integer r = f.get(LONG_DELAY_MS, MILLISECONDS); shouldThrow(); } catch (CancellationException success) { checkCancelled(f); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * A reinitialized normally completed task may be re-invoked */ public void testReinitialize() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); checkNotDone(f); for (int i = 0; i < 3; i++) { Integer r = f.invoke(); assertEquals(21, (int) r); checkCompletedNormally(f, r); f.reinitialize(); f.publicSetRawResult(null); checkNotDone(f); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * A reinitialized abnormally completed task may be re-invoked */ public void testReinitializeAbnormal() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FailingFibTask f = new FailingFibTask(8); checkNotDone(f); for (int i = 0; i < 3; i++) { try { f.invoke(); shouldThrow(); } catch (FJException success) { checkCompletedAbnormally(f, success); } f.reinitialize(); checkNotDone(f); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * invoke task throws exception after invoking completeExceptionally */ public void testCompleteExceptionally() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); f.completeExceptionally(new FJException()); try { Integer r = f.invoke(); shouldThrow(); } catch (FJException success) { checkCompletedAbnormally(f, success); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * invokeAll(tasks) with > 2 argument invokes tasks */ public void testInvokeAll3() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); FibTask g = new FibTask(9); FibTask h = new FibTask(7); invokeAll(f, g, h); assertTrue(f.isDone()); assertTrue(g.isDone()); assertTrue(h.isDone()); checkCompletedNormally(f, 21); checkCompletedNormally(g, 34); checkCompletedNormally(h, 13); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * invokeAll(collection) invokes all tasks in the collection */ public void testInvokeAllCollection() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); FibTask g = new FibTask(9); FibTask h = new FibTask(7); HashSet set = new HashSet(); set.add(f); set.add(g); set.add(h); invokeAll(set); assertTrue(f.isDone()); assertTrue(g.isDone()); assertTrue(h.isDone()); checkCompletedNormally(f, 21); checkCompletedNormally(g, 34); checkCompletedNormally(h, 13); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * invokeAll(t1, t2) throw exception if any task does */ public void testAbnormalInvokeAll2() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); FailingFibTask g = new FailingFibTask(9); try { invokeAll(f, g); shouldThrow(); } catch (FJException success) { checkCompletedAbnormally(g, success); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * invokeAll(tasks) with > 2 argument throws exception if any task does */ public void testAbnormalInvokeAll3() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask f = new FibTask(8); FailingFibTask g = new FailingFibTask(9); FibTask h = new FibTask(7); try { invokeAll(f, g, h); shouldThrow(); } catch (FJException success) { checkCompletedAbnormally(g, success); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * invokeAll(collection) throws exception if any task does */ public void testAbnormalInvokeAllCollection() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FailingFibTask f = new FailingFibTask(8); FibTask g = new FibTask(9); FibTask h = new FibTask(7); HashSet set = new HashSet(); set.add(f); set.add(g); set.add(h); try { invokeAll(set); shouldThrow(); } catch (FJException success) { checkCompletedAbnormally(f, success); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * tryUnfork returns true for most recent unexecuted task, * and suppresses execution */ public void testTryUnfork() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertTrue(f.tryUnfork()); helpQuiesce(); checkNotDone(f); checkCompletedNormally(g, 34); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(singletonPool(), a)); }
/** * getSurplusQueuedTaskCount returns > 0 when * there are more tasks than threads */ public void testGetSurplusQueuedTaskCount() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask h = new FibTask(7); assertSame(h, h.fork()); FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertTrue(getSurplusQueuedTaskCount() > 0); helpQuiesce(); assertEquals(0, getSurplusQueuedTaskCount()); checkCompletedNormally(f, 21); checkCompletedNormally(g, 34); checkCompletedNormally(h, 13); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(singletonPool(), a)); }
/** * peekNextLocalTask returns most recent unexecuted task. */ public void testPeekNextLocalTask() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertSame(f, peekNextLocalTask()); checkCompletesNormally(f, 21); helpQuiesce(); checkCompletedNormally(g, 34); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(singletonPool(), a)); }
/** * pollNextLocalTask returns most recent unexecuted task * without executing it */ public void testPollNextLocalTask() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertSame(f, pollNextLocalTask()); helpQuiesce(); checkNotDone(f); checkCompletedNormally(g, 34); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(singletonPool(), a)); }
/** * pollTask returns an unexecuted task without executing it */ public void testPollTask() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertSame(f, pollTask()); helpQuiesce(); checkNotDone(f); checkCompletedNormally(g, 34); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(singletonPool(), a)); }
/** * peekNextLocalTask returns least recent unexecuted task in async mode */ public void testPeekNextLocalTaskAsync() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertSame(g, peekNextLocalTask()); assertEquals(21, (int) f.join()); helpQuiesce(); checkCompletedNormally(f, 21); checkCompletedNormally(g, 34); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(asyncSingletonPool(), a)); }
/** * pollNextLocalTask returns least recent unexecuted task without * executing it, in async mode */ public void testPollNextLocalTaskAsync() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertSame(g, pollNextLocalTask()); helpQuiesce(); checkCompletedNormally(f, 21); checkNotDone(g); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(asyncSingletonPool(), a)); }
/** * pollTask returns an unexecuted task without executing it, in * async mode */ public void testPollTaskAsync() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() { FibTask g = new FibTask(9); assertSame(g, g.fork()); FibTask f = new FibTask(8); assertSame(f, f.fork()); assertSame(g, pollTask()); helpQuiesce(); checkCompletedNormally(f, 21); checkNotDone(g); return NoResult; }}; assertSame(NoResult, testInvokeOnPool(asyncSingletonPool(), a)); }
/** * timed get of a forked task throws exception when task completes abnormally */ public void testAbnormalForkTimedGet() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() throws Exception { FailingFibTask f = new FailingFibTask(8); assertSame(f, f.fork()); try { Integer r = f.get(5L, SECONDS); shouldThrow(); } catch (ExecutionException success) { Throwable cause = success.getCause(); assertTrue(cause instanceof FJException); checkCompletedAbnormally(f, cause); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }
/** * timed get of a forked task throws exception when task cancelled */ public void testCancelledForkTimedGet() { RecursiveTask<Integer> a = new CheckedRecursiveTask<Integer>() { public Integer realCompute() throws Exception { FibTask f = new FibTask(8); assertTrue(f.cancel(true)); assertSame(f, f.fork()); try { Integer r = f.get(5L, SECONDS); shouldThrow(); } catch (CancellationException success) { checkCancelled(f); } return NoResult; }}; assertSame(NoResult, testInvokeOnPool(mainPool(), a)); }