@Test public void testThreadPoolMetricsOnClose() throws Exception { WorkerExecutor exec = vertx.createSharedWorkerExecutor("the-executor", 10); assertTrue(metricsService.getMetricsSnapshot(exec).size() > 0); assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-worker-thread").size() > 0); assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-internal-blocking").size() > 0); exec.close(); assertTrue(metricsService.getMetricsSnapshot(exec).size() == 0); assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-worker-thread").size() > 0); assertTrue(metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-internal-blocking").size() > 0); CountDownLatch latch = new CountDownLatch(1); vertx.close(ar -> latch.countDown()); awaitLatch(latch); assertEquals(new JsonObject(), metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-worker-thread")); assertEquals(new JsonObject(), metricsService.getMetricsSnapshot("vertx.pools.worker.vert.x-internal-blocking")); }
public static void main(String[] args) { Vertx vertx = Vertx.vertx(); CountDownLatch completion = new CountDownLatch(2); WorkerExecutor sharedWorker = vertx.createSharedWorkerExecutor("my-shared-pool", 20); sharedWorker.executeBlocking(successfulBlockingTask(), responseHandler(completion)); sharedWorker.executeBlocking(failedBlockingTask(), responseHandler(completion)); vertx.close(); }
@Test public void testWithBlockingWithWorker() throws Exception { AtomicBoolean calledSpy = new AtomicBoolean(); AtomicBoolean startedSpy = new AtomicBoolean(); vertx.createHttpServer().requestHandler(request -> { calledSpy.set(true); request.response().end("Alright"); }).listen(8081, ar -> { startedSpy.set(ar.succeeded()); }); await().atMost(DEFAULT_TIMEOUT).untilAtomic(startedSpy, is(true)); camel.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { from("direct:my-route") .process(exchange -> Thread.sleep(3000)) .to("http://localhost:8081"); } }); WorkerExecutor pool = vertx.createSharedWorkerExecutor("some-fancy-name"); bridge = CamelBridge.create(vertx, new CamelBridgeOptions(camel) .addOutboundMapping(fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true) .setWorkerExecutor(pool))); camel.start(); BridgeHelper.startBlocking(bridge); vertx.eventBus().send("camel-route", "hello"); await().atMost(DEFAULT_TIMEOUT).untilAtomic(calledSpy, is(true)); }
public ContextScheduler(WorkerExecutor workerExecutor, boolean ordered) { Objects.requireNonNull(workerExecutor, "workerExecutor is null"); this.vertx = ((WorkerExecutorInternal) workerExecutor).vertx(); this.context = null; this.workerExecutor = workerExecutor; this.blocking = true; this.ordered = ordered; }
@Override public WorkerExecutor createSharedWorkerExecutor(String name) { return vertx.createSharedWorkerExecutor(name); }
@Override public WorkerExecutor createSharedWorkerExecutor(String name, int poolSize) { return vertx.createSharedWorkerExecutor(name, poolSize); }
@Override public WorkerExecutor createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) { return vertx.createSharedWorkerExecutor(name, poolSize, maxExecuteTime); }
@Test public void testThreadPoolMetrics() throws Exception { int size = 5; CountDownLatch done = new CountDownLatch(6); WorkerExecutor exec = vertx.createSharedWorkerExecutor("the-executor", size); JsonObject metrics = metricsService.getMetricsSnapshot(exec); assertMetricType("counter", metrics.getJsonObject("queue-size")); assertMetricType("timer", metrics.getJsonObject("queue-delay")); assertMetricType("counter", metrics.getJsonObject("in-use")); assertMetricType("timer", metrics.getJsonObject("usage")); assertMetricType("gauge", metrics.getJsonObject("pool-ratio")); assertMetricType("gauge", metrics.getJsonObject("max-pool-size")); assertCount(metrics.getJsonObject("usage"), 0); assertCount(metrics.getJsonObject("queue-delay"), 0); assertCount(metrics.getJsonObject("queue-size"), 0); assertCount(metrics.getJsonObject("in-use"), 0); assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)0D); assertEquals(metrics.getJsonObject("max-pool-size").getInteger("value"), (Integer)5); // CountDownLatch gate = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(5); for (int i = 0; i < size;i++) { exec.<Boolean>executeBlocking(fut -> { try { latch.countDown(); fut.complete(gate.await(10, TimeUnit.SECONDS)); } catch (InterruptedException e) { fut.fail(e); } }, false, ar -> { assertTrue(ar.succeeded()); assertTrue(ar.result()); vertx.runOnContext(v -> done.countDown()); }); } awaitLatch(latch); metrics = metricsService.getMetricsSnapshot(exec); assertCount(metrics.getJsonObject("usage"), 0); assertCount(metrics.getJsonObject("queue-delay"), 5); assertCount(metrics.getJsonObject("queue-size"), 0); assertCount(metrics.getJsonObject("in-use"), size); assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)1D); exec.executeBlocking(Future::complete, false, ar -> vertx.runOnContext(v -> done.countDown())); metrics = metricsService.getMetricsSnapshot(exec); assertCount(metrics.getJsonObject("usage"), 0); assertCount(metrics.getJsonObject("queue-delay"), 5); assertCount(metrics.getJsonObject("queue-size"), 1); assertCount(metrics.getJsonObject("in-use"), size); assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)1D); gate.countDown(); awaitLatch(done); metrics = metricsService.getMetricsSnapshot(exec); assertCount(metrics.getJsonObject("usage"), 6); assertCount(metrics.getJsonObject("queue-delay"), 6); assertCount(metrics.getJsonObject("queue-size"), 0); assertCount(metrics.getJsonObject("in-use"), 0); assertEquals(metrics.getJsonObject("pool-ratio").getDouble("value"), (Double)0D); }
public ContextScheduler(WorkerExecutor workerExecutor) { this(workerExecutor, true); }
/** * Creates a new instance of producer. * * @param vertx the vert.x instance * @param producer the underlying producer, must not be {@code null} * @param outbound the outbound configuration, must not be {@code null} * @param blocking whether or not the processing is blocking and so should not be run on the event * loop * @param pool the pool on which the blocking code is going to be executed */ public FromVertxToCamelProducer(Vertx vertx, Producer producer, OutboundMapping outbound, boolean blocking, WorkerExecutor pool) { this.endpoint = producer.getEndpoint(); this.producer = AsyncProcessorConverterHelper.convert(producer); this.outbound = outbound; this.blocking = blocking; this.vertx = vertx; this.pool = pool; }
/** * @return the worker thread worker to use to execute the processing. This option is only used if blocking is set to * {@code true}. If not set, it uses the the default worker worker. */ public WorkerExecutor getWorkerExecutor() { return worker; }
/** * Sets the worker thread worker used to execute the blocking processing. This option is only used if blocking is set to * {@code true}. If not set, it uses the the default worker worker. * * @param pool the worker worker on which the code is executed * @return the current instance of {@link OutboundMapping} */ public OutboundMapping setWorkerExecutor(WorkerExecutor pool) { this.worker = pool; return this; }
/** * Create a scheduler for a {@link io.vertx.core.WorkerExecutor} object, actions are executed on the threads of this executor. * * @param executor the worker executor object * @return the scheduler */ public static Scheduler blockingScheduler(WorkerExecutor executor) { return new ContextScheduler(executor, false); }