/**
 * Builds a load-balancing HTTP client for the supplied configuration.
 *
 * <p>The client is assembled with:
 * <ul>
 *   <li>a single execution listener obtained from {@code createBearerHeaderAdder()},</li>
 *   <li>the retry handler returned by {@code getDefaultHttpRetryHandlerWithConfig(config)},</li>
 *   <li>a pipeline made of the standard HTTP client configurator followed by an
 *       {@code HttpObjectAggregationConfigurator} sized with {@code maxChunkSize},</li>
 *   <li>the shared {@code RibbonTransport.poolCleanerScheduler}.</li>
 * </ul>
 *
 * @param config client configuration handed to the builder and to the retry handler factory
 * @return the newly built load-balancing client
 */
@Override
public HttpClient<ByteBuf, ByteBuf> newHttpClient(final IClientConfig config) {
    final List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> executionListeners =
            new ArrayList<>();
    executionListeners.add(createBearerHeaderAdder());

    // HTTP codec stage first, then the aggregation stage.
    final PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> aggregatingPipeline =
            new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>(
                    new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>(),
                    new HttpObjectAggregationConfigurator(maxChunkSize));

    return LoadBalancingHttpClient.<ByteBuf, ByteBuf>builder()
            .withClientConfig(config)
            .withExecutorListeners(executionListeners)
            .withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
            .withPipelineConfigurator(aggregatingPipeline)
            .withPoolCleanerScheduler(RibbonTransport.poolCleanerScheduler)
            .build();
}
public RxClientImpl(ServerInfo serverInfo, Bootstrap clientBootstrap, PipelineConfigurator<O, I> pipelineConfigurator, ClientConfig clientConfig) { if (null == clientBootstrap) { throw new NullPointerException("Client bootstrap can not be null."); } if (null == serverInfo) { throw new NullPointerException("Server info can not be null."); } if (null == clientConfig) { throw new NullPointerException("Client config can not be null."); } this.clientConfig = clientConfig; this.serverInfo = serverInfo; this.clientBootstrap = clientBootstrap; if (clientConfig.isReadTimeoutSet()) { ReadTimeoutPipelineConfigurator readTimeoutConfigurator = new ReadTimeoutPipelineConfigurator(clientConfig.getReadTimeoutInMillis(), TimeUnit.MILLISECONDS); if (null != pipelineConfigurator) { pipelineConfigurator = new PipelineConfiguratorComposite<O, I>(pipelineConfigurator, readTimeoutConfigurator); } else { pipelineConfigurator = new PipelineConfiguratorComposite<O, I>(readTimeoutConfigurator); } } incompleteConfigurator = pipelineConfigurator; }
/**
 * Creates a server on top of the given bootstrap and installs a child-channel initializer.
 *
 * <p>For every accepted channel the initializer builds an {@code RxRequiredConfigurator}
 * from the connection handler and this server's {@code errorHandler}, places the optional
 * user configurator in front of it when one was supplied, and applies the result to the
 * channel's pipeline. The server state starts as {@code ServerState.Created}.
 *
 * @param bootstrap            server bootstrap; must not be {@code null}
 * @param port                 port stored for this server
 * @param pipelineConfigurator optional pipeline configurator applied before the required one;
 *                             may be {@code null}
 * @param connectionHandler    handler passed to the required configurator
 * @throws NullPointerException if {@code bootstrap} is {@code null}
 */
public RxServer(ServerBootstrap bootstrap, int port,
                final PipelineConfigurator<I, O> pipelineConfigurator,
                final ConnectionHandler<I, O> connectionHandler) {
    if (bootstrap == null) {
        throw new NullPointerException("Bootstrap can not be null.");
    }
    this.bootstrap = bootstrap;
    this.port = port;

    ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            RxRequiredConfigurator<I, O> mandatoryStage =
                    new RxRequiredConfigurator<I, O>(connectionHandler, errorHandler);

            // Default to the required stage alone; prepend the user's stage when present.
            PipelineConfigurator<I, O> effective = mandatoryStage;
            if (pipelineConfigurator != null) {
                effective = new PipelineConfiguratorComposite<I, O>(pipelineConfigurator, mandatoryStage);
            }
            effective.configureNewPipeline(ch.pipeline());
        }
    };
    this.bootstrap.childHandler(childInitializer);

    serverStateRef = new AtomicReference<ServerState>(ServerState.Created);
}
private static <T> void serveMany(int port, final Observable<List<Observable<T>>> observable, final Encoder<T> encoder, boolean startAndWait, SlottingStrategy<T> slottingStrategy, IngressPolicy ingressPolicy){ RxServer<RemoteRxEvent, RemoteRxEvent> server = RxNetty.createTcpServer(port, new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>( new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>(){ @Override public void configureNewPipeline(ChannelPipeline pipeline) { // pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4)); // max frame = half MB } }, new RxEventPipelineConfigurator()), new RemoteObservableConnectionHandler<T>(observable, encoder, slottingStrategy, ingressPolicy)); if(startAndWait){ server.startAndWait(); }else{ server.start(); } }
/**
 * Appends a {@code ClientRequiredConfigurator} after the supplied configurator and delegates
 * the combined configurator to the superclass implementation.
 *
 * @param clientConnectionHandler connection handler forwarded to the superclass
 * @param pipelineConfigurator    configurator placed in front of the client-required stage
 * @return whatever the superclass returns for the combined configurator
 */
@Override
protected PipelineConfigurator<HttpClientRequest<I>, HttpClientResponse<O>> getPipelineConfiguratorForAChannel(
        ClientConnectionHandler clientConnectionHandler,
        PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator) {
    return super.getPipelineConfiguratorForAChannel(
            clientConnectionHandler,
            new PipelineConfiguratorComposite<HttpClientResponse<O>, HttpClientRequest<I>>(
                    pipelineConfigurator,
                    new ClientRequiredConfigurator<I, O>()));
}
/**
 * Produces the pipeline configurator for a single channel.
 *
 * <p>An {@code RxRequiredConfigurator} built from the connection handler is always included.
 * When a configurator is supplied it is placed in front of the required one; otherwise the
 * required one is the only member of the returned composite.
 *
 * @param clientConnectionHandler handler given to the required configurator
 * @param pipelineConfigurator    optional configurator to run first; may be {@code null}
 * @return a composite containing the optional configurator (if any) and the required one
 */
protected PipelineConfigurator<I, O> getPipelineConfiguratorForAChannel(
        ClientConnectionHandler clientConnectionHandler,
        PipelineConfigurator<O, I> pipelineConfigurator) {
    RxRequiredConfigurator<O, I> mandatoryStage = new RxRequiredConfigurator<O, I>(clientConnectionHandler);

    if (pipelineConfigurator == null) {
        return new PipelineConfiguratorComposite<I, O>(mandatoryStage);
    }
    return new PipelineConfiguratorComposite<I, O>(pipelineConfigurator, mandatoryStage);
}
private static <T> Observable<T> createTcpConnectionToServer(String host, int port, final Decoder<T> decoder, final RemoteUnsubscribe remoteUnsubscribe){ return RxNetty.createTcpClient(host, port, new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>( new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>(){ @Override public void configureNewPipeline(ChannelPipeline pipeline) { // pipeline.addFirst(new LoggingHandler(LogLevel.ERROR)); // uncomment to enable debug logging pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); // 4 bytes to encode length pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4)); // max frame = half MB } }, new RxEventPipelineConfigurator())) .connect().flatMap(new Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>>(){ @Override public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) { connection.writeAndFlush(RemoteRxEvent.subscribed()); // send subscribe event to server remoteUnsubscribe.setConnection(connection); return connection.getInput(); } }) // data received form server .map(new Func1<RemoteRxEvent,Notification<T>>(){ @Override public Notification<T> call(RemoteRxEvent rxEvent) { if (rxEvent.getType() == RemoteRxEvent.Type.next){ return Notification.createOnNext(decoder.decode(rxEvent.getData())); }else if (rxEvent.getType() == RemoteRxEvent.Type.error){ return Notification.createOnError(fromBytesToThrowable(rxEvent.getData())); }else if (rxEvent.getType() == RemoteRxEvent.Type.completed){ return Notification.createOnCompleted(); }else{ throw new RuntimeException("RemoteRxEvent of type:"+rxEvent.getType()+", not supported."); } } }) .<T>dematerialize(); }
/**
 * Wraps the given configurator in a composite that runs it first and a new
 * {@code ServerRequiredConfigurator} second.
 *
 * @param pipelineConfigurator configurator to run ahead of the server-required stage
 * @return a composite of the supplied configurator followed by the required one
 */
private static <I, O> PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> addRequiredConfigurator(
        PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> pipelineConfigurator) {
    ServerRequiredConfigurator<I, O> mandatoryStage = new ServerRequiredConfigurator<I, O>();
    return new PipelineConfiguratorComposite<HttpServerRequest<I>, HttpServerResponse<O>>(
            pipelineConfigurator, mandatoryStage);
}