/**
 * Builds the UDP hello server for {@code port}.
 *
 * <p>For every datagram received, the handler waits {@code delay} milliseconds and then
 * writes {@code WELCOME_MSG_BYTES} back to the datagram's sender. The server is only
 * created here; this method does not call {@code start()} on it.
 *
 * @return the newly created UDP server
 */
public UdpServer<DatagramPacket, DatagramPacket> createServer() {
    final ConnectionHandler<DatagramPacket, DatagramPacket> welcomeHandler =
            new ConnectionHandler<DatagramPacket, DatagramPacket>() {
                @Override
                public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> newConnection) {
                    final Func1<DatagramPacket, Observable<Void>> replyAfterDelay =
                            new Func1<DatagramPacket, Observable<Void>>() {
                                @Override
                                public Observable<Void> call(final DatagramPacket received) {
                                    // A single tick of the interval acts as the reply delay.
                                    final Observable<Long> singleTick =
                                            Observable.interval(delay, TimeUnit.MILLISECONDS).take(1);
                                    return singleTick.flatMap(new Func1<Long, Observable<Void>>() {
                                        @Override
                                        public Observable<Void> call(Long tick) {
                                            final InetSocketAddress replyTo = received.sender();
                                            System.out.println("Received datagram. Sender: " + replyTo);
                                            final ByteBuf payload = newConnection.getChannel().alloc()
                                                    .buffer(WELCOME_MSG_BYTES.length);
                                            payload.writeBytes(WELCOME_MSG_BYTES);
                                            return newConnection.writeAndFlush(new DatagramPacket(payload, replyTo));
                                        }
                                    });
                                }
                            };
                    return newConnection.getInput().flatMap(replyAfterDelay);
                }
            };

    final UdpServer<DatagramPacket, DatagramPacket> udpServer = RxNetty.createUdpServer(port, welcomeHandler);
    System.out.println("UDP hello server started at port: " + port);
    return udpServer;
}
/**
 * Sends {@code content} to a server chosen through {@code LoadBalancerCommand} and returns
 * the first datagram received in response.
 *
 * <p>The reply stream errors with a timeout if no datagram arrives within 10 milliseconds.
 *
 * @param content the string written to the connection
 * @return an Observable emitting at most one response datagram
 */
public Observable<DatagramPacket> submit(final String content) {
    final ServerOperation<DatagramPacket> sendAndAwaitReply = new ServerOperation<DatagramPacket>() {
        @Override
        public Observable<DatagramPacket> call(Server server) {
            final RxClient<DatagramPacket, DatagramPacket> clientForServer = getOrCreateRxClient(server);
            return clientForServer.connect().flatMap(
                    new Func1<ObservableConnection<DatagramPacket, DatagramPacket>, Observable<? extends DatagramPacket>>() {
                        @Override
                        public Observable<? extends DatagramPacket> call(
                                ObservableConnection<DatagramPacket, DatagramPacket> connection) {
                            // The write result is intentionally not observed here.
                            connection.writeStringAndFlush(content);
                            return connection.getInput()
                                             .timeout(10, TimeUnit.MILLISECONDS)
                                             .take(1);
                        }
                    });
        }
    };

    return LoadBalancerCommand.<DatagramPacket>builder()
            .withLoadBalancerContext(lbContext)
            .build()
            .submit(sendAndAwaitReply);
}
/**
 * Connects a TCP client to localhost:8181 and prints every received message, trimmed,
 * blocking the calling thread until the input stream terminates.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final Func1<String, String> trim = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            return msg.trim();
        }
    };

    final Func1<ObservableConnection<String, String>, Observable<?>> trimmedInput =
            new Func1<ObservableConnection<String, String>, Observable<?>>() {
                @Override
                public Observable<?> call(ObservableConnection<String, String> connection) {
                    return connection.getInput().map(trim);
                }
            };

    final Action1<Object> printEvent = new Action1<Object>() {
        @Override
        public void call(Object event) {
            System.out.println("onNext event => " + event);
        }
    };

    Observable<ObservableConnection<String, String>> pendingConnection =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator())
                   .connect();

    pendingConnection.flatMap(trimmedInput)
                     .toBlockingObservable()
                     .forEach(printEvent);
}
/**
 * Starts a text echo server on port 8181 and blocks until it stops.
 *
 * <p>Each new connection is greeted with a welcome message. Every non-blank line received
 * is written back prefixed with {@code "echo => "}; blank lines produce no reply.
 *
 * @param args unused
 */
public static void main(final String[] args) {
    final int listenPort = 8181;

    final ConnectionHandler<String, String> echoHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<String, String> connection) {
            System.out.println("New client connection established.");
            connection.writeAndFlush("Welcome! \n\n");

            final Func1<String, Observable<Void>> echoLine = new Func1<String, Observable<Void>>() {
                @Override
                public Observable<Void> call(String msg) {
                    // Log the raw message before trimming.
                    System.out.println("onNext: " + msg);
                    final String trimmed = msg.trim();
                    if (trimmed.isEmpty()) {
                        return COMPLETED_OBSERVABLE;
                    }
                    return connection.writeAndFlush("echo => " + trimmed + '\n');
                }
            };
            return connection.getInput().flatMap(echoLine);
        }
    };

    RxNetty.createTcpServer(listenPort, PipelineConfigurators.textOnlyConfigurator(), echoHandler)
           .startAndWait();
}
/**
 * Connects a deliberately slow TCP client to localhost:8181.
 *
 * <p>Each received message is delayed by one second before being trimmed and printed.
 * The calling thread blocks until the input stream terminates.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Observable<ObservableConnection<String, String>> connectionObservable =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator())
                   .connect();

    connectionObservable.flatMap(new Func1<ObservableConnection<String, String>, Observable<?>>() {
        @Override
        public Observable<?> call(ObservableConnection<String, String> connection) {
            return connection.getInput().map(new Func1<String, String>() {
                @Override
                public String call(String msg) {
                    try {
                        // Simulate a slow consumer.
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Catching InterruptedException clears the interrupt flag; restore it so
                        // that code further up the stack can still observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                    return msg.trim();
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    });
}
protected Observable<HttpClientResponse<O>> submit(final HttpClientRequest<I> request, final Observable<ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>>> connectionObservable, final ClientConfig config) { enrichRequest(request, config); // Here we do not map the connection Observable and return because the onComplete() of connectionObservable, // does not indicate onComplete of the request processing. return Observable.create(new Observable.OnSubscribe<HttpClientResponse<O>>() { @Override public void call(final Subscriber<? super HttpClientResponse<O>> subscriber) { final Subscription connectSubscription = connectionObservable.subscribe(new ConnectObserver<I, O>(request, subscriber)); subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { //TODO: Cancel write & if the response is not over, disconnect the channel. connectSubscription.unsubscribe(); } })); } }); }
@Override public void onNext(ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>> connection) { ClientRequestResponseConverter converter = connection.getChannelHandlerContext().pipeline().get(ClientRequestResponseConverter.class); if (null != converter) { converter.setRequestProcessingObserver(requestProcessingObserver); } connection.getInput().subscribe(requestProcessingObserver); connection.writeAndFlush(request).doOnError(new Action1<Throwable>() { @Override public void call(Throwable throwable) { // If the write fails, the response should get the error. Completion & onNext are managed by // the response observable itself. requestProcessingObserver.onError(throwable); } }); }
/**
 * Connects to the server under test, expects exactly one message within 60 seconds,
 * and checks that it equals {@code SERVER_MESSAGE}.
 */
@Test
public void testGovernatedTcpServer() throws Exception {
    final Func1<ByteBuf, String> decode = new Func1<ByteBuf, String>() {
        @Override
        public String call(ByteBuf byteBuf) {
            return byteBuf.toString(Charset.defaultCharset());
        }
    };

    final Observable<String> messages = RxNetty.createTcpClient("localhost", server.getServerPort())
            .connect()
            .flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<String>>() {
                @Override
                public Observable<String> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
                    return connection.getInput().map(decode);
                }
            });

    final String message = messages.single()
                                   .toBlocking()
                                   .toFuture()
                                   .get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
/**
 * Connects a WebSocket client to the server under test, expects exactly one text frame
 * within 60 seconds, and checks that its text equals {@code SERVER_MESSAGE}.
 */
@Test
public void testGovernatedTcpServer() throws Exception {
    final Func1<TextWebSocketFrame, String> frameText = new Func1<TextWebSocketFrame, String>() {
        @Override
        public String call(TextWebSocketFrame frame) {
            return frame.text();
        }
    };

    final Observable<String> messages =
            RxNetty.<TextWebSocketFrame, TextWebSocketFrame>newWebSocketClientBuilder("localhost", server.getServerPort())
                    .build()
                    .connect()
                    .flatMap(new Func1<ObservableConnection<TextWebSocketFrame, TextWebSocketFrame>, Observable<String>>() {
                        @Override
                        public Observable<String> call(
                                ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) {
                            return connection.getInput().map(frameText);
                        }
                    });

    final String message = messages.single()
                                   .toBlocking()
                                   .toFuture()
                                   .get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
/**
 * Handles a shutdown-command connection.
 *
 * <p>Reads at most one command from the connection, logs it, passes it to
 * {@code commandHandler}, and calls {@code shutdown()} once the resulting stream completes.
 *
 * @param conn the connection carrying the command
 * @return the Observable produced by processing the single command
 */
@Override
public Observable<Void> handle(final ObservableConnection<String, String> conn) {
    return conn.getInput().take(1) /*Take only one command per connection*/
               .doOnNext(new Action1<String>() {
                   @Override
                   public void call(String s) {
                       logger.info("Received a command: " + s);
                   }
               })
               .flatMap(commandHandler)
               .doOnCompleted(new Action0() {
                   @Override
                   public void call() {
                       try {
                           shutdown();
                       } catch (InterruptedException e) {
                           // Keep the cause in the log and restore the interrupt flag, which is
                           // cleared when InterruptedException is thrown.
                           logger.error("Interrupted while shutting down the shutdown command listener.", e);
                           Thread.currentThread().interrupt();
                       }
                   }
               });
}
/**
 * Handles a frontend connection: every received buffer is decoded with the default charset,
 * put on {@code queueProvider}, and answered with a prompt for more input.
 *
 * @param connection the frontend connection
 * @return an Observable merging the results of all prompt writes
 */
@Override
public Observable<Void> handle(final ObservableConnection<ByteBuf, ByteBuf> connection) {
    System.out.println("New frontend connection");

    final Func1<ByteBuf, Observable<Void>> enqueueAndPrompt = new Func1<ByteBuf, Observable<Void>>() {
        @Override
        public Observable<Void> call(ByteBuf incoming) {
            final String text = incoming.toString(Charset.defaultCharset());
            System.out.println("Received: " + text);
            queueProvider.put(text);

            final ByteBuf prompt = connection.getAllocator().buffer();
            final byte[] promptBytes = "Want some more:\n".getBytes();
            prompt.writeBytes(promptBytes);
            return connection.writeAndFlush(prompt);
        }
    };

    return connection.getInput().flatMap(enqueueAndPrompt);
}
/**
 * Handles a backend connection: once per second, writes every message currently held by
 * {@code queueProvider} to the connection and then flushes.
 *
 * @param connection the backend connection
 * @return an Observable merging the results of the periodic flushes
 */
@Override
public Observable<Void> handle(final ObservableConnection<ByteBuf, ByteBuf> connection) {
    System.out.println("New backend connection");

    final Func1<Long, Observable<Void>> drainQueue = new Func1<Long, Observable<Void>>() {
        @Override
        public Observable<Void> call(Long tick) {
            while (!queueProvider.isEmpty()) {
                final ByteBuf outgoing = connection.getAllocator().buffer();
                final byte[] payload = queueProvider.poll().getBytes();
                outgoing.writeBytes(payload);
                connection.write(outgoing);
            }
            // Flush on every tick, whether or not anything was written.
            return connection.flush();
        }
    };

    final Observable<Long> everySecond = Observable.interval(1, TimeUnit.SECONDS);
    return everySecond.flatMap(drainQueue);
}
@OnClick(R.id.client_button) void startClient() { Observable<ObservableConnection<String, String>> connectionObservable = RxNetty.createTcpClient("localhost", PORT, PipelineConfigurators.textOnlyConfigurator()).connect(); connectionObservable.flatMap(connection -> { Observable<String> helloMessage = connection.getInput() .take(1).map(String::trim); // output 10 values at intervals and receive the echo back Observable<String> intervalOutput = Observable.interval(500, TimeUnit.MILLISECONDS) .flatMap(aLong -> connection.writeAndFlush(String.valueOf(aLong + 1)) .map(aVoid -> "")); // capture the output from the server Observable<String> echo = connection.getInput().map(String::trim); // wait for the helloMessage then start the output and receive echo input return Observable.concat(helloMessage, Observable.merge(intervalOutput, echo)); }) .take(10) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Object>() { @Override public void onCompleted() { Log.d(TAG, "Client Complete!"); } @Override public void onError(Throwable throwable) { Log.e(TAG, "onError: " + throwable.getMessage()); } @Override public void onNext(Object o) { final String message = o.toString(); Log.d(TAG, "Client onNext: " + message); adapter.add(message); } }); }
/**
 * Connects through {@code LoadBalancerCommand}: for the server handed to the operation,
 * the client from {@code getOrCreateRxClient(server)} is asked to connect.
 *
 * @return an Observable of the resulting connection
 */
@Override
public Observable<ObservableConnection<O, I>> connect() {
    final ServerOperation<ObservableConnection<O, I>> connectToServer =
            new ServerOperation<ObservableConnection<O, I>>() {
                @Override
                public Observable<ObservableConnection<O, I>> call(Server server) {
                    return getOrCreateRxClient(server).connect();
                }
            };

    return LoadBalancerCommand.<ObservableConnection<O, I>>builder()
            .withLoadBalancerContext(lbContext)
            .build()
            .submit(connectToServer);
}
/**
 * Starts a {@code HelloUdpServer} constructed with a second argument of 0, sends it one
 * datagram through a load-balanced UDP client, and checks that the first reply equals
 * {@code HelloUdpServer.WELCOME_MSG}. The server is shut down afterwards in all cases.
 */
@Test
public void testUdpClientWithoutTimeout() throws Exception {
    final int port = choosePort();
    final UdpServer<DatagramPacket, DatagramPacket> server = new HelloUdpServer(port, 0).createServer();
    server.start();

    final BaseLoadBalancer lb = new BaseLoadBalancer();
    lb.setServersList(Lists.newArrayList(new Server("localhost", port)));
    final RxClient<DatagramPacket, DatagramPacket> client =
            RibbonTransport.newUdpClient(lb, DefaultClientConfigImpl.getClientConfigWithDefaultValues());

    try {
        final Func1<ObservableConnection<DatagramPacket, DatagramPacket>, Observable<DatagramPacket>> askAndListen =
                new Func1<ObservableConnection<DatagramPacket, DatagramPacket>, Observable<DatagramPacket>>() {
                    @Override
                    public Observable<DatagramPacket> call(
                            ObservableConnection<DatagramPacket, DatagramPacket> connection) {
                        connection.writeStringAndFlush("Is there anybody out there?");
                        return connection.getInput();
                    }
                };
        final Func1<DatagramPacket, String> decodeContent = new Func1<DatagramPacket, String>() {
            @Override
            public String call(DatagramPacket datagramPacket) {
                return datagramPacket.content().toString(Charset.defaultCharset());
            }
        };

        final Observable<DatagramPacket> replies = client.connect().flatMap(askAndListen);
        final String response = replies.take(1)
                                       .map(decodeContent)
                                       .toBlocking()
                                       .first();

        assertEquals(HelloUdpServer.WELCOME_MSG, response);
    } finally {
        server.shutdown();
    }
}
/**
 * Chooses a port, creates the UDP hello server on it, and starts it.
 *
 * <p>For every datagram received, the handler waits {@code timeout} milliseconds and then
 * writes {@code WELCOME_MSG_BYTES} back to the datagram's sender.
 *
 * @throws RuntimeException if choosing a port fails with a {@code SocketException}
 */
public void start() {
    int port;
    try {
        port = choosePort();
    } catch (SocketException e) {
        // Message corrected from "point" to "port"; the cause is preserved.
        throw new RuntimeException("Error choosing port", e);
    }

    server = RxNetty.createUdpServer(port, new ConnectionHandler<DatagramPacket, DatagramPacket>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> newConnection) {
            return newConnection.getInput().flatMap(new Func1<DatagramPacket, Observable<Void>>() {
                @Override
                public Observable<Void> call(final DatagramPacket received) {
                    // A single tick of the interval acts as the reply delay.
                    return Observable.interval(timeout, TimeUnit.MILLISECONDS).take(1)
                            .flatMap(new Func1<Long, Observable<Void>>() {
                                @Override
                                public Observable<Void> call(Long aLong) {
                                    InetSocketAddress sender = received.sender();
                                    LOG.info("Received datagram. Sender: " + sender);
                                    ByteBuf data = newConnection.getChannel().alloc()
                                            .buffer(WELCOME_MSG_BYTES.length);
                                    data.writeBytes(WELCOME_MSG_BYTES);
                                    return newConnection.writeAndFlush(new DatagramPacket(data, sender));
                                }
                            });
                }
            });
        }
    });

    server.start();
    LOG.info("UDP hello server started at port: " + port);
}
/**
 * Starts a text TCP server on port 8181 that hands every new connection to
 * {@code startEventStream}, and blocks until the server stops.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final ConnectionHandler<String, String> eventStreamHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(ObservableConnection<String, String> newConnection) {
            return startEventStream(newConnection);
        }
    };

    RxNetty.createTcpServer(8181, PipelineConfigurators.textOnlyConfigurator(), eventStreamHandler)
           .startAndWait();
}
/**
 * Connects to localhost:8181, sends a {@code "subscribe:"} request, and prints the first
 * three items of the write result concatenated with the trimmed server input.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final PipelineConfigurator<String, ByteBuf> stringDecoding = new PipelineConfigurator<String, ByteBuf>() {
        @Override
        public void configureNewPipeline(ChannelPipeline pipeline) {
            pipeline.addLast(new StringDecoder());
        }
    };

    final Func1<Void, String> toEmptyString = new Func1<Void, String>() {
        @Override
        public String call(Void aVoid) {
            return "";
        }
    };
    final Func1<String, String> trim = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            return msg.trim();
        }
    };

    Observable<ObservableConnection<String, ByteBuf>> pendingConnection =
            RxNetty.createTcpClient("localhost", 8181, stringDecoding).connect();

    pendingConnection.flatMap(new Func1<ObservableConnection<String, ByteBuf>, Observable<String>>() {
        @Override
        public Observable<String> call(ObservableConnection<String, ByteBuf> connection) {
            final ByteBuf subscribeRequest = Unpooled.copiedBuffer("subscribe:".getBytes());
            final Observable<String> subscribeWrite = connection.writeAndFlush(subscribeRequest).map(toEmptyString);
            final Observable<String> serverData = connection.getInput().map(trim);
            // Write result first, then whatever the server streams back.
            return Observable.concat(subscribeWrite, serverData);
        }
    }).take(3).toBlockingObservable().forEach(new Action1<Object>() {
        @Override
        public void call(Object item) {
            System.out.println("onNext: " + item);
        }
    });
}
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { inputSubject = PublishSubject.create(); connection = new ObservableConnection<I, O>(ctx, inputSubject); if (null != observableAdapter) { observableAdapter.activate(inputSubject); } super.channelActive(ctx); try { Observable<Void> handledObservable = connectionHandler.handle(connection); if (null == handledObservable) { handledObservable = Observable.empty(); } handledObservable.subscribe(new Subscriber<Void>() { @Override public void onCompleted() { connection.close(); } @Override public void onError(Throwable e) { invokeErrorHandler(e); connection.close(); } @Override public void onNext(Void aVoid) { // No Op. } }); } catch (Throwable throwable) { invokeErrorHandler(throwable); } }
/** * A lazy connect to the {@link ServerInfo} for this client. Every subscription to the returned {@link Observable} will create a fresh connection. * * @return Observable for the connect. Every new subscription will create a fresh connection. */ @Override public Observable<ObservableConnection<O, I>> connect() { return Observable.create(new OnSubscribe<ObservableConnection<O, I>>() { @Override public void call(final Subscriber<? super ObservableConnection<O, I>> subscriber) { try { final ClientConnectionHandler clientConnectionHandler = new ClientConnectionHandler(subscriber); clientBootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { PipelineConfigurator<I, O> configurator = getPipelineConfiguratorForAChannel(clientConnectionHandler, incompleteConfigurator); configurator.configureNewPipeline(ch.pipeline()); } }); // make the connection final ChannelFuture connectFuture = clientBootstrap.connect(serverInfo.getHost(), serverInfo.getPort()) .addListener(clientConnectionHandler); subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { if (!connectFuture.isDone()) { connectFuture.cancel(true); // Unsubscribe here means, no more connection is required. A close on connection is explicit. } } })); } catch (Throwable e) { subscriber.onError(e); } } }); }
@Override public Observable<Void> handle(final ObservableConnection<O, I> newConnection) { return Observable.create(new OnSubscribe<Void>() { @Override public void call(Subscriber<? super Void> voidSub) { connectionObserver.onNext(newConnection); connectionObserver.onCompleted(); // The observer is no longer looking for any more connections. } }); }
/**
 * Creates (without starting) a TCP server on {@code PORT} whose connection handler always
 * returns an Observable that errors with an {@code IllegalStateException}.
 */
@Before
public void setUp() throws Exception {
    final ConnectionHandler<ByteBuf, ByteBuf> alwaysFailing = new ConnectionHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(ObservableConnection<ByteBuf, ByteBuf> newConnection) {
            final IllegalStateException failure = new IllegalStateException("I always throw an error.");
            return Observable.error(failure);
        }
    };
    server = RxNetty.createTcpServer(PORT, alwaysFailing);
}
/**
 * Connects a TCP client to localhost on {@code PORT}, closes the connection right away,
 * and blocks until that whole sequence has finished.
 *
 * @throws InterruptedException if the wait is interrupted
 * @throws ExecutionException   if connecting or closing fails
 */
private static void blockTillConnected() throws InterruptedException, ExecutionException {
    final Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<?>> closeImmediately =
            new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<?>>() {
                @Override
                public Observable<Void> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
                    return connection.close();
                }
            };

    RxNetty.createTcpClient("localhost", PORT)
           .connect()
           .flatMap(closeImmediately)
           .toBlockingObservable()
           .toFuture()
           .get();
}
/**
 * Decides whether a connection is admitted, based on the remote host address.
 *
 * <p>The connection is allowed only when the channel's remote address is an
 * {@link InetSocketAddress} with a resolved address whose host address is contained in the
 * current white list. A missing, non-inet, or unresolved remote address is denied rather
 * than raising a {@code NullPointerException} or {@code ClassCastException}.
 *
 * @param connection the connection to check
 * @return {@code true} if the remote host address is white-listed, {@code false} otherwise
 */
@Override
public boolean allowed(ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
    Object remote = connection.getChannelHandlerContext().channel().remoteAddress();
    // instanceof also rejects null, e.g. when the channel is not connected.
    if (!(remote instanceof InetSocketAddress)) {
        return false;
    }
    InetSocketAddress inetSocketAddress = (InetSocketAddress) remote;
    // getAddress() is null for an unresolved socket address.
    if (inetSocketAddress.getAddress() == null) {
        return false;
    }
    return whiteList.get().contains(inetSocketAddress.getAddress().getHostAddress());
}
/**
 * Creates an ingress policy that admits every connection.
 *
 * @return a policy whose {@code allowed} check always returns {@code true}
 */
public static IngressPolicy allowAll() {
    final IngressPolicy permissive = new IngressPolicy() {
        @Override
        public boolean allowed(ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
            return true;
        }
    };
    return permissive;
}
private static <T> Observable<T> createTcpConnectionToServer(String host, int port, final Decoder<T> decoder, final RemoteUnsubscribe remoteUnsubscribe){ return RxNetty.createTcpClient(host, port, new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>( new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>(){ @Override public void configureNewPipeline(ChannelPipeline pipeline) { // pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4)); // max frame = half MB } }, new RxEventPipelineConfigurator())) .connect().flatMap(new Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>>(){ @Override public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) { connection.writeAndFlush(RemoteRxEvent.subscribed()); // send subscribe event to server remoteUnsubscribe.setConnection(connection); return connection.getInput(); } }) // data received form server .map(new Func1<RemoteRxEvent,Notification<T>>(){ @Override public Notification<T> call(RemoteRxEvent rxEvent) { if (rxEvent.getType() == RemoteRxEvent.Type.next){ return Notification.createOnNext(decoder.decode(rxEvent.getData())); }else if (rxEvent.getType() == RemoteRxEvent.Type.error){ return Notification.createOnError(fromBytesToThrowable(rxEvent.getData())); }else if (rxEvent.getType() == RemoteRxEvent.Type.completed){ return Notification.createOnCompleted(); }else{ throw new RuntimeException("RemoteRxEvent of type:"+rxEvent.getType()+", not supported."); } } }) .<T>dematerialize(); }
@Override public synchronized SlotAssignment assignSlot(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection) { SlotAssignment assignment = SlotAssignment.notAssigned(); if (slotTokens.size() > 0){ Integer slot = slotTokens.remove(0); // grab first slot slotAssignments.put(connection, slot); // make assignment assignment = new SlotAssignment(slotAssignments.get(connection), numSlots); } return assignment; }
/**
 * Returns the slot held by the given connection, if any, to the free slot tokens and
 * removes the connection's assignment.
 *
 * @param connection the connection whose slot is released
 */
@Override
public synchronized void releaseSlot(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection) {
    final Integer heldSlot = slotAssignments.get(connection);
    if (heldSlot == null) {
        // Nothing is assigned to this connection.
        return;
    }
    slotTokens.add(heldSlot);
    slotAssignments.remove(connection);
}
public static void main(final String[] args) { RxServer<TextWebSocketFrame, TextWebSocketFrame> webSocketServer = RxNetty.newWebSocketServerBuilder( 8888, new ConnectionHandler<TextWebSocketFrame, TextWebSocketFrame>() { @Override public Observable<Void> handle(final ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) { return connection.getInput().flatMap(new Func1<WebSocketFrame, Observable<Void>>() { @Override public Observable<Void> call(WebSocketFrame wsFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) wsFrame; System.out.println("Got message: " + textFrame.text()); return connection.writeAndFlush(new TextWebSocketFrame(textFrame.text().toUpperCase())); } }); } } ).build(); Karyon.forWebSocketServer( webSocketServer, new KaryonBootstrapModule(), new ArchaiusBootstrapModule("websocket-echo-server"), // KaryonEurekaModule.asBootstrapModule(), /* Uncomment if you need eureka */ Karyon.toBootstrapModule(KaryonWebAdminModule.class), ShutdownModule.asBootstrapModule(), KaryonServoModule.asBootstrapModule()) .startAndWaitTillShutdown(); }
/**
 * Submits the request over a connection obtained from {@code connect()}.
 *
 * @param request the request to send
 * @return an Observable of the response
 */
@Override
public Observable<HttpClientResponse<O>> submit(HttpClientRequest<I> request) {
    return submit(request, connect());
}
/**
 * Submits the request with the given configuration over a connection obtained from
 * {@code connect()}.
 *
 * @param request the request to send
 * @param config  configuration to use for this request
 * @return an Observable of the response
 */
@Override
public Observable<HttpClientResponse<O>> submit(HttpClientRequest<I> request, ClientConfig config) {
    return submit(request, connect(), config);
}
/**
 * Submits the request over the given connection source, using {@code clientConfig} when it
 * is set and {@code HttpClientConfig.DEFAULT_CONFIG} otherwise.
 *
 * @param request              the request to send
 * @param connectionObservable source of the connection used for the request
 * @return an Observable of the response
 */
protected Observable<HttpClientResponse<O>> submit(final HttpClientRequest<I> request,
                                                   Observable<ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>>> connectionObservable) {
    if (clientConfig == null) {
        return submit(request, connectionObservable, HttpClientConfig.DEFAULT_CONFIG);
    }
    return submit(request, connectionObservable, clientConfig);
}
/**
 * Creates a handler that keeps a reference to the given observer.
 *
 * @param connectionObserver observer stored in this handler's {@code connectionObserver} field
 */
private ClientConnectionHandler(Observer<? super ObservableConnection<O, I>> connectionObserver) { this.connectionObserver = connectionObserver; }
/**
 * Stores the given connection in this object's {@code connection} field, replacing any
 * previously stored value.
 *
 * @param connection the connection to remember
 */
void setConnection( ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) { this.connection = connection; }
/**
 * Returns a fixed assignment regardless of the connection: a new {@code SlotAssignment}
 * constructed with the arguments {@code (1, 1)}.
 *
 * @param connection ignored
 * @return a new {@code SlotAssignment(1, 1)}
 */
@Override
public SlotAssignment assignSlot(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection) {
    final SlotAssignment fixedAssignment = new SlotAssignment(1, 1);
    return fixedAssignment;
}
/**
 * Does nothing: this implementation keeps no per-connection state to release.
 *
 * @param connection ignored
 */
@Override public void releaseSlot(ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {}
/**
 * Writes {@code SERVER_MESSAGE} to the new connection and flushes it.
 *
 * @param connection the newly accepted connection
 * @return the result of the write-and-flush
 */
@Override
public Observable<Void> handle(ObservableConnection<String, String> connection) {
    final Observable<Void> writeResult = connection.writeAndFlush(SERVER_MESSAGE);
    return writeResult;
}
/**
 * Writes {@code SERVER_MESSAGE} as a text WebSocket frame to the new connection and
 * flushes it.
 *
 * @param newConnection the newly accepted connection
 * @return the result of the write-and-flush
 */
@Override
public Observable<Void> handle(ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> newConnection) {
    final TextWebSocketFrame greeting = new TextWebSocketFrame(SERVER_MESSAGE);
    return newConnection.writeAndFlush(greeting);
}
/**
 * Creates exactly one new connection for every subscription to the returned observable.
 *
 * @return A new observable which creates a single connection for every subscription.
 */
Observable<ObservableConnection<O, I>> connect();
/**
 * Decides whether the given connection is allowed.
 *
 * @param connection the connection to check
 * @return {@code true} if the connection is allowed, {@code false} otherwise
 */
public boolean allowed(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection);