/**
 * Connects to the {@code /hello} websocket endpoint, writes one text frame and, when a
 * message arrives, checks that its content can be read, closes the socket, prints the
 * elapsed time and calls {@code testComplete()}.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleConnectAndWrite() throws InterruptedException {
    final String uri = SERVICE_REST_GET + "/hello";
    getClient().websocket(8080, HOST, uri, socket -> {
        final long startedAt = System.currentTimeMillis();
        socket.handler(reply -> {
            System.out.println("client data simpleConnectAndWrite:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            socket.close();
            // The clock is read only after close() has been invoked.
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("Total execution time simpleConnectAndWrite: " + elapsedMillis + "ms");
            testComplete();
        });
        // The handler is registered before the request frame is written.
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /asyncReply} websocket endpoint, writes one text frame and, when a
 * message arrives, checks that its content can be read, closes the socket, prints the
 * elapsed time and calls {@code testComplete()}.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleConnectAndAsyncWrite() throws InterruptedException {
    final String uri = SERVICE_REST_GET + "/asyncReply";
    getClient().websocket(8080, "localhost", uri, socket -> {
        final long startedAt = System.currentTimeMillis();
        socket.handler(reply -> {
            System.out.println("client data simpleConnectAndAsyncWrite:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            socket.close();
            // The clock is read only after close() has been invoked.
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("Total execution time simpleConnectAndAsyncWrite: " + elapsedMillis + "ms");
            testComplete();
        });
        // The handler is registered before the request frame is written.
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintTwo} websocket endpoint, writes one text frame and counts
 * the messages received; once the count equals {@code MAX_RESPONSE_ELEMENTS} the socket is
 * closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReply() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    final String uri = SERVICE_REST_GET + "/wsEndpintTwo";
    getClient().websocket(8080, "localhost", uri, socket -> {
        socket.handler(reply -> {
            System.out.println("client data simpleMutilpeReply:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintThree} websocket endpoint, writes one text frame and
 * counts the messages received; once the count equals {@code MAX_RESPONSE_ELEMENTS} the
 * socket is closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    final String uri = SERVICE_REST_GET + "/wsEndpintThree";
    getClient().websocket(8080, "localhost", uri, socket -> {
        socket.handler(reply -> {
            System.out.println("client data simpleMutilpeReplyToAll:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintFour} websocket endpoint, writes one text frame and, when
 * a message arrives, checks that its content can be read, closes the socket and calls
 * {@code testComplete()}.
 *
 * <p>The test finishes on the first message handled, so no reply counter is kept.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll_1() throws InterruptedException {
    getClient().websocket(8080, "localhost", SERVICE_REST_GET + "/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client data simpleMutilpeReplyToAll_1:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            ws.close();
            testComplete();
        });
        ws.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /asyncReply} websocket endpoint on {@code HOST}, writes one text
 * frame and, when a message arrives, checks that its content can be read, closes the socket,
 * prints the elapsed time and calls {@code testComplete()}.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleConnectAndAsyncWrite() throws InterruptedException {
    final String uri = SERVICE_REST_GET + "/asyncReply";
    getClient().websocket(8080, HOST, uri, socket -> {
        final long startedAt = System.currentTimeMillis();
        socket.handler(reply -> {
            System.out.println("client data simpleConnectAndAsyncWrite:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            socket.close();
            // The clock is read only after close() has been invoked.
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("Total execution time simpleConnectAndAsyncWrite: " + elapsedMillis + "ms");
            testComplete();
        });
        // The handler is registered before the request frame is written.
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintTwo} websocket endpoint on {@code HOST}, writes one text
 * frame and counts the messages received; once the count equals
 * {@code MAX_RESPONSE_ELEMENTS} the socket is closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReply() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    final String uri = SERVICE_REST_GET + "/wsEndpintTwo";
    getClient().websocket(8080, HOST, uri, socket -> {
        socket.handler(reply -> {
            System.out.println("client data simpleMutilpeReply:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintThree} websocket endpoint on {@code HOST}, writes one
 * text frame and counts the messages received; once the count equals
 * {@code MAX_RESPONSE_ELEMENTS} the socket is closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    final String uri = SERVICE_REST_GET + "/wsEndpintThree";
    getClient().websocket(8080, HOST, uri, socket -> {
        socket.handler(reply -> {
            System.out.println("client data simpleMutilpeReplyToAll:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintFour} websocket endpoint on {@code HOST}, writes one text
 * frame and, when a message arrives, checks that its content can be read, closes the socket
 * and calls {@code testComplete()}.
 *
 * <p>The test finishes on the first message handled, so no reply counter is kept.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll_1() throws InterruptedException {
    getClient().websocket(8080, HOST, SERVICE_REST_GET + "/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client data simpleMutilpeReplyToAll_1:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            ws.close();
            testComplete();
        });
        ws.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /hello} websocket endpoint at {@code HOST:PORT}, writes one text
 * frame and, when a message arrives, checks that its content can be read, closes the socket,
 * prints the elapsed time and calls {@code testComplete()}.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleConnectAndWrite() throws InterruptedException {
    final String uri = SERVICE_REST_GET + "/hello";
    getClient().websocket(PORT, HOST, uri, socket -> {
        final long startedAt = System.currentTimeMillis();
        socket.handler(reply -> {
            System.out.println("client data simpleConnectAndWrite:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            socket.close();
            // The clock is read only after close() has been invoked.
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("Total execution time simpleConnectAndWrite: " + elapsedMillis + "ms");
            testComplete();
        });
        // The handler is registered before the request frame is written.
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /asyncReply} websocket endpoint at {@code HOST:PORT}, writes one
 * text frame and, when a message arrives, checks that its content can be read, closes the
 * socket, prints the elapsed time and calls {@code testComplete()}.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleConnectAndAsyncWrite() throws InterruptedException {
    final String uri = SERVICE_REST_GET + "/asyncReply";
    getClient().websocket(PORT, HOST, uri, socket -> {
        final long startedAt = System.currentTimeMillis();
        socket.handler(reply -> {
            System.out.println("client data simpleConnectAndAsyncWrite:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            socket.close();
            // The clock is read only after close() has been invoked.
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("Total execution time simpleConnectAndAsyncWrite: " + elapsedMillis + "ms");
            testComplete();
        });
        // The handler is registered before the request frame is written.
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintTwo} websocket endpoint at {@code HOST:PORT}, writes one
 * text frame and counts the messages received; once the count equals
 * {@code MAX_RESPONSE_ELEMENTS} the socket is closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReply() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    final String uri = SERVICE_REST_GET + "/wsEndpintTwo";
    getClient().websocket(PORT, HOST, uri, socket -> {
        socket.handler(reply -> {
            System.out.println("client data simpleMutilpeReply:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintThree} websocket endpoint at {@code HOST:PORT}, writes
 * one text frame and counts the messages received; once the count equals
 * {@code MAX_RESPONSE_ELEMENTS} the socket is closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    final String uri = SERVICE_REST_GET + "/wsEndpintThree";
    getClient().websocket(PORT, HOST, uri, socket -> {
        socket.handler(reply -> {
            System.out.println("client data simpleMutilpeReplyToAll:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to the {@code /wsEndpintFour} websocket endpoint at {@code HOST:PORT}, writes one
 * text frame and, when a message arrives, checks that its content can be read, closes the
 * socket and calls {@code testComplete()}.
 *
 * <p>The test finishes on the first message handled, so no reply counter is kept.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll_1() throws InterruptedException {
    getClient().websocket(PORT, HOST, SERVICE_REST_GET + "/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client data simpleMutilpeReplyToAll_1:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            ws.close();
            testComplete();
        });
        ws.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to {@code /service-REST-GET/hello} on localhost:8080, writes one text frame and,
 * when a message arrives, checks that its content can be read, closes the socket, prints the
 * elapsed time and calls {@code testComplete()}.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleConnectAndWrite() throws InterruptedException {
    getClient().websocket(8080, "localhost", "/service-REST-GET/hello", socket -> {
        final long startedAt = System.currentTimeMillis();
        socket.handler(reply -> {
            System.out.println("client data handler 1:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            socket.close();
            // The clock is read only after close() has been invoked.
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("Total execution time simpleConnectAndWrite: " + elapsedMillis + "ms");
            testComplete();
        });
        // The handler is registered before the request frame is written.
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to {@code /service-REST-GET/asyncReply} on localhost:8080, writes one text frame
 * and, when a message arrives, checks that its content can be read, closes the socket,
 * prints the elapsed time and calls {@code testComplete()}.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleConnectAndAsyncWrite() throws InterruptedException {
    getClient().websocket(8080, "localhost", "/service-REST-GET/asyncReply", socket -> {
        final long startedAt = System.currentTimeMillis();
        socket.handler(reply -> {
            System.out.println("client data handler 1:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            socket.close();
            // The clock is read only after close() has been invoked.
            final long elapsedMillis = System.currentTimeMillis() - startedAt;
            System.out.println("Total execution time simpleConnectAndAsyncWrite: " + elapsedMillis + "ms");
            testComplete();
        });
        // The handler is registered before the request frame is written.
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to {@code /service-REST-GET/wsEndpintTwo} on localhost:8080, writes one text
 * frame and counts the messages received; once the count equals
 * {@code MAX_RESPONSE_ELEMENTS} the socket is closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReply() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    getClient().websocket(8080, "localhost", "/service-REST-GET/wsEndpintTwo", socket -> {
        socket.handler(reply -> {
            System.out.println("client data handler 4:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to {@code /service-REST-GET/wsEndpintThree} on localhost:8080, writes one text
 * frame and counts the messages received; once the count equals
 * {@code MAX_RESPONSE_ELEMENTS} the socket is closed and {@code testComplete()} is called.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll() throws InterruptedException {
    final AtomicInteger received = new AtomicInteger(0);
    getClient().websocket(8080, "localhost", "/service-REST-GET/wsEndpintThree", socket -> {
        socket.handler(reply -> {
            System.out.println("client data handler 4:" + new String(reply.getBytes()));
            assertNotNull(reply.getString(0, reply.length()));
            // Keep listening until the expected number of replies has been seen.
            if (received.incrementAndGet() != MAX_RESPONSE_ELEMENTS) {
                return;
            }
            socket.close();
            testComplete();
        });
        socket.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
/**
 * Connects to {@code /service-REST-GET/wsEndpintFour} on localhost:8080, writes one text
 * frame and, when a message arrives, checks that its content can be read, closes the socket
 * and calls {@code testComplete()}.
 *
 * <p>The test finishes on the first message handled, so no reply counter is kept.
 *
 * <p>NOTE(review): {@code await()} presumably blocks until {@code testComplete()} has been
 * called - confirm against the test base class.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void simpleMutilpeReplyToAll_1() throws InterruptedException {
    getClient().websocket(8080, "localhost", "/service-REST-GET/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client data handler 4:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            ws.close();
            testComplete();
        });
        ws.writeFrame(new WebSocketFrameImpl("xhello"));
    });
    await();
}
@Override public void start() throws Exception { //TODO: Fix a better way of configuration other than system properties? Integer port = Integer.getInteger("websocket.port", 5556); ObservableFuture<HttpServer> httpServerObservable = RxHelper.observableFuture(); HttpServer httpServer = vertx.createHttpServer(new HttpServerOptions().setPort(port)); httpServerObservable.subscribe( a -> log.info("Starting web socket listener..."), e -> log.error("Could not start web socket listener at port " + port, e), () -> log.info("Started web socket listener on port " + port) ); Observable<Tup2<ServerWebSocket, Func1<Event, Boolean>>> eventObservable = EventObservable.convertFromWebSocketObservable(RxHelper.toObservable(httpServer.websocketStream())); eventObservable.subscribe(new EventToJsonAction(Riemann.getEvents(vertx), WebSocketFrameImpl::new), e -> { log.error(e); //TODO: Fix proper error handling }); httpServer.listen(httpServerObservable.asHandler()); }
/**
 * Writes a PING frame to the given websocket.
 *
 * <p>An {@link IllegalStateException} raised while creating or writing the frame is logged
 * and not rethrown.
 *
 * @param ws the websocket to write the PING frame to
 */
private void sendHeartbeat(WebSocket ws) {
    try {
        final WebSocketFrameImpl ping = new WebSocketFrameImpl(FrameType.PING);
        ws.writeFrame(ping);
    } catch (IllegalStateException writeFailure) {
        LOGGER.error("heartbeat fail", writeFailure);
    }
}
/**
 * Submits the same websocket round trip to a two-thread pool twice and waits until both
 * handlers have counted the latch down.
 *
 * <p>Each task opens a websocket to the {@code /hello} endpoint, writes one text frame and,
 * when a message arrives, checks its content, closes the socket, counts down the latch and
 * prints the round trip time.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
@Test
public void simpleConnectOnTwoThreads() throws InterruptedException {
    ExecutorService s = Executors.newFixedThreadPool(2);
    CountDownLatch latchMain = new CountDownLatch(2);
    Runnable r = () -> {
        getClient().websocket(8080, "localhost", SERVICE_REST_GET + "/hello", ws -> {
            long startTime = System.currentTimeMillis();
            ws.handler((data) -> {
                System.out.println("client data simpleConnectOnTwoThreads:" + new String(data.getBytes()));
                assertNotNull(data.getString(0, data.length()));
                ws.close();
                latchMain.countDown();
                long endTime = System.currentTimeMillis();
                System.out.println("round trip time simpleConnectOnTwoThreads: " + (endTime - startTime) + "ms");
            });
            ws.writeFrame(new WebSocketFrameImpl("yhello"));
        });
    };
    try {
        s.submit(r);
        s.submit(r);
        latchMain.await();
    } finally {
        // A fixed thread pool keeps its worker threads alive until it is shut down,
        // so release them once the test is done (or has been interrupted).
        s.shutdown();
    }
}
/**
 * Submits the same websocket round trip to a two-thread pool twice and waits until both
 * handlers have counted the latch down.
 *
 * <p>Each task opens a websocket to the {@code /hello} endpoint on {@code HOST}, writes one
 * text frame and, when a message arrives, checks its content, closes the socket, counts down
 * the latch and prints the round trip time.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
@Test
public void simpleConnectOnTwoThreads() throws InterruptedException {
    ExecutorService s = Executors.newFixedThreadPool(2);
    CountDownLatch latchMain = new CountDownLatch(2);
    Runnable r = () -> {
        getClient().websocket(8080, HOST, SERVICE_REST_GET + "/hello", ws -> {
            long startTime = System.currentTimeMillis();
            ws.handler((data) -> {
                System.out.println("client data simpleConnectOnTwoThreads:" + new String(data.getBytes()));
                assertNotNull(data.getString(0, data.length()));
                ws.close();
                latchMain.countDown();
                long endTime = System.currentTimeMillis();
                System.out.println("round trip time simpleConnectOnTwoThreads: " + (endTime - startTime) + "ms");
            });
            ws.writeFrame(new WebSocketFrameImpl("yhello"));
        });
    };
    try {
        s.submit(r);
        s.submit(r);
        latchMain.await();
    } finally {
        // A fixed thread pool keeps its worker threads alive until it is shut down,
        // so release them once the test is done (or has been interrupted).
        s.shutdown();
    }
}
/**
 * Submits the same websocket round trip to a two-thread pool twice and waits until both
 * handlers have counted the latch down.
 *
 * <p>Each task opens a websocket to the {@code /hello} endpoint at {@code HOST:PORT}, writes
 * one text frame and, when a message arrives, checks its content, closes the socket, counts
 * down the latch and prints the round trip time.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
@Test
public void simpleConnectOnTwoThreads() throws InterruptedException {
    ExecutorService s = Executors.newFixedThreadPool(2);
    CountDownLatch latchMain = new CountDownLatch(2);
    Runnable r = () -> {
        getClient().websocket(PORT, HOST, SERVICE_REST_GET + "/hello", ws -> {
            long startTime = System.currentTimeMillis();
            ws.handler((data) -> {
                System.out.println("client data simpleConnectOnTwoThreads:" + new String(data.getBytes()));
                assertNotNull(data.getString(0, data.length()));
                ws.close();
                latchMain.countDown();
                long endTime = System.currentTimeMillis();
                System.out.println("round trip time simpleConnectOnTwoThreads: " + (endTime - startTime) + "ms");
            });
            ws.writeFrame(new WebSocketFrameImpl("yhello"));
        });
    };
    try {
        s.submit(r);
        s.submit(r);
        latchMain.await();
    } finally {
        // A fixed thread pool keeps its worker threads alive until it is shut down,
        // so release them once the test is done (or has been interrupted).
        s.shutdown();
    }
}
/**
 * Submits the same websocket round trip to a two-thread pool twice, waits until both
 * handlers have counted the latch down, then shuts the pool down and waits for it to
 * terminate.
 *
 * <p>Each task opens a websocket to {@code /service-REST-GET/hello} on localhost:8080, writes
 * one text frame and, when a message arrives, checks its content, closes the socket and
 * counts down the latch.
 *
 * @throws InterruptedException if the wait on the latch or on pool termination is interrupted
 */
@Test
public void simpleConnectOnTwoThreads() throws InterruptedException {
    ExecutorService s = Executors.newFixedThreadPool(2);
    CountDownLatch latchMain = new CountDownLatch(2);
    Runnable r = () -> {
        getClient().websocket(8080, "localhost", "/service-REST-GET/hello", ws -> {
            ws.handler((data) -> {
                System.out.println("client data handler 2:" + new String(data.getBytes()));
                assertNotNull(data.getString(0, data.length()));
                ws.close();
                latchMain.countDown();
            });
            ws.writeFrame(new WebSocketFrameImpl("yhello"));
        });
    };
    try {
        s.submit(r);
        s.submit(r);
        latchMain.await();
    } finally {
        // awaitTermination only returns early after a shutdown request; without this call
        // it would always block for the whole timeout and the pool threads would leak.
        s.shutdown();
    }
    s.awaitTermination(6000, TimeUnit.MILLISECONDS);
}
/**
 * Submits the same websocket round trip to a ten-thread pool ten times, waits until all ten
 * handlers have counted the latch down, then shuts the pool down and waits for it to
 * terminate.
 *
 * <p>Each task opens a websocket to {@code /service-REST-GET/hello} on localhost:8080, writes
 * one text frame and, when a message arrives, checks its content, closes the socket and
 * counts down the latch.
 *
 * @throws InterruptedException if the wait on the latch or on pool termination is interrupted
 */
@Test
public void simpleConnectOnTenThreads() throws InterruptedException {
    final int taskCount = 10;
    ExecutorService s = Executors.newFixedThreadPool(taskCount);
    CountDownLatch latchMain = new CountDownLatch(taskCount);
    Runnable r = () -> {
        getClient().websocket(8080, "localhost", "/service-REST-GET/hello", ws -> {
            ws.handler((data) -> {
                System.out.println("client data handler 3:" + new String(data.getBytes()));
                assertNotNull(data.getString(0, data.length()));
                ws.close();
                latchMain.countDown();
            });
            ws.writeFrame(new WebSocketFrameImpl("zhello"));
        });
    };
    try {
        // One submission per latch count.
        for (int i = 0; i < taskCount; i++) {
            s.submit(r);
        }
        latchMain.await();
    } finally {
        // awaitTermination only returns early after a shutdown request; without this call
        // it would always block for the whole timeout and the pool threads would leak.
        s.shutdown();
    }
    s.awaitTermination(5000, TimeUnit.MILLISECONDS);
}
/**
 * Verifies that {@code EventToJsonAction} hands the JSON form of an event to the frame
 * factory and writes the frame the factory returns to the socket.
 *
 * <p>The action is built over a single-event observable and invoked with a mocked
 * {@code ServerWebSocket} paired with a filter that accepts every event.
 */
@Test
public void testEventAction() {
    WebSocketFrameImpl frame = new WebSocketFrameImpl();
    ServerWebSocket socketMock = mock(ServerWebSocket.class);
    Event event = new Event("host", "service", "state", "desc", Arrays.asList("blaha"), null, 1, 1.0F, 1.0D);
    new EventToJsonAction(Observable.just(event), s -> {
        // Expected value first, actual value second, so a failure message reads correctly.
        assertEquals(
            "{\"tags\":[\"blaha\"],\"host\":\"host\",\"state\":\"state\",\"service\":\"service\",\"description\":\"desc\",\"metric\":1.0,\"time\":1,\"ttl\":1.0}",
            s);
        return frame;
    }).call(Tup2.create(socketMock, e -> true));
    verify(socketMock).writeFrame(frame);
}
/**
 * Sends one text message split over three websocket frames (a non-final TEXT frame followed
 * by two continuation frames, the last one final) to a SockJS echo server and checks the
 * messages the client collected.
 *
 * <p>After waiting, the client is expected to hold exactly two messages: the request text
 * prefixed with {@code "a"}, followed by {@code SOCKJS_CLOSE_REPLY}.
 *
 * @throws InterruptedException as declared by the method signature
 */
@Test
public void testCombineTextFrameSockJs() throws InterruptedException {
    final String path = "/text-combine-sockjs";
    setupSockJsServer(path, this::echoRequest);
    final List<Buffer> replies = new ArrayList<>();
    final WebSocket client = setupSockJsClient(path, replies);

    final Buffer payload = Buffer.buffer("[\"" + TestUtils.randomAlphaString(30) + "\"]");
    // Slice the payload into three pieces: bytes [0,10), [10,20) and [20,end).
    final WebSocketFrame head = new WebSocketFrameImpl(FrameType.TEXT, payload.slice(0, 10).getByteBuf(), false);
    final WebSocketFrame middle = WebSocketFrame.continuationFrame(payload.slice(10, 20), false);
    final WebSocketFrame tail = WebSocketFrame.continuationFrame(payload.slice(20, payload.length()), true);

    // Log and send each frame in order.
    for (WebSocketFrame frame : new WebSocketFrame[] {head, middle, tail}) {
        log.debug("Client sending " + frame.textData());
        client.writeFrame(frame);
    }

    await(5, TimeUnit.SECONDS);

    assertEquals("Client should have received 2 messages: the reply and the close.", 2, replies.size());
    final Buffer expectedReply = Buffer.buffer("a" + payload.toString());
    assertEquals("Client reply should have matched request", expectedReply, replies.get(0));
    assertEquals("Final message should have been a close", SOCKJS_CLOSE_REPLY, replies.get(1));
}
/**
 * Submits the same websocket round trip to a thread pool {@code counter} (10) times and
 * waits until every handler has counted the latch down.
 *
 * <p>Each task opens a websocket to the {@code /hello} endpoint, writes one text frame and,
 * when a message arrives, checks its content, closes the socket, counts down the latch and
 * prints the round trip time.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
@Test
public void simpleConnectOnTenThreads() throws InterruptedException {
    int counter = 10;
    ExecutorService s = Executors.newFixedThreadPool(counter);
    CountDownLatch latchMain = new CountDownLatch(counter);
    // The task does not depend on the loop index, so it is built once and reused.
    Runnable r = () -> {
        getClient().websocket(8080, "localhost", SERVICE_REST_GET + "/hello", ws -> {
            long startTime = System.currentTimeMillis();
            ws.handler((data) -> {
                System.out.println("client data simpleConnectOnTenThreads:" + new String(data.getBytes()));
                assertNotNull(data.getString(0, data.length()));
                ws.close();
                latchMain.countDown();
                long endTime = System.currentTimeMillis();
                System.out.println("round trip time simpleConnectOnTenThreads: " + (endTime - startTime) + "ms");
            });
            ws.writeFrame(new WebSocketFrameImpl("zhello"));
        });
    };
    try {
        // Submit exactly `counter` tasks so submissions match the latch count and pool size.
        for (int i = 0; i < counter; i++) {
            s.submit(r);
        }
        latchMain.await();
    } finally {
        // A fixed thread pool keeps its worker threads alive until it is shut down.
        s.shutdown();
    }
}
/**
 * Opens two websockets to the {@code /wsEndpintFour} endpoint and waits until the message
 * handlers have counted the latch down twice.
 *
 * <p>The first client only registers a handler; the second registers a handler and also
 * writes a text frame. Each handler checks the message content, counts down the latch and
 * closes its socket.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
@Test
public void simpleMutilpeReplyToAllThreaded() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(2);
    // First client: listens only.
    getClient().websocket(8080, "localhost", SERVICE_REST_GET + "/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client data simpleMutilpeReplyToAllThreaded:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            latch.countDown();
            ws.close();
        });
    });
    // Second client: listens and sends the request frame.
    getClient().websocket(8080, "localhost", SERVICE_REST_GET + "/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client datasimpleMutilpeReplyToAllThreaded 5.1:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            latch.countDown();
            ws.close();
        });
        ws.writeFrame(new WebSocketFrameImpl("xhello simpleMutilpeReplyToAllThreaded"));
    });
    latch.await();
}
/**
 * Submits the same websocket round trip to a thread pool {@code counter} (100) times and
 * waits until every handler has counted the latch down.
 *
 * <p>Each task opens a websocket to the {@code /hello} endpoint on {@code HOST}, writes one
 * text frame and, when a message arrives, checks its content, closes the socket, counts down
 * the latch and prints the round trip time.
 *
 * <p>NOTE(review): the method name says ten threads while {@code counter} is 100 - confirm
 * which is intended.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
@Test
public void simpleConnectOnTenThreads() throws InterruptedException {
    int counter = 100;
    ExecutorService s = Executors.newFixedThreadPool(counter);
    CountDownLatch latchMain = new CountDownLatch(counter);
    // The task does not depend on the loop index, so it is built once and reused.
    Runnable r = () -> {
        getClient().websocket(8080, HOST, SERVICE_REST_GET + "/hello", ws -> {
            long startTime = System.currentTimeMillis();
            ws.handler((data) -> {
                System.out.println("client data simpleConnectOnTenThreads:" + new String(data.getBytes()));
                assertNotNull(data.getString(0, data.length()));
                ws.close();
                latchMain.countDown();
                long endTime = System.currentTimeMillis();
                System.out.println("round trip time simpleConnectOnTenThreads: " + (endTime - startTime) + "ms");
            });
            ws.writeFrame(new WebSocketFrameImpl("zhello"));
        });
    };
    try {
        // Submit exactly `counter` tasks so submissions match the latch count and pool size.
        for (int i = 0; i < counter; i++) {
            s.submit(r);
        }
        latchMain.await();
    } finally {
        // A fixed thread pool keeps its worker threads alive until it is shut down.
        s.shutdown();
    }
}
/**
 * Opens two websockets to the {@code /wsEndpintFour} endpoint on {@code HOST} and waits
 * until the message handlers have counted the latch down twice.
 *
 * <p>The first client only registers a handler; the second registers a handler and also
 * writes a text frame. Each handler checks the message content, counts down the latch and
 * closes its socket.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
@Test
public void simpleMutilpeReplyToAllThreaded() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(2);
    // First client: listens only.
    getClient().websocket(8080, HOST, SERVICE_REST_GET + "/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client data simpleMutilpeReplyToAllThreaded:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            latch.countDown();
            ws.close();
        });
    });
    // Second client: listens and sends the request frame.
    getClient().websocket(8080, HOST, SERVICE_REST_GET + "/wsEndpintFour", ws -> {
        ws.handler((data) -> {
            System.out.println("client datasimpleMutilpeReplyToAllThreaded 5.1:" + new String(data.getBytes()));
            assertNotNull(data.getString(0, data.length()));
            latch.countDown();
            ws.close();
        });
        ws.writeFrame(new WebSocketFrameImpl("xhello simpleMutilpeReplyToAllThreaded"));
    });
    latch.await();
}