@Test
@SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception {
    // verify filesystems are closed when localizer doesn't fail
    FileContext fs = FileContext.getLocalFSFileContext();
    spylfs = spy(fs.getDefaultFileSystem());
    ContainerLocalizer localizer = setupContainerLocalizerForTest();
    doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
        any(CompletionService.class), any(UserGroupInformation.class));
    verify(localizer, never()).closeFileSystems(any(UserGroupInformation.class));
    localizer.runLocalization(nmAddr);
    verify(localizer).closeFileSystems(any(UserGroupInformation.class));

    spylfs = spy(fs.getDefaultFileSystem());
    // verify filesystems are closed when localizer fails
    localizer = setupContainerLocalizerForTest();
    doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
        any(LocalizationProtocol.class), any(CompletionService.class),
        any(UserGroupInformation.class));
    verify(localizer, never()).closeFileSystems(any(UserGroupInformation.class));
    localizer.runLocalization(nmAddr);
    verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
void solveAny(Executor e, Collection<Callable<Integer>> solvers)
        throws InterruptedException {
    CompletionService<Integer> cs = new ExecutorCompletionService<>(e);
    int n = solvers.size();
    List<Future<Integer>> futures = new ArrayList<>(n);
    Integer result = null;
    try {
        solvers.forEach(solver -> futures.add(cs.submit(solver)));
        for (int i = n; i > 0; i--) {
            try {
                Integer r = cs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {}
        }
    } finally {
        futures.forEach(future -> future.cancel(true));
    }
    if (result != null)
        use(result);
}
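A minimal way to exercise solveAny might look like the fragment below. It is only a sketch: use(Integer) is assumed to be some consumer of the answer (for example a println), and the fragment is assumed to run in a context that may propagate InterruptedException.

ExecutorService pool = Executors.newFixedThreadPool(3);
List<Callable<Integer>> solvers = List.of(
    () -> { Thread.sleep(300); return null; },   // finds no solution
    () -> { Thread.sleep(100); return 42; },     // fastest non-null answer
    () -> { Thread.sleep(500); return 7; });     // too slow, gets cancelled
try {
    solveAny(pool, solvers);   // use(42) is reached in most runs
} finally {
    pool.shutdown();
}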
/**
 * poll returns non-null when the returned task is completed
 */
public void testPoll1()
        throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    assertNull(cs.poll());
    cs.submit(new StringTask());

    long startTime = System.nanoTime();
    Future f;
    while ((f = cs.poll()) == null) {
        if (millisElapsedSince(startTime) > LONG_DELAY_MS)
            fail("timed out");
        Thread.yield();
    }
    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
/**
 * timed poll returns non-null when the returned task is completed
 */
public void testPoll2()
        throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    assertNull(cs.poll());
    cs.submit(new StringTask());

    long startTime = System.nanoTime();
    Future f;
    while ((f = cs.poll(SHORT_DELAY_MS, MILLISECONDS)) == null) {
        if (millisElapsedSince(startTime) > LONG_DELAY_MS)
            fail("timed out");
        Thread.yield();
    }
    assertTrue(f.isDone());
    assertSame(TEST_STRING, f.get());
}
/**
 * poll returns null before the returned task is completed
 */
public void testPollReturnsNull()
        throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    final CountDownLatch proceed = new CountDownLatch(1);
    cs.submit(new Callable() { public String call() throws Exception {
        proceed.await();
        return TEST_STRING;
    }});
    assertNull(cs.poll());
    assertNull(cs.poll(0L, MILLISECONDS));
    assertNull(cs.poll(Long.MIN_VALUE, MILLISECONDS));
    long startTime = System.nanoTime();
    assertNull(cs.poll(timeoutMillis(), MILLISECONDS));
    assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
    proceed.countDown();
    assertSame(TEST_STRING, cs.take().get());
}
/**
 * successful and failed tasks are both returned
 */
public void testTaskAssortment()
        throws InterruptedException, ExecutionException {
    CompletionService cs = new ExecutorCompletionService(cachedThreadPool);
    ArithmeticException ex = new ArithmeticException();
    for (int i = 0; i < 2; i++) {
        cs.submit(new StringTask());
        cs.submit(callableThrowing(ex));
        cs.submit(runnableThrowing(ex), null);
    }
    int normalCompletions = 0;
    int exceptionalCompletions = 0;
    for (int i = 0; i < 3 * 2; i++) {
        try {
            if (cs.take().get() == TEST_STRING)
                normalCompletions++;
        } catch (ExecutionException expected) {
            assertTrue(expected.getCause() instanceof ArithmeticException);
            exceptionalCompletions++;
        }
    }
    assertEquals(2 * 1, normalCompletions);
    assertEquals(2 * 2, exceptionalCompletions);
    assertNull(cs.poll());
}
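The callableThrowing and runnableThrowing helpers are not shown in the snippet; a plausible shape for them (an assumption, not the actual test-harness source) is:

static Callable<String> callableThrowing(final Exception ex) {
    return () -> { throw ex; };   // always completes exceptionally
}

static Runnable runnableThrowing(final RuntimeException ex) {
    return () -> { throw ex; };   // Runnable can only throw unchecked exceptions
}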
private ByteBuffer getFirstToComplete(
        CompletionService<ByteBuffer> hedgedService,
        ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
    if (futures.isEmpty()) {
        throw new InterruptedException("let's retry");
    }
    Future<ByteBuffer> future = null;
    try {
        future = hedgedService.take();
        ByteBuffer bb = future.get();
        futures.remove(future);
        return bb;
    } catch (ExecutionException | CancellationException e) {
        // already logged in the Callable
        futures.remove(future);
    }
    throw new InterruptedException("let's retry");
}
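For context, a hedged-read caller typically fires the same logical read at several sources and keeps whichever future finishes first, retrying through a method like getFirstToComplete until a buffer arrives. The fragment below is a self-contained sketch of such a driver loop, with sleep-based dummy tasks standing in for replica reads; it is not the original HDFS caller and assumes it runs inside the same class, in a context that may propagate InterruptedException.

ExecutorService pool = Executors.newCachedThreadPool();
CompletionService<ByteBuffer> hedgedService = new ExecutorCompletionService<>(pool);
ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
for (int replica = 0; replica < 3; replica++) {
    final int delayMs = 100 * (replica + 1);   // pretend each replica has a different latency
    futures.add(hedgedService.submit(() -> {
        Thread.sleep(delayMs);
        return ByteBuffer.allocate(16);
    }));
}
ByteBuffer first = null;
while (first == null) {
    try {
        first = getFirstToComplete(hedgedService, futures);   // fastest replica wins
    } catch (InterruptedException retry) {
        if (futures.isEmpty()) {
            throw retry;                       // every attempt failed; give up
        }
    }
}
futures.forEach(f -> f.cancel(true));          // cancel the slower attempts
pool.shutdown();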
public static void main(String[] args)
        throws InterruptedException, ExecutionException {
    final Random random = new Random();
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CompletionService<String> completionService =
        new ExecutorCompletionService<String>(executorService);
    for (int i = 0; i < 100; i++) {
        final int num = i;
        completionService.submit(new Callable<String>() {
            public String call() {
                try {
                    Thread.sleep(random.nextInt(5000));   // sleep up to 5 seconds
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "num" + num;
            }
        });
    }
    for (int i = 0; i < 100; i++) {
        Future<String> f = completionService.take();
        System.out.println(f.get());
    }
    executorService.shutdown();
}
/**
 * Concurrently processes the given list of keys with the supplied handler and
 * returns the combined list of results.
 *
 * @param allKeys            the list of keys to process
 * @param handleBizDataFunc  the handler applied to each sub-list of keys
 * @param <T> type of the input elements
 * @param <R> type of the result elements
 * @return the combined list of processed results
 *
 * NOTE: roughly implements stream.par.map-style behavior, without lazy evaluation
 */
public static <T, R> List<R> exec(List<T> allKeys,
        Function<List<T>, List<R>> handleBizDataFunc) {
    List<String> parts = TaskUtil.divide(allKeys.size(), TASK_SIZE);
    //System.out.println(parts);
    CompletionService<List<R>> completionService =
        new ExecutorCompletionService<>(executor);
    ForeachUtil.foreachDone(parts, (part) -> {
        final List<T> tmpRowkeyList = TaskUtil.getSubList(allKeys, part);
        completionService.submit(
            () -> handleBizDataFunc.apply(tmpRowkeyList)); // lambda replaces inner class
    });

    // foreach code refining
    List<R> result = ForeachUtil.foreachAddWithReturn(parts.size(),
        (ind) -> get(ind, completionService));
    return result;
}
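The get(ind, completionService) helper is not shown in the snippet. A plausible implementation (an assumption, not the original code) simply takes the next completed batch and unwraps it; the ind argument is just the loop index supplied by foreachAddWithReturn:

private static <R> List<R> get(int ind, CompletionService<List<R>> completionService) {
    try {
        return completionService.take().get();   // blocks until some batch finishes
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
    }
}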
/**
 * Removes all tasks from the queue and shuts down the executor service.
 *
 * @param taskQueue Queue of tasks
 * @param completionService Producer of new asynchronous tasks
 * @param executorService Tasks executor service
 */
public void finalizeQueue(Queue<Future<Void>> taskQueue,
        CompletionService<Void> completionService,
        ExecutorService executorService) {
    try {
        while (!taskQueue.isEmpty()) {
            taskQueue.remove(completionService.take());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        while (!taskQueue.isEmpty()) {
            taskQueue.poll().cancel(true);
        }
        executorService.shutdownNow();
    }
}
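This only works if every Future placed in taskQueue came from the same CompletionService, so that take() returns the exact instances the queue holds. A hypothetical setup (pendingWork and the pool size are assumptions, not from the original source) might look like:

ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
Queue<Future<Void>> taskQueue = new ConcurrentLinkedQueue<>();
for (Runnable work : pendingWork) {                       // assumed collection of tasks
    taskQueue.add(completionService.submit(work, null));  // same Future instance that take() later returns
}
// ... later, wait for everything (or cancel what is left) and shut down:
finalizeQueue(taskQueue, completionService, executorService);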
private ByteBuffer getFirstToComplete(
        CompletionService<ByteBuffer> hedgedService,
        ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
    if (futures.isEmpty()) {
        throw new InterruptedException("let's retry");
    }
    Future<ByteBuffer> future = null;
    try {
        future = hedgedService.take();
        ByteBuffer bb = future.get();
        futures.remove(future);
        return bb;
    } catch (ExecutionException e) {
        // already logged in the Callable
        futures.remove(future);
    } catch (CancellationException ce) {
        // already logged in the Callable
        futures.remove(future);
    }
    throw new InterruptedException("let's retry");
}
public Boolean addToRequestQueue(JRTServerConfigRequest request, boolean forceResponse,
        CompletionService<Boolean> completionService) {
    // It's no longer delayed if we get here
    request.setDelayedResponse(false);
    //ConfigDebug.logDebug(log, System.currentTimeMillis(), request.getConfigKey(), "RpcServer.addToRequestQueue()");
    try {
        final GetConfigProcessor task = new GetConfigProcessor(this, request, forceResponse);
        if (completionService == null) {
            executorService.submit(task);
        } else {
            completionService.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    task.run();
                    return true;
                }
            });
        }
        updateWorkQueueMetrics();
        return true;
    } catch (RejectedExecutionException e) {
        request.addErrorResponse(ErrorCode.INTERNAL_ERROR,
            "getConfig request queue size is larger than configured max limit");
        respond(request);
        return false;
    }
}
private void trainEachWithEarlyUpdate(Sentence sentence,
        CompletionService<List<BeamItem>> completionService) {
    State.StateIterator iterator = oracle.getState(sentence).getIterator();
    State oracleState = iterator.next(); // initial state

    List<BeamItem> beam = new ArrayList<>(1);
    beam.add(new BeamItem(new State(sentence), 0.0));

    boolean terminate = false;
    while (!terminate) {
        oracleState = iterator.next();
        beam = getNextBeamItems(beam, beamWidth, classifier, completionService);
        terminate = beam.stream().allMatch(item -> item.getState().isTerminal());

        final State finalOracleState = oracleState; // make a variable final to use it in lambda
        beam.stream().forEach(item -> {
            if (item.getState().equals(finalOracleState)) {
                System.out.println("pred.hashCode: " + item.getState().hashCode()
                    + ", oracle.hashCode: " + finalOracleState.hashCode());
            }
        });
        boolean oracleInBeam = beam.stream().anyMatch(item -> item.getState().equals(finalOracleState));
        if (!oracleInBeam || (!terminate && !iterator.hasNext())) {
            classifier.update(oracleState, beam.get(0).getState()); // early update
            break;
        }
    }
}
public List<BeamItem> getNextBeamItems(List<BeamItem> beam, int beamWidth, Perceptron classifier,
        CompletionService<List<BeamItem>> completionService) {
    try {
        List<BeamItem> items1 = BeamSearchDecoder.super.getNextBeamItems(beam, beamWidth, classifier);
        List<BeamItem> items2 = BeamSearchDecoder.super.getNextBeamItems(beam, beamWidth, classifier, completionService);
        for (int i = 0; i < items1.size(); i++) {
            BeamItem item1 = items1.get(i);
            BeamItem item2 = items2.get(i);
            System.out.println("item1: " + item1.getState() + ": score=" + item1.getScore());
            System.out.println("item2: " + item2.getState() + ": score=" + item2.getScore());
            if (!item1.equals(item2)) {
                throw new Exception(item1 + " != " + item2);
            }
        }
        System.out.println("========");
        return items2;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
@Test
public void testDoSmth() throws Exception {
    Demo.DemoRequest.Builder req = Demo.DemoRequest.newBuilder();
    req.setUserId(1);

    int multiSize = 12;
    int totalRequestSize = 10;
    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    CompletionService<Demo.DemoResponse> completionService =
        new ExecutorCompletionService<Demo.DemoResponse>(pool);

    Invoker invoker = new Invoker(req.build());
    long time = System.currentTimeMillis();
    for (int i = 0; i < totalRequestSize; i++) {
        completionService.submit(invoker);
    }
    for (int i = 0; i < totalRequestSize; i++) {
        completionService.take().get();
    }

    long timetook = System.currentTimeMillis() - time;
    System.out.println("Total using " + timetook + "ms");
    System.out.println("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
}
/**
 * Invokes the server side.
 *
 * @param port
 * @param multiSize
 *            number of concurrent threads
 * @param invokeNum
 *            total number of requests
 * @param size
 *            number of list elements contained in each batch request
 * @param textLength
 *            length of the random string in each batch request
 * @throws Exception
 */
public void run(int port, int multiSize, int invokeNum, int size, int textLength) throws Exception {
    PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
        "127.0.0.1", port, 60000);

    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    CompletionService<DemoBatchResponse> completionService =
        new ExecutorCompletionService<DemoBatchResponse>(pool);

    BatchInvoker invoker = new BatchInvoker(client, size, RandomUtils.generateString(textLength));
    long time = System.currentTimeMillis();
    for (int i = 0; i < invokeNum; i++) {
        completionService.submit(invoker);
    }
    for (int i = 0; i < invokeNum; i++) {
        completionService.take().get();
    }

    long timetook = System.currentTimeMillis() - time;
    LOG.info("Send " + invokeNum + " requests using " + timetook + "ms");
    LOG.info("QPS:" + 1000f / ((timetook) / (1.0f * invokeNum)));
}
public void testPoolBatch() throws Exception {
    PbrpcClient client = PbrpcClientFactory.buildPooledConnection(new PooledConfiguration(),
        "127.0.0.1", 8088, 60000);

    int multiSize = 8;
    int totalRequestSize = 100;
    ExecutorService pool = Executors.newFixedThreadPool(multiSize);
    CompletionService<DemoBatchResponse> completionService =
        new ExecutorCompletionService<DemoBatchResponse>(pool);

    BatchInvoker invoker = new BatchInvoker(client);
    long time = System.currentTimeMillis();
    for (int i = 0; i < totalRequestSize; i++) {
        completionService.submit(invoker);
    }
    for (int i = 0; i < totalRequestSize; i++) {
        completionService.take().get();
    }

    long timetook = System.currentTimeMillis() - time;
    LOG.info("Total using " + timetook + "ms");
    LOG.info("QPS:" + 1000f / ((timetook) / (1.0f * totalRequestSize)));
}
@Test
@SuppressWarnings("unchecked") // mocked generics
public void testContainerLocalizerClosesFilesystems() throws Exception {
    // verify filesystems are closed when localizer doesn't fail
    ContainerLocalizer localizer = setupContainerLocalizerForTest();
    doNothing().when(localizer).localizeFiles(any(LocalizationProtocol.class),
        any(CompletionService.class), any(UserGroupInformation.class));
    verify(localizer, never()).closeFileSystems(any(UserGroupInformation.class));
    localizer.runLocalization(nmAddr);
    verify(localizer).closeFileSystems(any(UserGroupInformation.class));

    // verify filesystems are closed when localizer fails
    localizer = setupContainerLocalizerForTest();
    doThrow(new YarnRuntimeException("Forced Failure")).when(localizer).localizeFiles(
        any(LocalizationProtocol.class), any(CompletionService.class),
        any(UserGroupInformation.class));
    verify(localizer, never()).closeFileSystems(any(UserGroupInformation.class));
    localizer.runLocalization(nmAddr);
    verify(localizer).closeFileSystems(any(UserGroupInformation.class));
}
@Override
public void run(final MessageConsumerCoordinator messageConsumerCoordinator,
        final MessageProducerCoordinator messageProducerCoordinator, final int numConsumers,
        final int numProducers, final long totalNumberOfMessages, final Report report) {
    CompletionService<Histogram> producerCompletionService =
        messageProducerCoordinator.startProducers();

    // Producer / No Consumer
    Stopwatch producerStartTime = Stopwatch.createStarted();
    report.aggregateAndPrintResults(CoordinatorType.PRODUCER, producerCompletionService,
        numProducers, totalNumberOfMessages, producerStartTime);

    // Now, let's start some consumers with producers
    final CompletionService<Histogram> consumerCompletionService =
        messageConsumerCoordinator.startConsumers();
    producerCompletionService = messageProducerCoordinator.startProducers();
    producerStartTime = Stopwatch.createStarted();
    final Stopwatch consumerStartTime = Stopwatch.createStarted();
    report.aggregateAndPrintResults(CoordinatorType.PRODUCER, producerCompletionService,
        numProducers, totalNumberOfMessages, producerStartTime);
    report.aggregateAndPrintResults(CoordinatorType.CONSUMER, consumerCompletionService,
        numConsumers, totalNumberOfMessages, consumerStartTime);
}
@Override
public void run(final MessageConsumerCoordinator messageConsumerCoordinator,
        final MessageProducerCoordinator messageProducerCoordinator, final int numConsumers,
        final int numProducers, final long totalNumberOfMessages, final Report report) {
    final CompletionService<Histogram> consumerCompletionService =
        messageConsumerCoordinator.startConsumers();
    final CompletionService<Histogram> producerCompletionService =
        messageProducerCoordinator.startProducers();

    // Note that the timers are started after startConsumers() and startProducers();
    // this is intentional, to exclude the initialization time.
    final Stopwatch producerStartTime = Stopwatch.createStarted();
    final Stopwatch consumerStartTime = Stopwatch.createStarted();

    report.aggregateAndPrintResults(CoordinatorType.PRODUCER, producerCompletionService,
        numProducers, totalNumberOfMessages, producerStartTime);
    report.aggregateAndPrintResults(CoordinatorType.CONSUMER, consumerCompletionService,
        numConsumers, totalNumberOfMessages, consumerStartTime);
}
@Override
public CompletionService<Histogram> startConsumers() {
    final ConsumerConfig consumerConfig = new ConsumerConfig(props);
    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // Create message streams
    final Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, numThreads);

    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicMap);
    final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // Pass each stream to a consumer that will read from the stream in its own thread.
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream));
    }

    return executorCompletionService;
}
private long solve(Executor executor, Collection<Callable<Long>> solvers) {
    CompletionService<Long> ecs = new ExecutorCompletionService<>(executor);
    List<Future<Long>> futures = new ArrayList<>(solvers.size());
    for (Callable<Long> solver : solvers) {
        futures.add(ecs.submit(solver));
    }
    try {
        return ecs.take().get();
    } catch (ExecutionException | InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        for (Future<Long> f : futures) {
            f.cancel(true);
        }
    }
}
private List<String> createProjectsParallel(final Path parentFolder)
        throws InterruptedException, ExecutionException {
    List<Future<String>> futures = new ArrayList<>();
    CompletionService<String> service =
        new ExecutorCompletionService<>(Executors.newFixedThreadPool(8));
    List<SpringBootProjectParams> expandedMatrix = generateSringBootMatrix();
    for (final SpringBootProjectParams params : expandedMatrix) {
        service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return createProject(parentFolder, params);
            }
        });
    }
    for (int i = 0; i < expandedMatrix.size(); i++) {
        futures.add(service.take());
    }
    List<String> modules = new ArrayList<>();
    for (Future<String> future : futures) {
        modules.add(future.get());
    }
    return modules;
}
private static void runTest(VehicleController controller, ExecutorService exec, long duration) {
    CompletionService<TaskStatistics> ecs = new ExecutorCompletionService<>(exec);
    List<TaskStatistics> results = new ArrayList<>();

    // submit tasks for concurrent execution
    for (int i = 0; i < NUM_THREADS; i++) {
        System.out.println("Submitting task: TASK-" + i);
        ecs.submit(new VehicleTask("TASK-" + i, controller, duration));
    }

    // Wait for completion and print individual results
    for (int i = 0; i < NUM_THREADS; ++i) {
        try {
            TaskStatistics nextStat = ecs.take().get(); // block till next task finishes
            results.add(nextStat);
            updateCounters(nextStat);
            System.out.println(nextStat);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Optimizes all of the passed in images. This process is multi-threaded so
 * that the number of threads is equal to the number of CPUs.
 *
 * @param conversionType If and how to handle converting images from one
 *                       type to another.
 * @param includeWebPConversion If <code>true</code> then a WebP version
 *                              of the image will also be generated (if it
 *                              is smaller).
 * @param files The images to optimize
 * @return The results from the optimization. All items in the {@link List}
 *         are considered optimized, not <code>null</code>, and will exclude
 *         images that could not be optimized to a smaller size.
 * @throws ImageFileOptimizationException If there are any issues optimizing
 *                                        an image.
 * @throws TimeoutException Happens if it takes too long to optimize an
 *                          image.
 * @see #optimizeAllImages(com.salesforce.perfeng.uiperf.imageoptimization.service.IImageOptimizationService.FileTypeConversion, boolean, File...)
 * @see com.salesforce.perfeng.uiperf.imageoptimization.service.IImageOptimizationService#optimizeAllImages(FileTypeConversion, boolean, Collection)
 */
@Override
public List<OptimizationResult<C>> optimizeAllImages(final FileTypeConversion conversionType,
        final boolean includeWebPConversion, final Collection<File> files)
        throws ImageFileOptimizationException, TimeoutException {
    if ((files == null) || files.isEmpty()) {
        return Collections.emptyList();
    }

    final CompletionService<OptimizationResult<C>> completionService =
        new ExecutorCompletionService<>(executorService);
    int i = 0;
    final Date start = new Date();
    final long time = System.nanoTime();
    final ArrayList<Future<OptimizationResult<C>>> futures = new ArrayList<>();
    for (final File file : files) {
        futures.addAll(submitExecuteOptimization(completionService, file,
            new StringBuilder(tmpWorkingDirectory.getAbsolutePath())
                .append(File.separatorChar).append("scratch").append(time).append(i++),
            conversionType, includeWebPConversion));
    }
    futures.trimToSize();

    final List<OptimizationResult<C>> optimizedFiles =
        optimizeGroupOfImages(completionService, futures);
    logger.info("Image optimization elapsed time: " + (new Date().getTime() - start.getTime()));
    return optimizedFiles;
}
/**
 * Deserialize {@link Communication} objects in parallel.
 * <br>
 * <br>
 * The returned {@link Future}s are in the same order as the paths listed in the
 * input file, so callers can iterate over the list and call get() on each element
 * in turn; each call blocks only until that particular file has been deserialized.
 *
 * @param pathToCommFiles - path to a text file containing paths on disk to serialized {@link Communication} files.
 * @return a {@link List} of {@link Future} objects with a {@link Communication} expected.
 * @throws FileNotFoundException if the passed in {@link Path} does not exist on disk.
 */
public List<Future<Communication>> bulkLoad(Path pathToCommFiles) throws FileNotFoundException {
    List<Path> paths = new ArrayList<>();
    try (Scanner sc = new Scanner(pathToCommFiles.toFile())) {
        while (sc.hasNextLine())
            paths.add(Paths.get(sc.nextLine()));
    }

    CompletionService<Communication> srv = new ExecutorCompletionService<>(this.runner);
    List<Future<Communication>> commList = new ArrayList<>();
    for (Path p : paths) {
        Future<Communication> f = srv.submit(new CallablePathToCommunication(p));
        commList.add(f);
    }

    return commList;
}
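A usage sketch (the loader instance, file name, and process(...) call are assumptions, and the fragment is assumed to run where InterruptedException and ExecutionException may propagate):

List<Future<Communication>> pending = loader.bulkLoad(Paths.get("comm-paths.txt"));
for (Future<Communication> f : pending) {
    Communication c = f.get();   // blocks only until this particular file is deserialized
    process(c);                  // placeholder for downstream work
}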
/**
 * GET urls in parallel using the executor service.
 *
 * @param urls absolute URLs to GET
 * @param timeoutMs timeout in milliseconds for each GET request
 * @return instance of CompletionService. The completion service will provide
 *         results as they arrive; the order is NOT the same as the order of URLs.
 */
public CompletionService<GetMethod> execute(@Nonnull List<String> urls, final int timeoutMs) {
    Preconditions.checkNotNull(urls);
    Preconditions.checkArgument(timeoutMs > 0, "Timeout value for multi-get must be greater than 0");

    CompletionService<GetMethod> completionService = new ExecutorCompletionService<>(executor);
    for (final String url : urls) {
        completionService.submit(new Callable<GetMethod>() {
            @Override
            public GetMethod call() throws Exception {
                HttpClient client = new HttpClient(connectionManager);
                GetMethod getMethod = new GetMethod(url);
                getMethod.getParams().setSoTimeout(timeoutMs);
                // if all connections in the connection manager are busy, this will wait to retrieve a connection;
                // set the time to wait to retrieve a connection from the connection manager
                client.getParams().setConnectionManagerTimeout(timeoutMs);
                client.executeMethod(getMethod);
                return getMethod;
            }
        });
    }
    return completionService;
}
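A caller might drain the returned CompletionService as responses arrive; the fragment below is a sketch (multiGet and urls are assumed names) that runs where InterruptedException may propagate, and releases each GetMethod back to the connection manager when done:

CompletionService<GetMethod> responses = multiGet.execute(urls, 1000);
for (int i = 0; i < urls.size(); i++) {
    Future<GetMethod> f = responses.take();   // next response, in completion order
    GetMethod get = null;
    try {
        get = f.get();
        System.out.println(get.getStatusCode());
    } catch (ExecutionException e) {
        // this particular GET failed; the cause was thrown inside the Callable
    } finally {
        if (get != null) {
            get.releaseConnection();          // return the connection to the pool
        }
    }
}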
private void submitProcessTask(final CacheLoaderTask<K, V> cacheLoaderTask,
        CompletionService<Void> ecs, final TaskContext taskContext, final Set<Object> batch,
        final boolean loadEntry, final boolean loadMetadata) {
    ecs.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            try {
                for (Object key : batch) {
                    if (taskContext.isStopped())
                        break;
                    if (!loadEntry && !loadMetadata) {
                        cacheLoaderTask.processEntry(
                            initializationContext.getMarshalledEntryFactory()
                                .newMarshalledEntry(key, (Object) null, null),
                            taskContext);
                    } else {
                        cacheLoaderTask.processEntry(load(key), taskContext);
                    }
                }
            } catch (Exception e) {
                log.errorExecutingParallelStoreTask(e);
                throw e;
            }
            return null;
        }
    });
}