/**
 * Opens a datagram socket, binds it to the given address and port and, once the bind
 * has succeeded, forwards the payload of every received packet to {@code handleMsg}.
 *
 * <p>NOTE(review): every call obtains a fresh instance through {@code Vertx.vertx()} and
 * nothing in this method closes it; confirm that this is intended.
 *
 * @param ipAddress local address to bind the socket to
 * @param port      local port to bind the socket to
 */
@Override
public void listen(String ipAddress, int port) {
    socket = Vertx.vertx().createDatagramSocket(new DatagramSocketOptions());
    socket.listen(port, ipAddress, asyncResult -> {
        if (asyncResult.succeeded()) {
            // The packet handler is only installed after the bind has completed.
            socket.handler(packet -> handleMsg(packet.data()));
        } else {
            // Pass the cause as the throwable argument so its stack trace is logged,
            // instead of flattening it into the message text.
            LOGGER.error("Listen failed on " + ipAddress + ":" + port, asyncResult.cause());
        }
    });
}
/** * @param vertx vertx instance to create the socket from * @param context of execution to run operations that need vertx initialized * @param options Statful options to configure host and port */ public UDPSender(final Vertx vertx, final Context context, final StatfulMetricsOptions options) { super(options, new Sampler(options, new Random())); this.options = options; // the following code is being executed asynchronously on the same context, to make sure that vertx is properly initialized // so that we can open a socket and configure a interval context.runOnContext(aVoid -> { this.socket = vertx.createDatagramSocket(new DatagramSocketOptions()); this.configureFlushInterval(vertx, this.options.getFlushInterval()); }); }
/**
 * Checks that metrics snapshots for vertx and for the objects created from it (HTTP
 * server/client, net server/client, datagram socket, event bus) are non-empty while vertx
 * is running and empty once vertx has been closed.
 */
@Test
public void testMetricsCleanupedOnVertxClose() throws Exception {
    CountDownLatch latch1 = new CountDownLatch(1);
    HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
    server.requestHandler(req -> {});
    server.listen(onSuccess(res -> latch1.countDown()));
    awaitLatch(latch1);
    HttpClient client = vertx.createHttpClient(new HttpClientOptions());

    CountDownLatch latch2 = new CountDownLatch(1);
    NetServer nServer = vertx.createNetServer(new NetServerOptions().setPort(1234));
    nServer.connectHandler(conn -> {});
    // Use onSuccess like the HTTP server above so a failed bind fails the test
    // instead of being silently ignored.
    nServer.listen(onSuccess(res -> latch2.countDown()));
    awaitLatch(latch2);
    NetClient nClient = vertx.createNetClient(new NetClientOptions());

    DatagramSocket sock = vertx.createDatagramSocket(new DatagramSocketOptions());
    EventBus eb = vertx.eventBus();

    // While vertx is alive every measured object must report something.
    assertFalse(metricsService.getMetricsSnapshot(vertx).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(server).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(client).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(nServer).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(nClient).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(sock).isEmpty());
    assertFalse(metricsService.getMetricsSnapshot(eb).isEmpty());

    vertx.close(res -> {
        // After close, every snapshot is expected to be empty.
        assertTrue(metricsService.getMetricsSnapshot(vertx).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(server).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(client).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(nServer).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(nClient).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(sock).isEmpty());
        assertTrue(metricsService.getMetricsSnapshot(eb).isEmpty());
        testComplete();
    });

    await();
    // This test has already closed vertx itself, so the reference is cleared.
    vertx = null;
}
/**
 * Default implementation: returns {@code DummyVertxMetrics.DummyDatagramMetrics.INSTANCE}.
 *
 * @param socket  the datagram socket (not used by this default)
 * @param options the datagram socket options (not used by this default)
 * @return the dummy datagram metrics instance
 */
@Override
default DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) {
    final DatagramSocketMetrics fallback = DummyVertxMetrics.DummyDatagramMetrics.INSTANCE;
    return fallback;
}
/**
 * Builds the datagram socket metrics through {@code createSubMetrics}.
 *
 * <p>The first function asks its argument for metrics via
 * {@code createMetrics(socket, options)}; the second wraps its argument in a
 * {@code DispatchingDatagramSocketMetrics}.
 *
 * @param socket  the datagram socket passed on to the delegate call
 * @param options the datagram socket options passed on to the delegate call
 * @return whatever {@code createSubMetrics} produces for these two functions
 */
@Override
public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) {
    return createSubMetrics(
        delegate -> delegate.createMetrics(socket, options),
        wrapped -> new DispatchingDatagramSocketMetrics(wrapped)
    );
}
/**
 * Creates a {@code DatagramSocketMetricsImpl} from the counter service, the gauge service
 * and the datagram socket section of the properties.
 *
 * @param socket  the datagram socket (not used here)
 * @param options the datagram socket options (not used here)
 * @return a new metrics object
 */
@Override
public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) {
    final DatagramSocketMetrics metrics = new DatagramSocketMetricsImpl(
        counterService,
        gaugeService,
        properties.getDatagramSocket());
    return metrics;
}
/**
 * Returns Prometheus-backed datagram socket metrics when {@code DatagramSocket} is enabled
 * in this instance's options; otherwise falls back to the superclass implementation.
 *
 * @param socket                the datagram socket, forwarded to the superclass on fallback
 * @param datagramSocketOptions the socket options, forwarded to the superclass on fallback
 * @return a {@code DatagramSocketPrometheusMetrics} built on the configured registry, or the
 *         superclass result
 */
@Override
public @NotNull DatagramSocketMetrics createMetrics(@NotNull DatagramSocket socket, @NotNull DatagramSocketOptions datagramSocketOptions) {
    if (!options.isEnabled(DatagramSocket)) {
        return super.createMetrics(socket, datagramSocketOptions);
    }
    return new DatagramSocketPrometheusMetrics(options.getRegistry());
}
/**
 * Creates a datagram socket by delegating to {@code vertx}.
 *
 * @param options the options handed to the delegate unchanged
 * @return the socket returned by the delegate
 */
@Override
public DatagramSocket createDatagramSocket(DatagramSocketOptions options) {
    final DatagramSocket created = vertx.createDatagramSocket(options);
    return created;
}
/**
 * Creates a {@code DatagramSocketMetricsImpl} from the default labels and a flag that is
 * {@code true} unless {@code MetricsType.DATAGRAM_SOCKET} is disabled in this instance's
 * options.
 *
 * @param socket  the datagram socket (not used here)
 * @param options the datagram socket options (not used here; the flag is read from
 *                {@code this.options})
 * @return a new metrics object
 */
@Override
public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) {
    final boolean typeNotDisabled = !this.options.isMetricsTypeDisabled(MetricsType.DATAGRAM_SOCKET);
    return new DatagramSocketMetricsImpl(defaultLabels, typeNotDisabled);
}
/**
 * Looks up the supplier stored under {@code DATAGRAM_SOCKET} and, if one is present, wraps
 * it in a {@code DatagramSocketMetricsImpl}; otherwise defers to the superclass.
 *
 * @param socket  the datagram socket, forwarded to the superclass on fallback
 * @param options the datagram socket options, forwarded to the superclass on fallback
 * @return supplier-backed metrics, or the superclass result when no supplier is stored
 */
@Override
public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) {
    final DatagramSocketMetricsSupplier registered =
        (DatagramSocketMetricsSupplier) metricSuppliers.get(DATAGRAM_SOCKET);
    if (registered == null) {
        return super.createMetrics(socket, options);
    }
    return new DatagramSocketMetricsImpl(registered);
}
/**
 * Creates a {@code DatagramSocketMetricsImpl} tied to this instance, using the name
 * produced by {@code nameOf("datagram")}.
 *
 * @param socket  the datagram socket (not used here)
 * @param options the datagram socket options (not used here)
 * @return a new metrics object
 */
@Override
public DatagramSocketMetrics createMetrics(DatagramSocket socket, DatagramSocketOptions options) {
    final DatagramSocketMetrics metrics = new DatagramSocketMetricsImpl(this, nameOf("datagram"));
    return metrics;
}
@Test public void testDatagramMetrics() throws Exception { Buffer clientMax = randomBuffer(1823); Buffer clientMin = randomBuffer(123); AtomicBoolean complete = new AtomicBoolean(false); DatagramSocket datagramSocket = vertx.createDatagramSocket(new DatagramSocketOptions()).listen(1236, "localhost", ar -> { assertTrue(ar.succeeded()); DatagramSocket socket = ar.result(); socket.handler(packet -> { if (complete.getAndSet(true)) { testComplete(); } }); socket.send(clientMin, 1236, "localhost", ds -> { assertTrue(ar.succeeded()); }); socket.send(clientMax, 1236, "localhost", ds -> { assertTrue(ar.succeeded()); }); }); await(); // Test sender/client (bytes-written) JsonObject metrics = metricsService.getMetricsSnapshot(datagramSocket); assertCount(metrics.getJsonObject("bytes-written"), 2L); assertMinMax(metrics.getJsonObject("bytes-written"), (long) clientMin.length(), (long) clientMax.length()); // Test server (bytes-read) assertCount(metrics.getJsonObject("localhost:1236.bytes-read"), 2L); assertMinMax(metrics.getJsonObject("localhost:1236.bytes-read"), (long) clientMin.length(), (long) clientMax.length()); CountDownLatch latch = new CountDownLatch(1); datagramSocket.close(ar -> { assertTrue(ar.succeeded()); latch.countDown(); }); awaitLatch(latch); assertWaitUntil(() -> metricsService.getMetricsSnapshot(datagramSocket).isEmpty()); }