/**
 * Runs {@code NUM_THREADS} {@code IdLockTestThread} clients on a fixed-size pool, requires
 * every one of them to report {@code true}, and finally asserts that the lock map is empty.
 */
@Test
public void testMultipleClients() throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
    try {
        ExecutorCompletionService<Boolean> completions =
                new ExecutorCompletionService<Boolean>(pool);
        for (int client = 0; client < NUM_THREADS; client++) {
            completions.submit(new IdLockTestThread("client_" + client));
        }

        // Drain results in completion order; get() rethrows a client's failure.
        int pending = NUM_THREADS;
        while (pending-- > 0) {
            Future<Boolean> finished = completions.take();
            assertTrue(finished.get());
        }

        idLock.assertMapEmpty();
    } finally {
        pool.shutdown();
        pool.awaitTermination(5000, TimeUnit.MILLISECONDS);
    }
}
/**
 * Builds a saga over the given task graph.
 *
 * <p>Two runners are created over the same graph. The transaction runner traverses it with a
 * {@code FromRootTraversalDirection} and feeds a {@code TransactionTaskConsumer} whose
 * completion service wraps {@code executor}. The compensation runner traverses it with a
 * {@code FromLeafTraversalDirection} and feeds a {@code CompensationTaskConsumer}. The
 * transaction runner is installed as the current runner.
 *
 * @param eventStore    event store kept by this saga
 * @param executor      executor wrapped by the transaction consumer's completion service
 * @param tasks         task lookup map, kept by this saga and passed to both consumers
 * @param sagaContext   context kept by this saga and passed to both consumers
 * @param sagaTaskGraph request graph traversed by both runners
 */
public GraphBasedSaga(
        EventStore eventStore,
        Executor executor,
        Map<String, SagaTask> tasks,
        SagaContext sagaContext,
        SingleLeafDirectedAcyclicGraph<SagaRequest> sagaTaskGraph) {

    this.eventStore = eventStore;
    this.tasks = tasks;

    // Forward path: root-to-leaf traversal, tasks run through the supplied executor.
    this.transactionTaskRunner = new TaskRunner(
            traveller(sagaTaskGraph, new FromRootTraversalDirection<>()),
            new TransactionTaskConsumer(
                    tasks,
                    sagaContext,
                    new ExecutorCompletionService<>(executor)));

    this.sagaContext = sagaContext;

    // Backward path: leaf-to-root traversal for compensation.
    this.compensationTaskRunner = new TaskRunner(
            traveller(sagaTaskGraph, new FromLeafTraversalDirection<>()),
            new CompensationTaskConsumer(tasks, sagaContext));

    this.currentTaskRunner = this.transactionTaskRunner;
}
/** * * Very transient * * @param timeOutSeconds * @param numberOfThreads * @param outputWriter */ public TransferManager( Application csapApp, int timeOutSeconds, BufferedWriter outputWriter ) { this.csapApp = csapApp; logger.debug( "Number of workers: {}", csapApp.lifeCycleSettings().getNumberWorkerThreads() ); this.timeOutSeconds = timeOutSeconds; osCommandRunner = new OsCommandRunner( timeOutSeconds, 1, "TransferMgr" ); this.globalWriterForResults = outputWriter; updateProgress( "\nExecuting distribution using : " + csapApp.lifeCycleSettings().getNumberWorkerThreads() + " threads.\n\n" ); BasicThreadFactory schedFactory = new BasicThreadFactory.Builder() .namingPattern( "CsapFileTransfer-%d" ) .daemon( true ) .priority( Thread.NORM_PRIORITY ) .build(); fileTransferService = Executors.newFixedThreadPool( csapApp.lifeCycleSettings().getNumberWorkerThreads(), schedFactory ); fileTransferComplete = new ExecutorCompletionService<String>( fileTransferService ); }
public CsapEventClient( ) { BasicThreadFactory eventThreadFactory = new BasicThreadFactory.Builder() .namingPattern( "CsapEventPost-%d" ) .daemon( true ) .priority( Thread.NORM_PRIORITY + 1 ) .build(); eventPostQueue = new ArrayBlockingQueue<>( MAX_EVENT_BACKLOG ); // Use a single thread to sequence and post // eventPostPool = Executors.newFixedThreadPool(1, schedFactory, queue); // really only needs to be 1 - adding the others for lt scenario eventPostPool = new ThreadPoolExecutor( 1, 1, 30, TimeUnit.SECONDS, eventPostQueue, eventThreadFactory ); eventPostCompletionService = new ExecutorCompletionService<String>( eventPostPool ); }
/**
 * Produces 50 records with a {@code null} key to {@code topic}, pulls 50 events back through
 * the channel, and verifies that none of them carries the {@code KEY_HEADER} header.
 *
 * <p>The producer is closed once all sends are acknowledged, and the channel is stopped even
 * when an assertion or a helper fails, so a failing run does not leak either of them.
 *
 * @throws Exception if producing, pulling, or waiting for the submitter tasks fails
 */
private void doTestNullKeyNoHeader() throws Exception {
    final KafkaChannel channel = startChannel(false);
    try {
        Properties props = channel.getProducerProps();
        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        try {
            for (int i = 0; i < 50; i++) {
                ProducerRecord<String, byte[]> data =
                        new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
                // Block on each send so all 50 records are written before pulling.
                producer.send(data).get();
            }
        } finally {
            producer.close();
        }

        ExecutorCompletionService<Void> submitterSvc =
                new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
        List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
        wait(submitterSvc, 5);

        for (int i = 0; i < 50; i++) {
            Assert.assertNull(events.get(i).getHeaders().get(KEY_HEADER));
        }
    } finally {
        channel.stop();
    }
}
/**
 * Submits five tasks to {@code submitterSvc}; task {@code i} puts the first ten events of
 * {@code events.get(i)} into the channel inside one transaction.
 *
 * <p>The puts run inside the {@code try} so that a failing put or commit rolls the transaction
 * back and still closes it, rather than leaving it open.
 *
 * @param channel      channel that receives the events
 * @param events       batches of events; batch {@code i} is written by task {@code i}
 * @param submitterSvc completion service the tasks are submitted to
 */
private void putEvents(final KafkaChannel channel, final List<List<Event>> events,
                       ExecutorCompletionService<Void> submitterSvc) {
    for (int i = 0; i < 5; i++) {
        final int index = i;
        submitterSvc.submit(new Callable<Void>() {
            @Override
            public Void call() {
                Transaction tx = channel.getTransaction();
                tx.begin();
                try {
                    List<Event> eventsToPut = events.get(index);
                    for (int j = 0; j < 10; j++) {
                        channel.put(eventsToPut.get(j));
                    }
                    tx.commit();
                } catch (RuntimeException e) {
                    // Roll back before close() so the transaction is not closed while still open.
                    tx.rollback();
                    throw e;
                } finally {
                    tx.close();
                }
                return null;
            }
        });
    }
}
@PostConstruct public void init() { final ThreadPoolExecutor executor = new BlockingThreadPoolExecutor(2, 2, 5, TimeUnit.MINUTES, 2, TimeUnit.MINUTES, new NamedThreadFactory("ThumbnailServiceExecutor"), new Callable<Boolean>() { @Override public Boolean call() { //Wait forever LOGGER.trace("Waited 2 minutes to queue a thumb job, waiting again."); return true; } }); completionService = new ExecutorCompletionService<ThumbingCallableResult>(executor); new Thread() { @Override public void run() { setName("Thumb task finisher listener"); watchCompleted(); } }.start(); }
public ConcurrentTransferWorker(final SessionPool source, final SessionPool destination, final Transfer transfer, final TransferOptions options, final TransferSpeedometer meter, final TransferPrompt prompt, final TransferErrorCallback error, final ConnectionCallback connectionCallback, final PasswordCallback passwordCallback, final ProgressListener progressListener, final StreamListener streamListener) { super(transfer, options, prompt, meter, error, progressListener, streamListener, connectionCallback, passwordCallback); this.source = source; this.destination = destination; final ThreadPool pool = ThreadPoolFactory.get("transfer", transfer.getSource().getTransferType() == Host.TransferType.newconnection ? 1 : PreferencesFactory.get().getInteger("queue.connections.limit")); this.completion = new ExecutorCompletionService<TransferStatus>(pool.executor()); }
/**
 * Submits one task that loads {@code fileBatch} through a {@code fileLoadAction} bean and
 * records the resulting future in {@code futures}.
 *
 * <p>While the task runs, the batch's pipeline id is stored in the MDC under
 * {@code OtterConstants.splitPipelineLogFileKey}; the entry is removed when the task ends.
 *
 * @param futures           list that receives the submitted task's future
 * @param completionService service the task is submitted to
 * @param fileBatch         batch to load
 * @param rootDir           directory passed to the load action
 * @param controller        weight controller passed to the load action
 */
private void submitFileBatch(List<Future> futures, ExecutorCompletionService completionService,
                             final FileBatch fileBatch, final File rootDir,
                             final WeightController controller) {
    Callable<FileLoadContext> loadTask = new Callable<FileLoadContext>() {

        public FileLoadContext call() throws Exception {
            String pipelineId = String.valueOf(fileBatch.getIdentity().getPipelineId());
            try {
                MDC.put(OtterConstants.splitPipelineLogFileKey, pipelineId);

                FileLoadAction fileLoadAction =
                        (FileLoadAction) beanFactory.getBean("fileLoadAction", FileLoadAction.class);
                return fileLoadAction.load(fileBatch, rootDir, controller);
            } finally {
                MDC.remove(OtterConstants.splitPipelineLogFileKey);
            }
        }
    };

    futures.add(completionService.submit(loadTask));
}
private void submitRowBatch(List<Future> futures, ExecutorCompletionService completionService, final List<RowBatch> rowBatchs, final WeightController controller) { for (final RowBatch rowBatch : rowBatchs) { // 提交多个并行加载通道 futures.add(completionService.submit(new Callable<DbLoadContext>() { public DbLoadContext call() throws Exception { try { MDC.put(OtterConstants.splitPipelineLogFileKey, String.valueOf(rowBatch.getIdentity().getPipelineId())); // dbLoadAction是一个pool池化对象 DbLoadAction dbLoadAction = (DbLoadAction) beanFactory.getBean("dbLoadAction", DbLoadAction.class); return dbLoadAction.load(rowBatch, controller); } finally { MDC.remove(OtterConstants.splitPipelineLogFileKey); } } })); } }
/**
 * Creates the iterator and immediately submits its tasks.
 *
 * <p>Depending on {@code pushDownType}, either {@code dagService} (NORMAL) or
 * {@code streamingService} (STREAMING) is created on the session's table-scan thread pool.
 * No service is created for any other value.
 *
 * @param req          request passed to the superclass
 * @param regionTasks  region tasks passed to the superclass
 * @param session      session passed to the superclass; also supplies the table-scan pool
 * @param infer        schema inference passed to the superclass
 * @param pushDownType selects which completion service is created
 */
DAGIterator(
        DAGRequest req,
        List<RangeSplitter.RegionTask> regionTasks,
        TiSession session,
        SchemaInfer infer,
        PushDownType pushDownType) {
    super(req, regionTasks, session, infer);
    this.pushDownType = pushDownType;

    switch (pushDownType) {
        case NORMAL:
            dagService = new ExecutorCompletionService<>(session.getThreadPoolForTableScan());
            break;
        case STREAMING:
            streamingService = new ExecutorCompletionService<>(session.getThreadPoolForTableScan());
            break;
        default:
            // Other push-down types get no completion service.
            break;
    }

    submitTasks();
}
/**
 * Runs all solvers on {@code e} and passes the first non-null result to {@code use}.
 *
 * <p>Solvers that fail with an exception are ignored. Once a non-null result arrives, or all
 * solvers have finished, every submitted task is cancelled. {@code use} is not called when no
 * solver produced a non-null result.
 *
 * @param e       executor that runs the solvers
 * @param solvers tasks to race against each other
 * @throws InterruptedException if interrupted while waiting for a solver to finish
 */
void solveAny(Executor e, Collection<Callable<Integer>> solvers) throws InterruptedException {
    CompletionService<Integer> completions = new ExecutorCompletionService<>(e);
    int n = solvers.size();
    List<Future<Integer>> submitted = new ArrayList<>(n);
    Integer firstAnswer = null;
    try {
        for (Callable<Integer> solver : solvers) {
            submitted.add(completions.submit(solver));
        }
        // Take at most n completions, stopping at the first non-null answer.
        int remaining = n;
        while (remaining > 0 && firstAnswer == null) {
            remaining--;
            try {
                firstAnswer = completions.take().get();
            } catch (ExecutionException ignore) {
                // A failed solver is skipped; keep waiting for the others.
            }
        }
    } finally {
        for (Future<Integer> pending : submitted) {
            pending.cancel(true);
        }
    }

    if (firstAnswer != null) {
        use(firstAnswer);
    }
}
@Test(timeout = 60000) public void testMultipleClients() throws Exception { ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS); try { ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<Boolean>(exec); for (int i = 0; i < NUM_THREADS; ++i) ecs.submit(new IdLockTestThread("client_" + i)); for (int i = 0; i < NUM_THREADS; ++i) { Future<Boolean> result = ecs.take(); assertTrue(result.get()); } // make sure the entry pool will be cleared after GC and purge call int entryPoolSize = idLock.purgeAndGetEntryPoolSize(); LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize); assertEquals(0, entryPoolSize); } finally { exec.shutdown(); exec.awaitTermination(5000, TimeUnit.MILLISECONDS); } }
/**
 * poll() returns null while nothing has completed and, once the submitted task has finished,
 * returns a done future holding the task's result
 */
public void testPoll1() throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    assertNull(cs.poll());
    cs.submit(new StringTask());

    // Spin on the non-blocking poll until the task shows up or the long delay expires.
    long startTime = System.nanoTime();
    Future f = cs.poll();
    while (f == null) {
        if (millisElapsedSince(startTime) > LONG_DELAY_MS) {
            fail("timed out");
        }
        Thread.yield();
        f = cs.poll();
    }

    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
/**
 * timed poll returns null while nothing has completed and, once the submitted task has
 * finished, returns a done future holding the task's result
 */
public void testPoll2() throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    assertNull(cs.poll());
    cs.submit(new StringTask());

    long startTime = System.nanoTime();
    Future f;
    for (;;) {
        f = cs.poll(SHORT_DELAY_MS, MILLISECONDS);
        if (f != null) {
            break;
        }
        // Nothing completed within the short delay: give up only after the long delay.
        if (millisElapsedSince(startTime) > LONG_DELAY_MS) {
            fail("timed out");
        }
        Thread.yield();
    }

    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
/**
 * every poll variant returns null while the only submitted task is still blocked; after the
 * task is released, take() yields its result
 */
public void testPollReturnsNull() throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    final CountDownLatch proceed = new CountDownLatch(1);

    // The task cannot complete until the latch is released below.
    Callable blockedTask = new Callable() {
        public String call() throws Exception {
            proceed.await();
            return TEST_STRING;
        }
    };
    cs.submit(blockedTask);

    assertNull(cs.poll());
    assertNull(cs.poll(0L, MILLISECONDS));
    assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));

    // A positive timeout must be waited out in full before null is returned.
    long startTime = System.nanoTime();
    assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
    assertTrue(millisElapsedSince(startTime) >= timeoutMillis());

    proceed.countDown();
    assertSame(TEST_STRING, cs.take().get());
}
/**
 * a mix of succeeding and throwing tasks is fully returned: each round submits one succeeding
 * callable, one throwing callable and one throwing runnable, and all of them are taken
 */
public void testTaskAssortment() throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    ArithmeticException ex = new ArithmeticException();

    final int rounds = 2;
    for (int round = 0; round < rounds; round++) {
        cs.submit(new StringTask());
        cs.submit(callableThrowing(ex));
        cs.submit(runnableThrowing(ex), null);
    }

    int normalCompletions = 0;
    int exceptionalCompletions = 0;
    for (int taken = 0; taken < 3 * rounds; taken++) {
        try {
            Object value = cs.take().get();
            // Identity comparison is intended: the task returns the TEST_STRING instance.
            if (value == TEST_STRING) {
                normalCompletions++;
            }
        } catch (ExecutionException expected) {
            assertTrue(expected.getCause() instanceof ArithmeticException);
            exceptionalCompletions++;
        }
    }

    assertEquals(rounds * 1, normalCompletions);
    assertEquals(rounds * 2, exceptionalCompletions);
    assertNull(cs.poll());
}
/**
 * Submits every solver to {@code e}, waits for completions in the order they finish, and hands
 * the first non-null result to {@code use}.
 *
 * <p>A solver that throws is skipped. All submitted tasks are cancelled before returning.
 * When no solver yields a non-null result, {@code use} is not invoked.
 *
 * @param e       executor that runs the solvers
 * @param solvers tasks to race against each other
 * @throws InterruptedException if interrupted while waiting for a solver to finish
 */
void solveAny(Executor e, Collection<Callable<Integer>> solvers) throws InterruptedException {
    CompletionService<Integer> cs = new ExecutorCompletionService<>(e);
    int n = solvers.size();
    List<Future<Integer>> futures = new ArrayList<>(n);
    Integer result = null;
    try {
        for (Callable<Integer> solver : solvers) {
            futures.add(cs.submit(solver));
        }
        for (int left = n; left > 0; left--) {
            Integer candidate;
            try {
                candidate = cs.take().get();
            } catch (ExecutionException ignore) {
                // This solver failed; move on to the next completion.
                continue;
            }
            if (candidate != null) {
                result = candidate;
                break;
            }
        }
    } finally {
        // Stop whatever is still running, whether or not an answer was found.
        for (Future<Integer> future : futures) {
            future.cancel(true);
        }
    }

    if (result == null) {
        return;
    }
    use(result);
}
/**
 * Demo: submits 100 tasks to a pool of 10 threads. Each task sleeps for a random delay of at
 * most five seconds and returns {@code "num" + index}; the results are printed in the order
 * the tasks complete.
 *
 * <p>The pool is shut down in a {@code finally} block so that a failing {@code get()} or an
 * interrupt cannot leave non-daemon worker threads keeping the JVM alive.
 *
 * @param args unused
 * @throws InterruptedException if interrupted while waiting for a result
 * @throws ExecutionException   if a task failed
 */
public static void main(String[] args) throws InterruptedException, ExecutionException {
    final Random random = new Random();
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        CompletionService<String> completionService =
                new ExecutorCompletionService<String>(executorService);

        for (int i = 0; i < 100; i++) {
            final int num = i;
            completionService.submit(new Callable<String>() {
                public String call() {
                    try {
                        // Uniform delay in [0, 5000] ms; the former bitmask "nextLong() & 5000"
                        // only produced a handful of distinct values.
                        Thread.sleep(random.nextInt(5001));
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the pool and finish early.
                        Thread.currentThread().interrupt();
                    }
                    return "num" + num;
                }
            });
        }

        for (int i = 0; i < 100; i++) {
            Future<String> f = completionService.take();
            System.out.println(f.get());
        }
    } finally {
        executorService.shutdownNow();
    }
}
/** * 根据指定的列表关键数据及列表数据处理器,并发地处理并返回处理后的列表数据集合 * @param allKeys 列表关键数据 * @param handleBizDataFunc 列表数据处理器 * @param <T> 待处理的数据参数类型 * @param <R> 待返回的数据结果类型 * @return 处理后的列表数据集合 * * NOTE: 类似实现了 stream.par.map 的功能,不带延迟计算 */ public static <T,R> List<R> exec(List<T> allKeys, Function<List<T>, List<R>> handleBizDataFunc) { List<String> parts = TaskUtil.divide(allKeys.size(), TASK_SIZE); //System.out.println(parts); CompletionService<List<R>> completionService = new ExecutorCompletionService<>(executor); ForeachUtil.foreachDone(parts, (part) -> { final List<T> tmpRowkeyList = TaskUtil.getSubList(allKeys, part); completionService.submit( () -> handleBizDataFunc.apply(tmpRowkeyList)); // lambda replace inner class }); // foreach code refining List<R> result = ForeachUtil.foreachAddWithReturn(parts.size(), (ind) -> get(ind, completionService)); return result; }
/**
 * Loads every video of a mix concurrently and assembles the tracks that loaded into a playlist.
 *
 * <p>If the thread is interrupted while collecting results, the interrupt flag is restored and
 * the playlist is built from whatever tracks were collected up to that point.
 *
 * @param videoIds        ids of the videos in the mix
 * @param selectedVideoId id of the video that should be the selected track
 * @return a playlist named "YouTube mix" with the loaded tracks and the selected track
 * @throws FriendlyException if no track loaded, or the selected track is not among them
 */
private AudioPlaylist loadTracksAsynchronously(List<String> videoIds, String selectedVideoId) {
    ExecutorCompletionService<AudioItem> completion =
            new ExecutorCompletionService<>(mixLoadingExecutor);
    List<AudioTrack> tracks = new ArrayList<>();

    for (final String videoId : videoIds) {
        completion.submit(() -> sourceManager.loadTrackWithVideoId(videoId, true));
    }

    try {
        fetchTrackResultsFromExecutor(completion, tracks, videoIds.size());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

    AudioTrack selectedTrack = sourceManager.findSelectedTrack(tracks, selectedVideoId);

    if (tracks.isEmpty()) {
        throw new FriendlyException("No tracks from the mix loaded succesfully.", SUSPICIOUS, null);
    }
    if (selectedTrack == null) {
        throw new FriendlyException("The selected track of the mix failed to load.", SUSPICIOUS, null);
    }

    return new BasicAudioPlaylist("YouTube mix", tracks, selectedTrack, false);
}
/**
 * Takes {@code size} completed results from {@code completion} and appends every result that
 * is an {@code AudioTrack} to {@code tracks}. Failed loads are logged and skipped.
 *
 * @param completion completion service the load tasks were submitted to
 * @param tracks     list that receives the loaded tracks
 * @param size       number of results to wait for
 * @throws InterruptedException if interrupted while waiting for a result
 */
private void fetchTrackResultsFromExecutor(ExecutorCompletionService<AudioItem> completion,
                                           List<AudioTrack> tracks, int size)
        throws InterruptedException {
    int remaining = size;
    while (remaining-- > 0) {
        try {
            AudioItem item = completion.take().get();
            if (item instanceof AudioTrack) {
                tracks.add((AudioTrack) item);
            }
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof FriendlyException) {
                ExceptionTools.log(log, (FriendlyException) cause, "Loading a track from a mix.");
            } else {
                log.warn("Failed to load a track from a mix.", e);
            }
        }
    }
}
/**
 * Submit multiple jobs concurrently using ProcessSource.
 *
 * <p>Four topologies are built and completed in parallel; each expects a tuple count of 8.
 * The thread pool backing the completion service is shut down when the test ends, whether it
 * passes or fails, so its worker threads do not outlive the test.
 */
@Test
public void testMultiTopology() throws Exception {
    int executions = 4;

    java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(executions);
    try {
        ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < executions; i++) {
            completer.submit(() -> {
                Topology t = newTopology();
                TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h");
                s.sink((tuple) -> { if ("h".equals(tuple)) System.out.println(tuple);});
                Condition<Long> tc = t.getTester().tupleCount(s, 8);
                complete(t, tc);
                return true;
            });
        }
        waitForCompletion(completer, executions);
    } finally {
        pool.shutdownNow();
    }
}
/** * Submit multiple jobs concurrently using ProcessSource. */ @Test public void testMultiTopologyWithError() throws Exception { int executions = 4; ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>( Executors.newFixedThreadPool(executions)); for (int i = 0; i < executions; i++) { completer.submit(() -> { Topology t = newTopology(); TStream<String> s = t.strings("a", "b", "c", "d", "e", "f", "g", "h"); // Throw on the 8th tuple s.sink((tuple) -> { if ("h".equals(tuple)) throw new RuntimeException("Expected Test Exception");}); // Expect 7 tuples out of 8 Condition<Long> tc = t.getTester().tupleCount(s, 7); complete(t, tc); return true; }); } waitForCompletion(completer, executions); }
/** * Submit multiple jobs concurrently using PeriodicSource. */ @Test public void testMultiTopologyPollWithError() throws Exception { int executions = 4; ExecutorCompletionService<Boolean> completer = new ExecutorCompletionService<>( Executors.newFixedThreadPool(executions)); for (int i = 0; i < executions; i++) { completer.submit(() -> { Topology t = newTopology(); AtomicLong n = new AtomicLong(0); TStream<Long> s = t.poll(() -> n.incrementAndGet(), 10, TimeUnit.MILLISECONDS); // Throw on the 8th tuple s.sink((tuple) -> { if (8 == n.get()) throw new RuntimeException("Expected Test Exception");}); // Expect 7 tuples out of 8 Condition<Long> tc = t.getTester().tupleCount(s, 7); complete(t, tc); return true; }); } waitForCompletion(completer, executions); }
/**
 * Waits for {@code numtasks} tasks to complete, allowing up to four seconds for each one.
 *
 * <p>An interrupt now restores the interrupt flag and fails the wait. It is no longer logged
 * and counted as a finished task, which could let a test pass without all tasks having run.
 *
 * @param completer completion service the tasks were submitted to
 * @param numtasks  number of completions to wait for
 * @throws ExecutionException if a completed task failed
 * @throws RuntimeException   wrapping a {@code TimeoutException} if no task completes within
 *                            four seconds, or an {@code InterruptedException} if interrupted
 */
private void waitForCompletion(ExecutorCompletionService<Boolean> completer, int numtasks)
        throws ExecutionException {
    for (int remainingTasks = numtasks; remainingTasks > 0; remainingTasks--) {
        try {
            Future<Boolean> completed = completer.poll(4, TimeUnit.SECONDS);
            if (completed == null) {
                System.err.println("Completer timed out");
                throw new RuntimeException(new TimeoutException());
            }
            // Surfaces the task's own failure as an ExecutionException.
            completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
/**
 * Fires 10 identical requests through a pool of 12 threads, waits for all of them, and prints
 * the elapsed time and the resulting requests-per-second figure.
 *
 * <p>The pool is shut down in a {@code finally} block so its worker threads do not outlive
 * the test, even when a request fails.
 */
@Test
public void testDoSmth() throws Exception {
    Demo.DemoRequest.Builder req = Demo.DemoRequest.newBuilder();
    req.setUserId(1);

    int multiSize = 12;
    int totalRequestSize = 10;

    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    try {
        CompletionService<Demo.DemoResponse> completionService =
                new ExecutorCompletionService<Demo.DemoResponse>(pool);

        Invoker invoker = new Invoker(req.build());
        long time = System.currentTimeMillis();
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.submit(invoker);
        }

        // get() rethrows the failure of any request.
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.take().get();
        }

        long timetook = System.currentTimeMillis() - time;
        System.out.println("Total using " + timetook + "ms");
        System.out.println("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
    } finally {
        pool.shutdownNow();
    }
}
/**
 * Calls the server at 127.0.0.1 with {@code invokeNum} batch requests and logs the elapsed
 * time and the resulting requests-per-second figure.
 *
 * <p>The worker pool is shut down in a {@code finally} block; previously its non-daemon
 * threads were left running after the method returned or failed.
 *
 * @param port
 *            server port
 * @param multiSize
 *            number of concurrent worker threads
 * @param invokeNum
 *            total number of requests
 * @param size
 *            number of list entries contained in each batch request
 * @param textLength
 *            length of the random string placed in the batch request data
 * @throws Exception
 *             if a request fails or the wait is interrupted
 */
public void run(int port, int multiSize, int invokeNum, int size, int textLength) throws Exception {
    PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
            "127.0.0.1", port, 60000);

    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    try {
        CompletionService<DemoBatchResponse> completionService =
                new ExecutorCompletionService<DemoBatchResponse>(pool);

        BatchInvoker invoker = new BatchInvoker(client, size, RandomUtils.generateString(textLength));
        long time = System.currentTimeMillis();
        for (int i = 0; i < invokeNum; i++) {
            completionService.submit(invoker);
        }

        // get() rethrows the failure of any request.
        for (int i = 0; i < invokeNum; i++) {
            completionService.take().get();
        }

        long timetook = System.currentTimeMillis() - time;
        LOG.info("Send " + invokeNum + " requests using " + timetook + "ms");
        LOG.info("QPS:" + 1000f / ((timetook) / (1.0f * invokeNum)));
    } finally {
        pool.shutdownNow();
    }
}
/**
 * Sends 100 batch requests to 127.0.0.1:8088 through a pool of 8 threads, waits for all of
 * them, and logs the elapsed time and the resulting requests-per-second figure.
 *
 * <p>The worker pool is shut down in a {@code finally} block so its threads do not outlive
 * the test, even when a request fails.
 */
public void testPoolBatch() throws Exception {
    PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
            "127.0.0.1", 8088, 60000);

    int multiSize = 8;
    int totalRequestSize = 100;

    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    try {
        CompletionService<DemoBatchResponse> completionService =
                new ExecutorCompletionService<DemoBatchResponse>(pool);

        BatchInvoker invoker = new BatchInvoker(client);
        long time = System.currentTimeMillis();
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.submit(invoker);
        }

        // get() rethrows the failure of any request.
        for (int i = 0; i < totalRequestSize; i++) {
            completionService.take().get();
        }

        long timetook = System.currentTimeMillis() - time;
        LOG.info("Total using " + timetook + "ms");
        LOG.info("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
    } finally {
        pool.shutdownNow();
    }
}
public BlockingKafkaMessageConsumerCoordinator(final String zookeeper, final String groupId, final String topic, final int numThreads) { super(topic, numThreads); Preconditions.checkNotNull(zookeeper, "zookeeper cannot be null"); Preconditions.checkNotNull(groupId, "groupId cannot be null"); final Properties props = new Properties(); props.put("zookeeper.connect", zookeeper); props.put("group.id", groupId); props.put("zookeeper.session.timeout.ms", String.valueOf(Consts.ZOOKEEPER_SESSION_TIMEOUT_MS)); // Zookeeper session timeout. If the consumer fails to heartbeat to zookeeper for this period of time it is considered dead and a rebalance will occur. props.put("zookeeper.sync.time.ms", String.valueOf(Consts.ZOOKEEPER_SYNC_TIME_MS)); // How far a ZK follower can be behind a ZK leader. props.put("auto.commit.interval.ms", String.valueOf(Consts.KAFKA_AUTO_COMMIT_INTERVAL_MS)); // The frequency in ms that the consumer offsets are committed to zookeeper. // XXX: Is there a better way to do this? // I have been thinking about other ways, such as using special "poison" message to indicate // end of consumption, however there is no guarantee that all of the consumers will recieve // the same message. // This will throw a timeout exception after specified time. props.put("consumer.timeout.ms", String.valueOf(Consts.POLLING_CONSUMER_MAX_IDLE_TIME_MS)); this.topic = topic; this.numThreads = numThreads; this.props = props; this.executorCompletionService = new ExecutorCompletionService<Histogram>(Executors.newFixedThreadPool(numThreads)); }
/**
 * Flushes the given queue.
 *
 * <p>When called on the main thread, {@code Settings.IMP.QUEUE.PARALLEL_THREADS} is forced to
 * 1 for the duration of the flush and restored afterwards (provided the saved value was
 * non-zero). If the flush throws, the pool is given up to
 * {@code Settings.IMP.QUEUE.DISCARD_AFTER_MS} to go quiescent, the completer is replaced by a
 * fresh one, and the error is passed to {@code MainUtil.handleError}. In every case the queue's
 * set is ended, its stage is reset to {@code NONE}, and its tasks are run.
 *
 * @param queue queue to flush
 */
public void flush(FaweQueue queue) {
    // 0 doubles as "nothing to restore".
    int parallelThreads = 0;
    if (Fawe.get().isMainThread()) {
        parallelThreads = Settings.IMP.QUEUE.PARALLEL_THREADS;
        Settings.IMP.QUEUE.PARALLEL_THREADS = 1;
    }

    try {
        queue.startSet(Settings.IMP.QUEUE.PARALLEL_THREADS > 1);
        queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, Long.MAX_VALUE);
    } catch (Throwable e) {
        pool.awaitQuiescence(Settings.IMP.QUEUE.DISCARD_AFTER_MS, TimeUnit.MILLISECONDS);
        completer = new ExecutorCompletionService(pool);
        MainUtil.handleError(e);
    } finally {
        queue.endSet(Settings.IMP.QUEUE.PARALLEL_THREADS > 1);
        queue.setStage(QueueStage.NONE);
        queue.runTasks();

        boolean settingWasOverridden = parallelThreads != 0;
        if (settingWasOverridden) {
            Settings.IMP.QUEUE.PARALLEL_THREADS = parallelThreads;
        }
    }
}
/**
 * Runs all solvers on {@code executor} and returns the result of whichever finishes first.
 * Every submitted task is cancelled before this method returns.
 *
 * <p>An empty {@code solvers} collection is rejected up front, because waiting for a
 * completion that can never arrive would block forever. If the calling thread is interrupted,
 * the interrupt flag is restored before the failure is reported.
 *
 * @param executor executor that runs the solvers
 * @param solvers  tasks to race against each other; must not be empty
 * @return the value produced by the first solver to complete
 * @throws IllegalArgumentException if {@code solvers} is empty
 * @throws IllegalStateException    if the first solver to complete failed, or the wait was
 *                                  interrupted
 */
private long solve(Executor executor, Collection<Callable<Long>> solvers) {
    if (solvers.isEmpty()) {
        throw new IllegalArgumentException("At least one solver is required");
    }

    CompletionService<Long> ecs = new ExecutorCompletionService<>(executor);
    List<Future<Long>> futures = new ArrayList<>(solvers.size());
    try {
        // Submitting inside the try ensures already-submitted tasks are cancelled if a later
        // submission is rejected.
        for (Callable<Long> solver : solvers) {
            futures.add(ecs.submit(solver));
        }
        return ecs.take().get();
    } catch (ExecutionException e) {
        throw new IllegalStateException(e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    } finally {
        for (Future<Long> f : futures) {
            f.cancel(true);
        }
    }
}
public AsynchInvocationHandler(I target, Executor executor, AsynchProxyListener listener) { this.target = target; this.completionService = new ExecutorCompletionService<GenericAsynchResult>( executor != null ? executor : Executors.newFixedThreadPool(DEFAULT_THREAD_COUNT, new AsynchInvocationHandlerThreadFactory()) ); this.listener = listener; if(listener != null) { Thread listenerThread = new ListenerNotificationThread(listener); listenerThread.setDaemon(true); listenerThread.start(); } }
/**
 * Creates one project per entry of the Spring Boot parameter matrix, eight at a time, and
 * returns the created module names in the order the creations finished.
 *
 * <p>All creations are awaited before any result is read, so a failing creation is reported
 * only after every task has finished. The thread pool is shut down in a {@code finally} block;
 * previously its eight non-daemon threads were leaked on every call.
 *
 * @param parentFolder folder passed to {@code createProject} for every project
 * @return the values returned by {@code createProject}, in completion order
 * @throws InterruptedException if interrupted while waiting for a creation to finish
 * @throws ExecutionException   if a creation failed
 */
private List<String> createProjectsParallel(final Path parentFolder)
        throws InterruptedException, ExecutionException {
    List<SpringBootProjectParams> expandedMatrix = generateSringBootMatrix();

    java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(8);
    try {
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        for (final SpringBootProjectParams params : expandedMatrix) {
            service.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return createProject(parentFolder, params);
                }
            });
        }

        // Phase 1: wait until every task has completed.
        List<Future<String>> futures = new ArrayList<>(expandedMatrix.size());
        for (int i = 0; i < expandedMatrix.size(); i++) {
            futures.add(service.take());
        }

        // Phase 2: read the results; get() rethrows a creation's failure.
        List<String> modules = new ArrayList<>(futures.size());
        for (Future<String> future : futures) {
            modules.add(future.get());
        }
        return modules;
    } finally {
        pool.shutdownNow();
    }
}
private static void runTest(VehicleController controller, ExecutorService exec, long duration) { CompletionService<TaskStatistics> ecs = new ExecutorCompletionService<>(exec); List<TaskStatistics> results = new ArrayList<>(); //submit tasks for concurrent execution for (int i = 0; i < NUM_THREADS; i++){ System.out.println("Submitting task: TASK-" + i); ecs.submit(new VehicleTask("TASK-" + i, controller, duration)); } // Wait for completion and print individul results for (int i = 0; i < NUM_THREADS; ++i) { try { TaskStatistics nextStat = ecs.take().get(); results.add(nextStat); //block till next task finishes updateCounters(nextStat); System.out.println(nextStat); //block till next task finishes } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }