@Override public void start() throws Exception { NetServer netServer = vertx.createNetServer();//创建代理服务器 NetClient netClient = vertx.createNetClient();//创建连接mysql客户端 netServer.connectHandler(socket -> netClient.connect(port, mysqlHost, result -> { //响应来自客户端的连接请求,成功之后,在建立一个与目标mysql服务器的连接 if (result.succeeded()) { //与目标mysql服务器成功连接连接之后,创造一个MysqlProxyConnection对象,并执行代理方法 new MysqlProxyConnection(socket, result.result()).proxy(); } else { logger.error(result.cause().getMessage(), result.cause()); socket.close(); } })).listen(port, listenResult -> {//代理服务器的监听端口 if (listenResult.succeeded()) { //成功启动代理服务器 logger.info("Mysql proxy server start up."); } else { //启动代理服务器失败 logger.error("Mysql proxy exit. because: " + listenResult.cause().getMessage(), listenResult.cause()); System.exit(1); } }); }
private void registerTimerOut(final Session session, final NetClient client) { vertx.setPeriodic(config.getJsonObject("application").getInteger("tcp-session-timeout", 1200) * 1000, new Handler<Long>() { private int readCount = 0; private int writeCount = 0; @Override public void handle(Long timerID) { if ((session.getRead_count() <= readCount) && (session.getWrite_count() <= writeCount)) { session.setActive(false); if (client != null) { client.close(); } vertx.cancelTimer(timerID); } readCount = session.getRead_count(); writeCount = session.getWrite_count(); } }); }
@Override public void start(Handler<AsyncResult<Void>> startFuture) { if (port > 0) { // fail if port is already in use NetClientOptions options = new NetClientOptions().setConnectTimeout(200); NetClient c = vertx.createNetClient(options); c.connect(port, "localhost", res -> { if (res.succeeded()) { NetSocket socket = res.result(); socket.close(); startFuture.handle(Future.failedFuture("port " + port + " already in use")); } else { start2(startFuture); } }); } else { start2(startFuture); } }
private void waitPortToClose(Handler<AsyncResult<Void>> stopFuture, int iter) { if (port > 0) { // fail if port is already in use NetClientOptions options = new NetClientOptions().setConnectTimeout(50); NetClient c = vertx.createNetClient(options); c.connect(port, "localhost", res -> { if (res.succeeded()) { NetSocket socket = res.result(); socket.close(); if (iter > 0) { vertx.setTimer(100, x -> waitPortToClose(stopFuture, iter - 1)); } else { stopFuture.handle(Future.failedFuture("port " + port + " not shut down")); } } else { stopFuture.handle(Future.succeededFuture()); } }); } else { stopFuture.handle(Future.succeededFuture()); } }
@Test public void noConnectSent(TestContext context) { NetClient client = this.vertx.createNetClient(); Async async = context.async(); client.connect(MQTT_SERVER_PORT, MQTT_SERVER_HOST, done -> { if (done.succeeded()) { done.result().closeHandler(v -> { log.info("No CONNECT sent in " + MQTT_TIMEOUT_ON_CONNECT + " secs. Closing connection."); async.complete(); }); } else { context.fail(); } }); // check that the async is completed (so connection was closed by server) in // the specified timeout (+500 ms just for being sure) async.await(500 + MQTT_TIMEOUT_ON_CONNECT * 1000); if (!async.isCompleted()) context.fail(); }
@Override public void start(Future<Void> startFuture) { log.info("start", "initializationStarted"); MemcacheClusterConfig memcacheClusterConfig; try { memcacheClusterConfig = new MemcacheClusterConfig(config().getJsonObject(MEMCACHE_CLUSTER_KEY)); } catch (MemcacheException me) { log.error("start", "exception", me.getMessage()); startFuture.fail(new Exception(me.getMessage())); return; } NetClient netClient = vertx.createNetClient(); establishSockets(memcacheClusterConfig, netClient); log.info("start", "initializationCompleted"); startFuture.complete(); }
@Test public void testSendMessageWithReplyBacktrack(TestContext context) { // Send a request and get a response NetClient client = vertx.createNetClient(); final Async async = context.async(); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); JsonObject frame = parse.result(); context.assertNotEquals("err", frame.getString("type")); context.assertEquals("Hello vert.x", frame.getJsonObject("body").getString("value")); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("send", "hello", "#backtrack", new JsonObject().put("value", "vert.x"), socket); }); }
@Test public void testSendMessageWithDuplicateReplyID(TestContext context) { // replies must always return to the same origin NetClient client = vertx.createNetClient(); final Async async = context.async(); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); vertx.eventBus().consumer("third-party-receiver", msg -> context.fail()); final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("send", "hello", "third-party-receiver", new JsonObject().put("value", "vert.x"), socket); }); }
@Test public void testSendVoidMessage(TestContext context) { // Send a request and get a response NetClient client = vertx.createNetClient(); final Async async = context.async(); vertx.eventBus().consumer("test", (Message<JsonObject> msg) -> { client.close(); async.complete(); }); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); FrameHelper.sendFrame("send", "test", new JsonObject().put("value", "vert.x"), socket); }); }
@Test public void testSendsFromOtherSideOfBridge(TestContext context) { NetClient client = vertx.createNetClient(); final Async async = context.async(); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); JsonObject frame = parse.result(); context.assertNotEquals("err", frame.getString("type")); context.assertEquals(true, frame.getBoolean("send")); context.assertEquals("hi", frame.getJsonObject("body").getString("value")); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("register", "ping", null, socket); }); }
@Test public void testSendMessageWithReplyBacktrack(TestContext context) { // Send a request and get a response NetClient client = vertx.createNetClient(); final Async async = context.async(); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); JsonObject frame = parse.result(); context.assertNotEquals("err", frame.getString("type")); context.assertEquals(true, frame.getBoolean("send")); context.assertEquals("Hello vert.x", frame.getJsonObject("body").getString("value")); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("send", "hello", "#backtrack", new JsonObject().put("value", "vert.x"), socket); }); }
@Test public void testSendPing(TestContext context) { NetClient client = vertx.createNetClient(); final Async async = context.async(); eventHandler = event -> { if (event.type() == BridgeEventType.SOCKET_PING) { async.complete(); } }; client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> { socket.handler(buff -> { }); FrameHelper.sendFrame("register", "echo", null, socket); FrameHelper.sendFrame("ping", socket); })); }
@Override public Future<Result> check() { Future<Result> future = Future.future(); NetClientOptions options = new NetClientOptions().setConnectTimeout(500); NetClient client = vertx.createNetClient(options); client.connect(port, host, res -> { if (res.succeeded()) { future.complete(Result.healthy()); } else { future.complete(Result.unhealthy(res.cause())); } client.close(); }); return future; }
@Override public CompletableFuture<Result> check() { VertxCompletableFuture<Result> result = new VertxCompletableFuture<>(vertx); NetClientOptions options = new NetClientOptions().setConnectTimeout(500); NetClient client = vertx.createNetClient(options); client.connect(port, host, res -> { if (res.succeeded()) { result.complete(Result.healthy()); } else { result.complete(Result.unhealthy(res.cause())); } client.close(); }); return result; }
ProtonTransport(Connection connection, Vertx vertx, NetClient netClient, NetSocket socket, ProtonSaslAuthenticator authenticator, ProtonTransportOptions options) { this.connection = connection; this.vertx = vertx; this.netClient = netClient; this.socket = socket; transport.setMaxFrameSize(options.getMaxFrameSize() == 0 ? DEFAULT_MAX_FRAME_SIZE : options.getMaxFrameSize()); transport.setEmitFlowEventOnSend(false); // TODO: make configurable transport.setIdleTimeout(2 * options.getHeartbeat()); if (authenticator != null) { authenticator.init(this.socket, (ProtonConnection) this.connection.getContext(), transport); } this.authenticator = authenticator; transport.bind(connection); connection.collect(collector); socket.endHandler(this::handleSocketEnd); socket.handler(this::handleSocketBuffer); }
/** * Log plugin is rudimentary for now. It will return a single byte to indicate the Write is done * @param context */ @Test public void testCanWriteToLog(TestContext context) { final DeploymentOptions options = buildDeploymentOptions(); vertx.deployVerticle(Start.class.getName(), options, context.asyncAssertSuccess(deploymentID -> { final Async async = context.async(); final NetClient netClient = vertx.createNetClient(); netClient.connect(2500, "localhost", context.asyncAssertSuccess(socket -> { socket.handler(buffer -> { context.assertEquals((byte) 0x1, buffer.getByte(0)); context.assertEquals(Hex.encodeHexString("Hello Log".getBytes()), fakeLogHandler.lastRecord.getMessage()); async.complete(); }); socket.write("Hello Log"); })); vertx.setTimer(5000, event -> context.fail("timed out")); })); }
public CircularConnectionPool(NetClient client, SocketAddress address, int capacity) { this.current = new AtomicInteger(0); this.connections = new FdfsConnection[capacity]; for (int i=0; i<capacity; ++i) { this.connections[i] = new FdfsConnection(client, address); } this.capacity = connections.length; }
@Override public GremlinClient execute(String json, Handler<Buffer> handler) { NetClient netClient = vertx.createNetClient(); netClient.connect(port, host, res -> { if (res.succeeded()) { NetSocket socket = res.result(); socket.write(json).handler(handler); } }); return this; }
private void tryConnect(Handler<AsyncResult<Void>> startFuture, int count) { NetClientOptions options = new NetClientOptions().setConnectTimeout(MILLISECONDS); NetClient c = vertx.createNetClient(options); c.connect(port, "localhost", res -> { if (res.succeeded()) { logger.info("Connected to service at port " + port + " count " + count); NetSocket socket = res.result(); socket.close(); try { p.getErrorStream().close(); } catch (Exception e) { logger.error("Closing streams failed: " + e); } startFuture.handle(Future.succeededFuture()); } else if (!p.isAlive() && p.exitValue() != 0) { logger.warn("Service returned with exit code " + p.exitValue()); startFuture.handle(Future.failedFuture("Service returned with exit code " + p.exitValue())); } else if (count < maxIterations) { vertx.setTimer((long) (count + 1) * MILLISECONDS, id -> tryConnect(startFuture, count + 1)); } else { startFuture.handle(Future.failedFuture("Deployment failed. " + "Could not connect to port " + port + ": " + res.cause().getMessage())); } }); }
@Override public void start(Future<Void> startFuture) { log.info("start", "initializationStarted"); RedisConfig redisConfig; JsonObject redisConfigObj = vertx.getOrCreateContext().config().getJsonObject(REDIS_KEY); if (redisConfigObj == null) { log.error("start", "exception", "No Redis config found."); startFuture.fail(new Exception("No Redis config found.")); return; } else { try { redisConfig = new RedisConfig(redisConfigObj); } catch (Exception ex) { log.error("start", "exception", "Invalid Redis config defined"); startFuture.fail(new Exception("Invalid Redis config defined")); return; } } NetClient netClient = vertx.createNetClient(); establishSockets(redisConfig, netClient); log.info("start", "initializationCompleted"); startFuture.complete(null); }
public RedisSocketHandler( Vertx vertx, String eventBusAddress, String host, int port, NetClient netClient, long delayFactor) { this.vertx = vertx; this.eventBusAddress = eventBusAddress; this.host = host; this.port = port; this.netClient = netClient; this.delayFactor = delayFactor; this.currentDelay = delayFactor; }
private void connectToRemote(String addr, int port) { // 5s timeout. NetClientOptions options = new NetClientOptions().setConnectTimeout(5000); NetClient client = mVertx.createNetClient(options); client.connect(port, addr, res -> { // connect handler if (!res.succeeded()) { log.error("Failed to connect " + addr + ":" + port + ". Caused by " + res.cause().getMessage()); destory(); return; } mTargetSocket = res.result(); setFinishHandler(mTargetSocket); mTargetSocket.handler(buffer -> { // remote socket data handler try { byte [] data = buffer.getBytes(); byte [] encryptData = mCrypto.encrypt(data, data.length); flowControl(mClientSocket, mTargetSocket); mClientSocket.write(Buffer.buffer(encryptData)); }catch(CryptoException e){ log.error("Catch exception", e); destory(); } }); if (mBufferQueue.length() > 0) { handleStageData(); } }); }
/** * This method opens the connection to the Memcache server and registers the message handler on * success. If the connection fails or is closed, it unregisters the handler and attempts to * reconnect. * * @param memcacheClusterConfig - The configurations for the connection to Memcache * @param netClient - The client for connecting to Memcache. */ private void establishSockets(final MemcacheClusterConfig memcacheClusterConfig, final NetClient netClient) { for (String server : memcacheClusterConfig.getServers()) { final String eventBusAddress = memcacheClusterConfig.getEventBusAddressPrefix() + "_" + server; final MemcacheServer memcacheServer = new MemcacheServer(server); MemcacheSocketHandler handler = new MemcacheSocketHandler(vertx, eventBusAddress, memcacheServer, netClient, memcacheClusterConfig.getRetryInterval()); handler.handle(System.currentTimeMillis()); } }
public MemcacheSocketHandler(Vertx vertx, String eventBusAddress, MemcacheServer server, NetClient netClient, long delayFactor) { this.vertx = vertx; this.eventBusAddress = eventBusAddress; this.server = server; this.netClient = netClient; this.delayFactor = delayFactor; this.currentDelay = delayFactor; }
@Override public void start(Future<Void> startFuture) { log.info("start", "initializationStarted"); MemcacheConfig memcacheConfig; try { memcacheConfig = new MemcacheConfig(config().getJsonObject(MEMCACHE_KEY)); } catch (MemcacheException me) { log.error("start", "exception", me.getMessage()); startFuture.fail(new Exception(me.getMessage())); return; } try { registerDefaultCodec(MemcacheCommand.class, new MemcacheCommandCodec()); registerDefaultCodec(MemcacheCommandResponse.class, new MemcacheCommandResponseCodec()); registerDefaultCodec(DeleteCommandResponse.class, new DeleteCommandResponseCodec()); registerDefaultCodec(ModifyCommandResponse.class, new ModifyCommandResponseCodec()); registerDefaultCodec(RetrieveCommandResponse.class, new RetrieveCommandResponseCodec()); registerDefaultCodec(StoreCommandResponse.class, new StoreCommandResponseCodec()); registerDefaultCodec(TouchCommandResponse.class, new TouchCommandResponseCodec()); } catch (Exception ex) { log.error("start", "exception", ex.getMessage()); startFuture.fail(new Exception(ex.getMessage())); return; } NetClient netClient = vertx.createNetClient(); establishSockets(memcacheConfig, netClient); log.info("start", "initializationCompleted"); startFuture.complete(); }
/** * This method opens the connection to the Memcache server and registers the message handler on * success. If the connection fails or is closed, it unregisters the handler and attempts to * reconnect. * * @param memcacheConfig - The configuration for the connection to Memcache * @param netClient - The client for connecting to Memcache. */ private void establishSockets(final MemcacheConfig memcacheConfig, final NetClient netClient) { for (String server : memcacheConfig.getServers()) { final String eventBusAddress = memcacheConfig.getEventBusAddress() + "_" + server; final MemcacheServer memcacheServer = new MemcacheServer(server); MemcacheSocketHandler handler = new MemcacheSocketHandler(vertx, eventBusAddress, memcacheServer, netClient, memcacheConfig.getRetryInterval()); handler.handle(System.currentTimeMillis()); } }
@Test public void testRegister(TestContext context) { // Send a request and get a response NetClient client = vertx.createNetClient(); final Async async = context.async(); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); // 1 reply will arrive // MESSAGE for echo final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); JsonObject frame = parse.result(); context.assertNotEquals("err", frame.getString("type")); context.assertEquals("Vert.x", frame.getJsonObject("body").getString("value")); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("register", "echo", null, socket); // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this // remote consumer FrameHelper.sendFrame("publish", "echo", new JsonObject().put("value", "Vert.x"), socket); }); }
@Test public void testNoHandlers(TestContext context) { // Send a request and get a response NetClient client = vertx.createNetClient(); final Async async = context.async(); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); JsonObject frame = parse.result(); context.assertEquals("err", frame.getString("type")); context.assertEquals("#backtrack", frame.getString("address")); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("send", "test", "#backtrack", new JsonObject().put("value", "vert.x"), socket); }); }
@Test public void testErrorReply(TestContext context) { // Send a request and get a response NetClient client = vertx.createNetClient(); final Async async = context.async(); vertx.eventBus().consumer("test", (Message<JsonObject> msg) -> { msg.fail(0, "oops!"); }); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); JsonObject frame = parse.result(); context.assertEquals("err", frame.getString("type")); context.assertEquals("#backtrack", frame.getString("address")); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("send", "test", "#backtrack", new JsonObject().put("value", "vert.x"), socket); }); }
@Test public void testRegister(TestContext context) { // Send a request and get a response NetClient client = vertx.createNetClient(); final Async async = context.async(); client.connect(7000, "localhost", conn -> { context.assertFalse(conn.failed()); NetSocket socket = conn.result(); // 1 reply will arrive // MESSAGE for echo final FrameParser parser = new FrameParser(parse -> { context.assertTrue(parse.succeeded()); JsonObject frame = parse.result(); context.assertNotEquals("err", frame.getString("type")); context.assertEquals(false, frame.getBoolean("send")); context.assertEquals("Vert.x", frame.getJsonObject("body").getString("value")); client.close(); async.complete(); }); socket.handler(parser); FrameHelper.sendFrame("register", "echo", null, socket); // now try to publish a message so it gets delivered both to the consumer registred on the startup and to this // remote consumer FrameHelper.sendFrame("publish", "echo", new JsonObject().put("value", "Vert.x"), socket); }); }
SMTPConnection(Vertx vertx, NetClient client, ConnectionLifeCycleListener listener) { broken = true; idle = false; doShutdown = false; socketClosed = false; socketShutDown = false; this.client = client; this.listener = listener; this.vertx = vertx; }
public void example7(Vertx vertx, NetClient netClient) { StompClient client = StompClient.create(vertx) .connect(netClient, ar -> { if (ar.succeeded()) { StompClientConnection connection = ar.result(); connection.errorHandler(frame -> System.out.println("ERROR frame received : " + frame)); } else { System.out.println("Failed to connect to the STOMP server: " + ar.cause().toString()); } }); }
/** * Creates a new instance of <code>DownloadStream</code>. * * @param pClient the communication client * @param pStream the wrapped stream */ private DownloadStream(NetClient pClient, InputStream pStream) { super(pStream); client = pClient; }
public MyReceiver(String host, int port, String dest) throws MessagingException { NetClient netClient = Vertx.factory.vertx().createNetClient(new NetClientOptions()); DefaultConnectionSettings settings = new DefaultConnectionSettings(); settings.setHost(host); settings.setPort(port); connection = new ManagedConnection(settings, this, false); netClient.connect(settings.getPort(), settings.getHost(), result -> { if (result.succeeded()) { connection.setNetSocket(result.result()); connection.write(); connection.addDisconnectHandler(c -> { LOG.warn("Connection lost to peer at %s:%s", connection.getSettings().getHost(), connection .getSettings().getPort()); }); LOG.info("Connected to AMQP peer at %s:%s", connection.getSettings().getHost(), connection .getSettings().getPort()); } else { LOG.warn("Error {%s}, when connecting to AMQP peer at %s:%s", result.cause(), connection.getSettings() .getHost(), connection.getSettings().getPort()); } }); link = connection.createInboundLink(dest, ReliabilityMode.AT_LEAST_ONCE, CreditMode.AUTO); link.setCredits(10); }
public MySender(String host, int port, String destination) throws MessagingException { NetClient netClient = Vertx.factory.vertx().createNetClient(new NetClientOptions()); DefaultConnectionSettings settings = new DefaultConnectionSettings(); settings.setHost(host); settings.setPort(port); connection = new ManagedConnection(settings, this, false); netClient.connect(settings.getPort(), settings.getHost(), result -> { if (result.succeeded()) { connection.setNetSocket(result.result()); connection.write(); connection.addDisconnectHandler(c -> { LOG.warn("Connection lost to peer at %s:%s", connection.getSettings().getHost(), connection .getSettings().getPort()); }); LOG.info("Connected to AMQP peer at %s:%s", connection.getSettings().getHost(), connection .getSettings().getPort()); } else { LOG.warn("Error {%s}, when connecting to AMQP peer at %s:%s", result.cause(), connection.getSettings() .getHost(), connection.getSettings().getPort()); } }); link = connection.createOutboundLink(destination, ReliabilityMode.AT_LEAST_ONCE); }
private void connectNetClient(NetClient netClient, String host, int port, String username, String password, ConnectCompletionHandler connectHandler, ProtonClientOptions options) { String serverName = options.getSniServerName() != null ? options.getSniServerName() : (options.getVirtualHost() != null ? options.getVirtualHost() : null); netClient.connect(port, host, serverName, res -> { if (res.succeeded()) { String virtualHost = options.getVirtualHost() != null ? options.getVirtualHost() : host; ProtonConnectionImpl conn = new ProtonConnectionImpl(vertx, virtualHost); conn.disconnectHandler(h -> { LOG.trace("Connection disconnected"); if(!connectHandler.isComplete()) { connectHandler.handle(Future.failedFuture(new VertxException("Disconnected"))); } }); ProtonSaslClientAuthenticatorImpl authenticator = new ProtonSaslClientAuthenticatorImpl(username, password, options.getEnabledSaslMechanisms(), connectHandler); ProtonTransportOptions transportOptions = new ProtonTransportOptions(); transportOptions.setHeartbeat(options.getHeartbeat()); transportOptions.setMaxFrameSize(options.getMaxFrameSize()); conn.bindClient(netClient, res.result(), authenticator, transportOptions); // Need to flush here to get the SASL process going, or it will wait until calls on the connection are processed // later (e.g open()). conn.flush(); } else { connectHandler.handle(Future.failedFuture(res.cause())); } }); }
protected NetClient createNetClient(NetClientOptions options) { NetClient client = vertx.createNetClient(options); toClose.add(() -> { CountDownLatch latch = new CountDownLatch(1); client.close(); awaitLatch(latch); return null; }); return client; }
@Test public void testNetMetricsOnClose() throws Exception { int requests = 8; CountDownLatch latch = new CountDownLatch(requests); NetClient client = vertx.createNetClient(new NetClientOptions()); NetServer server = vertx.createNetServer(new NetServerOptions().setHost("localhost").setPort(1235).setReceiveBufferSize(50)).connectHandler(socket -> { socket.handler(buff -> latch.countDown()); }).listen(ar -> { assertTrue(ar.succeeded()); client.connect(1235, "localhost", ar2 -> { assertTrue(ar2.succeeded()); for (int i = 0; i < requests; i++) { ar2.result().write(randomBuffer(50)); } }); }); awaitLatch(latch); client.close(); server.close(ar -> { assertTrue(ar.succeeded()); vertx.runOnContext(v -> testComplete()); }); await(); JsonObject metrics = metricsService.getMetricsSnapshot(server); assertNotNull(metrics); assertTrue(metrics.isEmpty()); metrics = metricsService.getMetricsSnapshot(client); assertNotNull(metrics); assertTrue(metrics.isEmpty()); cleanup(client); cleanup(server); }
@Test public void testMetricsCleanupedOnVertxClose() throws Exception { CountDownLatch latch1 = new CountDownLatch(1); HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080)); server.requestHandler(req -> {}); server.listen(onSuccess(res -> { latch1.countDown(); })); awaitLatch(latch1); HttpClient client = vertx.createHttpClient(new HttpClientOptions()); CountDownLatch latch2 = new CountDownLatch(1); NetServer nServer = vertx.createNetServer(new NetServerOptions().setPort(1234)); nServer.connectHandler(conn -> {}); nServer.listen(res -> { latch2.countDown(); }); awaitLatch(latch2); NetClient nClient = vertx.createNetClient(new NetClientOptions()); DatagramSocket sock = vertx.createDatagramSocket(new DatagramSocketOptions()); EventBus eb = vertx.eventBus(); assertFalse(metricsService.getMetricsSnapshot(vertx).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(server).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(client).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(nServer).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(nClient).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(sock).isEmpty()); assertFalse(metricsService.getMetricsSnapshot(eb).isEmpty()); vertx.close(res -> { assertTrue(metricsService.getMetricsSnapshot(vertx).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(server).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(client).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(nServer).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(nClient).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(sock).isEmpty()); assertTrue(metricsService.getMetricsSnapshot(eb).isEmpty()); testComplete(); }); await(); vertx = null; }