Java 类io.reactivex.netty.channel.ObservableConnection 实例源码

项目:ribbon    文件:HelloUdpServer.java   
/**
 * Builds the UDP "hello" server bound to this instance's {@code port}.
 *
 * <p>For every datagram read from a connection, the handler waits {@code delay} milliseconds and
 * then writes {@code WELCOME_MSG_BYTES} back to the datagram's sender. The server is only created
 * here; this method does not call {@code start()} on it.
 *
 * @return the newly created server
 */
public UdpServer<DatagramPacket, DatagramPacket> createServer() {
    final ConnectionHandler<DatagramPacket, DatagramPacket> welcomeHandler =
            new ConnectionHandler<DatagramPacket, DatagramPacket>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> newConnection) {
            Observable<DatagramPacket> incoming = newConnection.getInput();
            return incoming.flatMap(new Func1<DatagramPacket, Observable<Void>>() {
                @Override
                public Observable<Void> call(final DatagramPacket received) {
                    // interval(...).take(1) yields exactly one tick once the configured delay has elapsed.
                    Observable<Long> delayedTick = Observable.interval(delay, TimeUnit.MILLISECONDS).take(1);
                    return delayedTick.flatMap(new Func1<Long, Observable<Void>>() {
                        @Override
                        public Observable<Void> call(Long tick) {
                            InetSocketAddress replyTo = received.sender();
                            System.out.println("Received datagram. Sender: " + replyTo);
                            // Buffer sized to hold the welcome message exactly.
                            ByteBuf payload = newConnection.getChannel().alloc().buffer(WELCOME_MSG_BYTES.length);
                            payload.writeBytes(WELCOME_MSG_BYTES);
                            return newConnection.writeAndFlush(new DatagramPacket(payload, replyTo));
                        }
                    });
                }
            });
        }
    };

    UdpServer<DatagramPacket, DatagramPacket> udpServer = RxNetty.createUdpServer(port, welcomeHandler);
    System.out.println("UDP hello server started at port: " + port);
    return udpServer;
}
项目:ribbon    文件:MyUDPClient.java   
/**
 * Sends {@code content} to a server selected through the load balancer context and returns the reply.
 *
 * <p>For the chosen server a connection is opened, {@code content} is written and flushed, and the
 * first datagram read from the connection is emitted. The read is subject to a 10 ms timeout.
 *
 * @param content the text to write to the server
 * @return an observable emitting at most one datagram read from the connection
 */
public Observable<DatagramPacket> submit(final String content) {
    final ServerOperation<DatagramPacket> sendAndAwaitReply = new ServerOperation<DatagramPacket>() {
        @Override
        public Observable<DatagramPacket> call(Server server) {
            RxClient<DatagramPacket, DatagramPacket> client = getOrCreateRxClient(server);
            Observable<ObservableConnection<DatagramPacket, DatagramPacket>> connections = client.connect();
            return connections.flatMap(new Func1<ObservableConnection<DatagramPacket, DatagramPacket>, Observable<? extends DatagramPacket>>() {
                @Override
                public Observable<? extends DatagramPacket> call(ObservableConnection<DatagramPacket, DatagramPacket> conn) {
                    // The observable returned by the write is not subscribed to here.
                    conn.writeStringAndFlush(content);
                    Observable<DatagramPacket> replies = conn.getInput();
                    return replies.timeout(10, TimeUnit.MILLISECONDS).take(1);
                }
            });
        }
    };

    return LoadBalancerCommand.<DatagramPacket>builder()
            .withLoadBalancerContext(lbContext)
            .build()
            .submit(sendAndAwaitReply);
}
项目:RxNetty    文件:TcpEventStreamClientFast.java   
/**
 * Connects to the TCP server at localhost:8181 and prints every received message after trimming it.
 *
 * <p>The calling thread blocks while the connection's input is being consumed.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final Func1<String, String> trimMessage = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            return msg.trim();
        }
    };
    final Action1<Object> printEvent = new Action1<Object>() {
        @Override
        public void call(Object event) {
            System.out.println("onNext event => " + event);
        }
    };

    Observable<ObservableConnection<String, String>> connections =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();

    connections.flatMap(new Func1<ObservableConnection<String, String>, Observable<?>>() {
        @Override
        public Observable<?> call(ObservableConnection<String, String> connection) {
            return connection.getInput().map(trimMessage);
        }
    }).toBlockingObservable().forEach(printEvent);
}
项目:RxNetty    文件:TcpEchoServer.java   
/**
 * Starts a TCP echo server on port 8181 and waits on it.
 *
 * <p>Each new connection is greeted with a welcome line. Every non-blank message received is
 * echoed back prefixed with {@code "echo => "}; blank messages produce no reply.
 *
 * @param args ignored
 */
public static void main(final String[] args) {
    final int port = 8181;

    final ConnectionHandler<String, String> echoHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<String, String> connection) {
            System.out.println("New client connection established.");
            connection.writeAndFlush("Welcome! \n\n");
            return connection.getInput().flatMap(new Func1<String, Observable<Void>>() {
                @Override
                public Observable<Void> call(String line) {
                    System.out.println("onNext: " + line);
                    final String trimmed = line.trim();
                    if (trimmed.isEmpty()) {
                        // Nothing to echo for a blank line.
                        return COMPLETED_OBSERVABLE;
                    }
                    return connection.writeAndFlush("echo => " + trimmed + '\n');
                }
            });
        }
    };

    RxNetty.createTcpServer(port, PipelineConfigurators.textOnlyConfigurator(), echoHandler).startAndWait();
}
项目:RxNetty    文件:TcpEventStreamClientSlow.java   
/**
 * Connects to the TCP server at localhost:8181 and prints every received message after trimming it,
 * sleeping for one second per message to simulate a slow consumer.
 *
 * <p>The calling thread blocks while the connection's input is being consumed.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    Observable<ObservableConnection<String, String>> connectionObservable =
            RxNetty.createTcpClient("localhost", 8181, PipelineConfigurators.stringMessageConfigurator()).connect();
    connectionObservable.flatMap(new Func1<ObservableConnection<String, String>, Observable<?>>() {
        @Override
        public Observable<?> call(ObservableConnection<String, String> connection) {
            return connection.getInput().map(new Func1<String, String>() {
                @Override
                public String call(String msg) {
                    try {
                        // Artificial delay: makes this client consume slower than the server produces.
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Catching InterruptedException clears the interrupt flag; restore it so the
                        // owner of this thread can still observe that an interrupt was requested.
                        Thread.currentThread().interrupt();
                    }
                    return msg.trim();
                }
            });
        }
    }).toBlockingObservable().forEach(new Action1<Object>() {
        @Override
        public void call(Object o) {
            System.out.println("onNext event => " + o);
        }
    });

}
项目:RxNetty    文件:HttpClientImpl.java   
/**
 * Submits {@code request} over a connection obtained from {@code connectionObservable}.
 *
 * <p>The request is first passed to {@code enrichRequest} together with {@code config}. The
 * connection observable is subscribed to only when the returned observable is subscribed to;
 * unsubscribing from the returned observable unsubscribes from the connection observable.
 *
 * @param request              the request to send
 * @param connectionObservable source of the connection to send the request on
 * @param config               configuration handed to {@code enrichRequest}
 * @return an observable of the response
 */
protected Observable<HttpClientResponse<O>> submit(final HttpClientRequest<I> request,
                                             final Observable<ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>>> connectionObservable,
                                             final ClientConfig config) {
    enrichRequest(request, config);

    // Here we do not map the connection Observable and return because the onComplete() of connectionObservable,
    // does not indicate onComplete of the request processing.
    final Observable.OnSubscribe<HttpClientResponse<O>> onSubscribe =
            new Observable.OnSubscribe<HttpClientResponse<O>>() {
        @Override
        public void call(final Subscriber<? super HttpClientResponse<O>> responseSubscriber) {
            final ConnectObserver<I, O> connectObserver = new ConnectObserver<I, O>(request, responseSubscriber);
            final Subscription connectionSubscription = connectionObservable.subscribe(connectObserver);

            final Action0 cancelConnect = new Action0() {
                @Override
                public void call() {
                    //TODO: Cancel write & if the response is not over, disconnect the channel.
                    connectionSubscription.unsubscribe();
                }
            };
            responseSubscriber.add(Subscriptions.create(cancelConnect));
        }
    };
    return Observable.create(onSubscribe);
}
项目:RxNetty    文件:HttpClientImpl.java   
/**
 * Wires the request/response processing onto a newly obtained connection and writes the request.
 *
 * <p>If the pipeline contains a {@code ClientRequestResponseConverter}, it is given the
 * {@code requestProcessingObserver}. The observer is then subscribed to the connection's input,
 * and the request is written and flushed.
 *
 * @param connection the connection on which the request is sent and the response is read
 */
@Override
public void onNext(ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>> connection) {
    ClientRequestResponseConverter converter =
            connection.getChannelHandlerContext().pipeline().get(ClientRequestResponseConverter.class);
    if (null != converter) {
        converter.setRequestProcessingObserver(requestProcessingObserver);
    }

    connection.getInput().subscribe(requestProcessingObserver);
    // The write result must be subscribed to: doOnError() on an observable that nobody subscribes to
    // never runs its callback, so a failed write would otherwise go unreported.
    // NOTE(review): assumes writeAndFlush() performs the write eagerly and the returned observable
    // only reports its outcome, so subscribing does not trigger a second write - confirm.
    connection.writeAndFlush(request).subscribe(
            new Action1<Void>() {
                @Override
                public void call(Void ignored) {
                    // Completion & onNext are managed by the response observable itself.
                }
            },
            new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    // If the write fails, the response should get the error.
                    requestProcessingObserver.onError(throwable);
                }
            });
}
项目:karyon    文件:KaryonTcpModuleTest.java   
/**
 * Connects to the test server over TCP and verifies that the single message it sends equals
 * {@code SERVER_MESSAGE}. Waits at most 60 seconds for that message.
 */
@Test
public void testGovernatedTcpServer() throws Exception {
    final Func1<ByteBuf, String> decode = new Func1<ByteBuf, String>() {
        @Override
        public String call(ByteBuf buffer) {
            return buffer.toString(Charset.defaultCharset());
        }
    };

    Observable<String> received = RxNetty.createTcpClient("localhost", server.getServerPort()).connect()
            .flatMap(new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<String>>() {
                @Override
                public Observable<String> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
                    return connection.getInput().map(decode);
                }
            });

    // single() enforces that exactly one message arrives.
    String message = received.single().toBlocking().toFuture().get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
项目:karyon    文件:KaryonWebSocketsModuleTest.java   
/**
 * Connects to the test server over WebSocket and verifies that the single text frame it sends
 * carries {@code SERVER_MESSAGE}. Waits at most 60 seconds for that frame.
 */
@Test
public void testGovernatedTcpServer() throws Exception {
    final Func1<TextWebSocketFrame, String> frameText = new Func1<TextWebSocketFrame, String>() {
        @Override
        public String call(TextWebSocketFrame frame) {
            return frame.text();
        }
    };

    Observable<String> received = RxNetty.<TextWebSocketFrame, TextWebSocketFrame>newWebSocketClientBuilder("localhost", server.getServerPort())
            .build()
            .connect()
            .flatMap(new Func1<ObservableConnection<TextWebSocketFrame, TextWebSocketFrame>, Observable<String>>() {
                @Override
                public Observable<String> call(ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) {
                    return connection.getInput().map(frameText);
                }
            });

    // single() enforces that exactly one frame arrives.
    String message = received.single().toBlocking().toFuture().get(60, TimeUnit.SECONDS);

    assertEquals("Invalid message received from server", SERVER_MESSAGE, message);
}
项目:karyon    文件:ShutdownListener.java   
/**
 * Handles a connection to the shutdown listener.
 *
 * <p>Reads a single command from the connection, logs it, passes it to {@code commandHandler},
 * and calls {@code shutdown()} once the resulting stream completes.
 *
 * @param conn the newly accepted connection
 * @return the observable produced by processing the command
 */
@Override
public Observable<Void> handle(final ObservableConnection<String, String> conn) {
    return conn.getInput().take(1) /*Take only one command per connection*/
               .doOnNext(new Action1<String>() {
                   @Override
                   public void call(String s) {
                       logger.info("Received a command: " + s);
                   }
               })
               .flatMap(commandHandler)
               .doOnCompleted(new Action0() {
                   @Override
                   public void call() {
                       try {
                           shutdown();
                       } catch (InterruptedException e) {
                           // Keep the cause in the log and restore the interrupt flag, which is
                           // cleared when InterruptedException is thrown.
                           logger.error("Interrupted while shutting down the shutdown command listener.", e);
                           Thread.currentThread().interrupt();
                       }
                   }
               });
}
项目:karyon    文件:TcpPipelineHandlers.java   
/**
 * Handles a frontend connection: every received buffer is decoded with the platform default
 * charset, printed, put on {@code queueProvider}, and answered with a prompt for more input.
 *
 * @param connection the newly accepted frontend connection
 * @return the observable of the prompt writes, one per received buffer
 */
@Override
public Observable<Void> handle(final ObservableConnection<ByteBuf, ByteBuf> connection) {
    System.out.println("New frontend connection");

    final Func1<ByteBuf, Observable<Void>> enqueueAndPrompt = new Func1<ByteBuf, Observable<Void>>() {
        @Override
        public Observable<Void> call(ByteBuf incoming) {
            final String text = incoming.toString(Charset.defaultCharset());
            System.out.println("Received: " + text);
            queueProvider.put(text);

            final ByteBuf prompt = connection.getAllocator().buffer();
            prompt.writeBytes("Want some more:\n".getBytes());
            return connection.writeAndFlush(prompt);
        }
    };
    return connection.getInput().flatMap(enqueueAndPrompt);
}
项目:karyon    文件:TcpPipelineHandlers.java   
/**
 * Handles a backend connection: once per second, writes every message currently available from
 * {@code queueProvider} to the connection and then flushes.
 *
 * @param connection the newly accepted backend connection
 * @return the observable of the periodic flushes
 */
@Override
public Observable<Void> handle(final ObservableConnection<ByteBuf, ByteBuf> connection) {
    System.out.println("New backend connection");

    final Func1<Long, Observable<Void>> drainQueue = new Func1<Long, Observable<Void>>() {
        @Override
        public Observable<Void> call(Long tick) {
            // Write (without flushing) until the queue reports empty, then flush once.
            while (!queueProvider.isEmpty()) {
                ByteBuf pending = connection.getAllocator().buffer();
                pending.writeBytes(queueProvider.poll().getBytes());
                connection.write(pending);
            }
            return connection.flush();
        }
    };

    Observable<Long> everySecond = Observable.interval(1, TimeUnit.SECONDS);
    return everySecond.flatMap(drainQueue);
}
项目:RxNetty-Android    文件:ServerFragment.java   
/**
 * Click handler that starts a TCP client against localhost:{@code PORT}.
 *
 * <p>After the first (greeting) message from the server, the client writes an increasing counter
 * every 500 ms and listens for echoes. The combined stream is limited to ten items, each of which
 * is logged and added to {@code adapter} on the Android main thread.
 */
@OnClick(R.id.client_button) void startClient() {
  Observable<ObservableConnection<String, String>> connections =
      RxNetty.createTcpClient("localhost", PORT, PipelineConfigurators.textOnlyConfigurator()).connect();

  connections.flatMap(conn -> {
    // the first message from the server is its greeting
    Observable<String> greeting = conn.getInput()
        .take(1)
        .map(text -> text.trim());

    // write a counter value at intervals; each completed write is represented by an empty string
    Observable<Long> ticks = Observable.interval(500, TimeUnit.MILLISECONDS);
    Observable<String> numberedWrites =
        ticks.flatMap(tick -> conn.writeAndFlush(String.valueOf(tick + 1))
            .map(written -> ""));

    // capture the output from the server
    Observable<String> echoes = conn.getInput().map(text -> text.trim());

    // wait for the greeting then start the output and receive echo input
    Observable<String> afterGreeting = Observable.merge(numberedWrites, echoes);
    return Observable.concat(greeting, afterGreeting);
  })
  .take(10)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<Object>() {
    @Override public void onCompleted() {
      Log.d(TAG, "Client Complete!");
    }

    @Override public void onError(Throwable error) {
      Log.e(TAG, "onError: " + error.getMessage());
    }

    @Override public void onNext(Object item) {
      final String text = item.toString();
      Log.d(TAG, "Client onNext: " + text);
      adapter.add(text);
    }
  });
}
项目:ribbon    文件:LoadBalancingRxClient.java   
/**
 * Connects to a server selected through the load balancer context.
 *
 * <p>The actual connect is delegated to the client returned by {@code getOrCreateRxClient} for
 * the selected server.
 *
 * @return an observable of the connection
 */
@Override
public Observable<ObservableConnection<O, I>> connect() {
    final ServerOperation<ObservableConnection<O, I>> connectToServer =
            new ServerOperation<ObservableConnection<O, I>>() {
                @Override
                public Observable<ObservableConnection<O, I>> call(Server server) {
                    return getOrCreateRxClient(server).connect();
                }
            };

    return LoadBalancerCommand.<ObservableConnection<O, I>>builder()
            .withLoadBalancerContext(lbContext)
            .build()
            .submit(connectToServer);
}
项目:ribbon    文件:UdpClientTest.java   
/**
 * Starts a {@code HelloUdpServer} with zero delay, sends it one datagram through a load-balanced
 * UDP client, and verifies that the first reply decodes to {@code HelloUdpServer.WELCOME_MSG}.
 * The server is shut down when the exchange finishes, whether or not it succeeded.
 */
@Test
public void testUdpClientWithoutTimeout() throws Exception {
    final int port = choosePort();
    final UdpServer<DatagramPacket, DatagramPacket> server = new HelloUdpServer(port, 0).createServer();
    server.start();

    final BaseLoadBalancer lb = new BaseLoadBalancer();
    lb.setServersList(Lists.newArrayList(new Server("localhost", port)));
    final RxClient<DatagramPacket, DatagramPacket> client = RibbonTransport.newUdpClient(lb,
            DefaultClientConfigImpl.getClientConfigWithDefaultValues());

    final Func1<ObservableConnection<DatagramPacket, DatagramPacket>, Observable<DatagramPacket>> askAndListen =
            new Func1<ObservableConnection<DatagramPacket, DatagramPacket>, Observable<DatagramPacket>>() {
                @Override
                public Observable<DatagramPacket> call(ObservableConnection<DatagramPacket, DatagramPacket> connection) {
                    connection.writeStringAndFlush("Is there anybody out there?");
                    return connection.getInput();
                }
            };
    final Func1<DatagramPacket, String> decodeContent = new Func1<DatagramPacket, String>() {
        @Override
        public String call(DatagramPacket packet) {
            return packet.content().toString(Charset.defaultCharset());
        }
    };

    try {
        final String response = client.connect()
                .flatMap(askAndListen)
                .take(1)
                .map(decodeContent)
                .toBlocking()
                .first();
        assertEquals(HelloUdpServer.WELCOME_MSG, response);
    } finally {
        server.shutdown();
    }
}
项目:ribbon    文件:HelloUdpServerExternalResource.java   
/**
 * Chooses a port, creates the UDP "hello" server on it, and starts the server.
 *
 * <p>For every datagram read from a connection, the handler waits {@code timeout} milliseconds and
 * then writes {@code WELCOME_MSG_BYTES} back to the datagram's sender.
 *
 * @throws RuntimeException if choosing a port fails with a {@link SocketException}
 */
public void start() {
    final int port;
    try {
        port = choosePort();
    } catch (SocketException e) {
        // Message fixed: it previously read "point" although the failing step is port selection.
        throw new RuntimeException("Error choosing port", e);
    }

    server = RxNetty.createUdpServer(port, new ConnectionHandler<DatagramPacket, DatagramPacket>() {
        @Override
        public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> newConnection) {
            return newConnection.getInput().flatMap(new Func1<DatagramPacket, Observable<Void>>() {
                @Override
                public Observable<Void> call(final DatagramPacket received) {
                    // interval(...).take(1) yields exactly one tick once the configured time has elapsed.
                    return Observable.interval(timeout, TimeUnit.MILLISECONDS).take(1).flatMap(new Func1<Long, Observable<Void>>() {
                        @Override
                        public Observable<Void> call(Long aLong) {
                            InetSocketAddress sender = received.sender();
                            LOG.info("Received datagram. Sender: " + sender);
                            // Buffer sized to hold the welcome message exactly.
                            ByteBuf data = newConnection.getChannel().alloc().buffer(WELCOME_MSG_BYTES.length);
                            data.writeBytes(WELCOME_MSG_BYTES);
                            return newConnection.writeAndFlush(new DatagramPacket(data, sender));
                        }
                    });
                }
            });
        }
    });

    server.start();

    LOG.info("UDP hello server started at port: " + port);
}
项目:RxNetty    文件:TcpEventStreamServer.java   
/**
 * Starts a TCP server on port 8181 that hands every new connection to {@code startEventStream},
 * and waits on it.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final ConnectionHandler<String, String> eventStreamHandler = new ConnectionHandler<String, String>() {
        @Override
        public Observable<Void> handle(ObservableConnection<String, String> newConnection) {
            return startEventStream(newConnection);
        }
    };

    RxNetty.createTcpServer(8181, PipelineConfigurators.textOnlyConfigurator(), eventStreamHandler)
           .startAndWait();
}
项目:RxNetty    文件:TcpIntervalClientTakeN.java   
/**
 * Connects to the TCP server at localhost:8181, sends a {@code "subscribe:"} request, and prints
 * the first three items of the resulting stream.
 *
 * <p>The stream is the write result (mapped to empty strings) followed by the trimmed messages
 * read from the connection. The calling thread blocks while the items are consumed.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    final PipelineConfigurator<String, ByteBuf> stringDecoding = new PipelineConfigurator<String, ByteBuf>() {
        @Override
        public void configureNewPipeline(ChannelPipeline pipeline) {
            pipeline.addLast(new StringDecoder());
        }
    };
    Observable<ObservableConnection<String, ByteBuf>> connections =
            RxNetty.createTcpClient("localhost", 8181, stringDecoding).connect();

    final Func1<Void, String> toEmptyString = new Func1<Void, String>() {
        @Override
        public String call(Void ignored) {
            return "";
        }
    };
    final Func1<String, String> trimMessage = new Func1<String, String>() {
        @Override
        public String call(String msg) {
            return msg.trim();
        }
    };

    connections.flatMap(new Func1<ObservableConnection<String, ByteBuf>, Observable<String>>() {
        @Override
        public Observable<String> call(ObservableConnection<String, ByteBuf> connection) {
            ByteBuf subscribeRequest = Unpooled.copiedBuffer("subscribe:".getBytes());
            Observable<String> writeResult = connection.writeAndFlush(subscribeRequest).map(toEmptyString);
            Observable<String> messages = connection.getInput().map(trimMessage);

            // Write result first, then whatever the server sends.
            return Observable.concat(writeResult, messages);
        }
    }).take(3).toBlockingObservable().forEach(new Action1<Object>() {
        @Override
        public void call(Object item) {
            System.out.println("onNext: " + item);
        }
    });
}
项目:RxNetty    文件:ConnectionLifecycleHandler.java   
/**
 * Creates the {@link ObservableConnection} for a channel that just became active and hands it to
 * the connection handler.
 *
 * <p>The connection is closed when the observable returned by the handler completes or fails; a
 * failure is passed to {@code invokeErrorHandler} first. Anything thrown while invoking the
 * handler or subscribing to its result is passed to {@code invokeErrorHandler}; the connection is
 * not closed on that path.
 *
 * @param ctx the context of the channel that became active
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Fresh input subject and connection for this channel.
    inputSubject = PublishSubject.create();
    connection = new ObservableConnection<I, O>(ctx, inputSubject);
    if (null != observableAdapter) {
        observableAdapter.activate(inputSubject);
    }
    // Propagate the event before the connection handler is invoked.
    super.channelActive(ctx);
    try {
        Observable<Void> handledObservable = connectionHandler.handle(connection);
        if (null == handledObservable) {
            // A handler returning null is treated like one that completes immediately.
            handledObservable = Observable.empty();
        }

        handledObservable.subscribe(new Subscriber<Void>() {
            @Override
            public void onCompleted() {
                connection.close();
            }

            @Override
            public void onError(Throwable e) {
                invokeErrorHandler(e);
                connection.close();
            }

            @Override
            public void onNext(Void aVoid) {
                // No Op.
            }
        });
    } catch (Throwable throwable) {
        invokeErrorHandler(throwable);
    }
}
项目:RxNetty    文件:RxClientImpl.java   
/**
 * A lazy connect to the {@link ServerInfo} for this client. Every subscription to the returned
 * {@link Observable} will create a fresh connection.
 *
 * <p>On subscription, a new {@code ClientConnectionHandler} wrapping the subscriber is installed
 * on {@code clientBootstrap} and a connect is issued to the host and port of {@code serverInfo}.
 * Unsubscribing before the connect has finished cancels the connect attempt. Any throwable raised
 * while setting this up is delivered to the subscriber's {@code onError}.
 *
 * @return Observable for the connect. Every new subscription will create a fresh connection.
 */
@Override
public Observable<ObservableConnection<O, I>> connect() {
    return Observable.create(new OnSubscribe<ObservableConnection<O, I>>() {

        @Override
        public void call(final Subscriber<? super ObservableConnection<O, I>> subscriber) {
            try {
                final ClientConnectionHandler clientConnectionHandler = new ClientConnectionHandler(subscriber);
                // NOTE(review): the handler is replaced on the clientBootstrap field for every
                // subscription - confirm this is safe if subscriptions can happen concurrently.
                clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        PipelineConfigurator<I, O> configurator = getPipelineConfiguratorForAChannel(clientConnectionHandler,
                                incompleteConfigurator);
                        configurator.configureNewPipeline(ch.pipeline());
                    }
                });

                // make the connection
                final ChannelFuture connectFuture =
                        clientBootstrap.connect(serverInfo.getHost(), serverInfo.getPort())
                                .addListener(clientConnectionHandler);

                // Tie the pending connect to the subscription so it can be cancelled on unsubscribe.
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        if (!connectFuture.isDone()) {
                            connectFuture.cancel(true); // Unsubscribe here means, no more connection is required. A close on connection is explicit.
                        }
                    }
                }));

            } catch (Throwable e) {
                subscriber.onError(e);
            }
        }
    });
}
项目:RxNetty    文件:RxClientImpl.java   
/**
 * Publishes the new connection to {@code connectionObserver} and then completes that observer.
 *
 * <p>The publication happens when the returned observable is subscribed to. The subscriber of the
 * returned observable itself is never signalled by this method.
 *
 * @param newConnection the connection that was just established
 * @return an observable whose subscription triggers the publication
 */
@Override
public Observable<Void> handle(final ObservableConnection<O, I> newConnection) {
    final OnSubscribe<Void> publishConnection = new OnSubscribe<Void>() {
        @Override
        public void call(Subscriber<? super Void> unusedSubscriber) {
            connectionObserver.onNext(newConnection);
            // The observer is no longer looking for any more connections.
            connectionObserver.onCompleted();
        }
    };
    return Observable.create(publishConnection);
}
项目:RxNetty    文件:UnexpectedErrorsTest.java   
/**
 * Creates a TCP server on {@code PORT} whose connection handler fails every connection with an
 * {@link IllegalStateException}.
 */
@Before
public void setUp() throws Exception {
    final ConnectionHandler<ByteBuf, ByteBuf> alwaysFailing = new ConnectionHandler<ByteBuf, ByteBuf>() {
        @Override
        public Observable<Void> handle(ObservableConnection<ByteBuf, ByteBuf> newConnection) {
            return Observable.error(new IllegalStateException("I always throw an error."));
        }
    };
    server = RxNetty.createTcpServer(PORT, alwaysFailing);
}
项目:RxNetty    文件:UnexpectedErrorsTest.java   
/**
 * Connects a TCP client to localhost:{@code PORT}, closes the connection as soon as it is
 * obtained, and blocks until that sequence has finished.
 *
 * @throws InterruptedException if the wait is interrupted
 * @throws ExecutionException   if connecting or closing fails
 */
private static void blockTillConnected() throws InterruptedException, ExecutionException {
    final Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<?>> closeOnConnect =
            new Func1<ObservableConnection<ByteBuf, ByteBuf>, Observable<?>>() {
                @Override
                public Observable<Void> call(ObservableConnection<ByteBuf, ByteBuf> connection) {
                    return connection.close();
                }
            };

    RxNetty.createTcpClient("localhost", PORT).connect()
           .flatMap(closeOnConnect)
           .toBlockingObservable()
           .toFuture()
           .get();
}
项目:RxNetty    文件:InetAddressWhiteListIngressPolicy.java   
/**
 * Allows a connection only if the host address of its remote peer is contained in the current
 * white list.
 *
 * @param connection the connection to check
 * @return {@code true} if the remote host address is in the white list
 */
@Override
public boolean allowed(
        ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
    final InetSocketAddress remote =
            (InetSocketAddress) connection.getChannelHandlerContext().channel().remoteAddress();
    final String remoteHostAddress = remote.getAddress().getHostAddress();
    return whiteList.get().contains(remoteHostAddress);
}
项目:RxNetty    文件:IngressPolicies.java   
/**
 * Creates a policy that allows every connection.
 *
 * @return a new policy whose {@code allowed} always returns {@code true}
 */
public static IngressPolicy allowAll() {
    final IngressPolicy permissive = new IngressPolicy() {
        @Override
        public boolean allowed(ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
            return true;
        }
    };
    return permissive;
}
项目:RxNetty    文件:RemoteObservable.java   
/**
 * Connects to a remote observable server over TCP and exposes the events it sends as a local
 * {@code Observable<T>}.
 *
 * <p>After connecting, a "subscribed" event is written to the server and the connection is handed
 * to {@code remoteUnsubscribe}. Each received event is turned into a {@link Notification}: "next"
 * events are decoded with {@code decoder}, "error" events are converted to a throwable, and
 * "completed" events become a completion. The notifications are then dematerialized.
 *
 * @param host              server host
 * @param port              server port
 * @param decoder           decodes the payload of "next" events
 * @param remoteUnsubscribe receives the connection once it is established
 * @return the observable of decoded values
 */
private static <T> Observable<T> createTcpConnectionToServer(String host, int port, final Decoder<T> decoder,
                                                             final RemoteUnsubscribe remoteUnsubscribe) {
    final PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> framing =
            new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
                @Override
                public void configureNewPipeline(ChannelPipeline pipeline) {
//                  pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4)); // max frame = half MB
                }
            };
    final PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent> pipelineConfig =
            new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(framing, new RxEventPipelineConfigurator());

    final Observable<RemoteRxEvent> events = RxNetty.createTcpClient(host, port, pipelineConfig)
            .connect()
            .flatMap(new Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>>() {
                @Override
                public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
                    connection.writeAndFlush(RemoteRxEvent.subscribed()); // send subscribe event to server
                    remoteUnsubscribe.setConnection(connection);
                    return connection.getInput();
                }
            });

    // data received from server
    final Observable<Notification<T>> notifications = events.map(new Func1<RemoteRxEvent, Notification<T>>() {
        @Override
        public Notification<T> call(RemoteRxEvent rxEvent) {
            if (rxEvent.getType() == RemoteRxEvent.Type.next) {
                return Notification.createOnNext(decoder.decode(rxEvent.getData()));
            }
            if (rxEvent.getType() == RemoteRxEvent.Type.error) {
                return Notification.createOnError(fromBytesToThrowable(rxEvent.getData()));
            }
            if (rxEvent.getType() == RemoteRxEvent.Type.completed) {
                return Notification.createOnCompleted();
            }
            throw new RuntimeException("RemoteRxEvent of type:"+rxEvent.getType()+", not supported.");
        }
    });

    return notifications.<T>dematerialize();
}
项目:RxNetty    文件:HashCodeSlotting.java   
/**
 * Assigns the first free slot token to {@code connection}.
 *
 * @param connection the connection requesting a slot
 * @return the assignment for the connection, or {@code SlotAssignment.notAssigned()} when no
 *         slot token is left
 */
@Override
public synchronized SlotAssignment assignSlot(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection) {
    if (slotTokens.size() == 0) {
        return SlotAssignment.notAssigned();
    }
    final Integer firstFree = slotTokens.remove(0); // grab first slot
    slotAssignments.put(connection, firstFree); // make assignment
    return new SlotAssignment(slotAssignments.get(connection), numSlots);
}
项目:RxNetty    文件:HashCodeSlotting.java   
/**
 * Returns the slot held by {@code connection}, if any, to the pool of free slot tokens and
 * forgets the assignment. Does nothing when the connection holds no slot.
 *
 * @param connection the connection whose slot is released
 */
@Override
public synchronized void releaseSlot(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection) {
    final Integer heldSlot = slotAssignments.get(connection);
    if (heldSlot == null) {
        return;
    }
    slotTokens.add(heldSlot);
    slotAssignments.remove(connection);
}
项目:karyon    文件:WebSocketEchoServer.java   
/**
 * Builds a WebSocket server on port 8888 that answers every text frame with its upper-cased
 * text, then starts it through Karyon with the listed bootstrap modules and waits for shutdown.
 *
 * @param args ignored
 */
public static void main(final String[] args) {
    final ConnectionHandler<TextWebSocketFrame, TextWebSocketFrame> upperCaseEcho =
            new ConnectionHandler<TextWebSocketFrame, TextWebSocketFrame>() {
                @Override
                public Observable<Void> handle(final ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) {
                    return connection.getInput().flatMap(new Func1<WebSocketFrame, Observable<Void>>() {
                        @Override
                        public Observable<Void> call(WebSocketFrame wsFrame) {
                            final TextWebSocketFrame textFrame = (TextWebSocketFrame) wsFrame;
                            System.out.println("Got message: " + textFrame.text());
                            final TextWebSocketFrame reply = new TextWebSocketFrame(textFrame.text().toUpperCase());
                            return connection.writeAndFlush(reply);
                        }
                    });
                }
            };

    RxServer<TextWebSocketFrame, TextWebSocketFrame> webSocketServer =
            RxNetty.newWebSocketServerBuilder(8888, upperCaseEcho).build();

    Karyon.forWebSocketServer(
            webSocketServer,
            new KaryonBootstrapModule(),
            new ArchaiusBootstrapModule("websocket-echo-server"),
            // KaryonEurekaModule.asBootstrapModule(), /* Uncomment if you need eureka */
            Karyon.toBootstrapModule(KaryonWebAdminModule.class),
            ShutdownModule.asBootstrapModule(),
            KaryonServoModule.asBootstrapModule())
            .startAndWaitTillShutdown();
}
项目:RxNetty    文件:HttpClientImpl.java   
/**
 * Submits {@code request} on a connection obtained from {@code connect()}.
 *
 * @param request the request to send
 * @return an observable of the response
 */
@Override
public Observable<HttpClientResponse<O>> submit(HttpClientRequest<I> request) {
    return submit(request, connect());
}
项目:RxNetty    文件:HttpClientImpl.java   
/**
 * Submits {@code request} with the given configuration on a connection obtained from
 * {@code connect()}.
 *
 * @param request the request to send
 * @param config  the configuration to use for this request
 * @return an observable of the response
 */
@Override
public Observable<HttpClientResponse<O>> submit(HttpClientRequest<I> request, ClientConfig config) {
    return submit(request, connect(), config);
}
项目:RxNetty    文件:HttpClientImpl.java   
/**
 * Submits {@code request} on a connection from {@code connectionObservable}, using this client's
 * {@code clientConfig}, or {@code HttpClientConfig.DEFAULT_CONFIG} when none is set.
 *
 * @param request              the request to send
 * @param connectionObservable source of the connection to send the request on
 * @return an observable of the response
 */
protected Observable<HttpClientResponse<O>> submit(final HttpClientRequest<I> request,
                                          Observable<ObservableConnection<HttpClientResponse<O>, HttpClientRequest<I>>> connectionObservable) {
    if (null == clientConfig) {
        return submit(request, connectionObservable, HttpClientConfig.DEFAULT_CONFIG);
    }
    return submit(request, connectionObservable, clientConfig);
}
项目:RxNetty    文件:RxClientImpl.java   
/**
 * Creates a handler bound to the given observer.
 *
 * @param connectionObserver the observer stored in this handler's {@code connectionObserver} field
 */
private ClientConnectionHandler(Observer<? super ObservableConnection<O, I>> connectionObserver) {
    this.connectionObserver = connectionObserver;
}
项目:RxNetty    文件:RemoteUnsubscribe.java   
/**
 * Stores the connection in this object's {@code connection} field.
 *
 * @param connection the connection to remember
 */
void setConnection(
        ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
    this.connection = connection;
}
项目:RxNetty    文件:NoSlotting.java   
/**
 * Returns the same fixed assignment, {@code new SlotAssignment(1, 1)}, for every connection.
 *
 * @param connection the connection requesting a slot; not used
 * @return a new {@code SlotAssignment(1, 1)}
 */
@Override
public SlotAssignment assignSlot(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection) {
    return new SlotAssignment(1, 1);
}
项目:RxNetty    文件:NoSlotting.java   
/**
 * Does nothing; this implementation keeps no per-connection state to release.
 *
 * @param connection the connection whose slot is released; not used
 */
@Override
public void releaseSlot(ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {}
项目:karyon    文件:KaryonTcpModuleTest.java   
/**
 * Writes and flushes {@code SERVER_MESSAGE} to every new connection.
 *
 * @param connection the newly accepted connection
 * @return the observable returned by the write
 */
@Override
public Observable<Void> handle(ObservableConnection<String, String> connection) {
    return connection.writeAndFlush(SERVER_MESSAGE);
}
项目:karyon    文件:KaryonWebSocketsModuleTest.java   
/**
 * Writes and flushes a text frame carrying {@code SERVER_MESSAGE} to every new connection.
 *
 * @param newConnection the newly accepted connection
 * @return the observable returned by the write
 */
@Override
public Observable<Void> handle(ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> newConnection) {
    return newConnection.writeAndFlush(new TextWebSocketFrame(SERVER_MESSAGE));
}
项目:RxNetty    文件:RxClient.java   
/**
 * Creates exactly one new connection for every subscription to the returned observable.
 *
 * @return A new observable which creates a single connection for every subscription.
 */
Observable<ObservableConnection<O, I>> connect();
项目:RxNetty    文件:IngressPolicy.java   
/**
 * Decides whether the given connection is allowed by this policy.
 *
 * @param connection the connection to evaluate
 * @return {@code true} if the connection is allowed, {@code false} otherwise
 */
public boolean allowed(ObservableConnection<RemoteRxEvent,RemoteRxEvent> connection);