Java 类io.reactivex.netty.pipeline.PipelineConfigurator 实例源码
项目:ribbon
文件:LoadBalancingRxClientWithPoolOptions.java
/**
 * Creates a load-balancing client and, when connection pooling is enabled in {@code config},
 * prepares the pool-limit strategies and the idle-connection eviction time.
 *
 * @param lb load balancer forwarded to the superclass constructor
 * @param config client configuration; all pool settings are read from it
 * @param retryHandler retry handler forwarded to the superclass constructor
 * @param pipelineConfigurator pipeline configurator forwarded to the superclass constructor
 * @param poolCleanerScheduler scheduler that is stored only when pooling is enabled
 */
public LoadBalancingRxClientWithPoolOptions(ILoadBalancer lb, IClientConfig config,
        RetryHandler retryHandler,
        PipelineConfigurator<O, I> pipelineConfigurator, ScheduledExecutorService poolCleanerScheduler) {
    super(lb, config, retryHandler, pipelineConfigurator);
    poolEnabled = config.get(CommonClientConfigKey.EnableConnectionPool,
            DefaultClientConfigImpl.DEFAULT_ENABLE_CONNECTION_POOL);
    if (!poolEnabled) {
        // Pooling disabled: none of the pool-related fields below are assigned.
        return;
    }

    this.poolCleanerScheduler = poolCleanerScheduler;

    int totalLimit = config.get(IClientConfigKey.Keys.MaxTotalConnections,
            DefaultClientConfigImpl.DEFAULT_MAX_TOTAL_CONNECTIONS);
    int perHostLimit = config.get(Keys.MaxConnectionsPerHost,
            DefaultClientConfigImpl.DEFAULT_MAX_CONNECTIONS_PER_HOST);

    // Each strategy is given its initial limit plus a "<client>.<namespace>.<key>" property name.
    String perHostProperty =
            config.getClientName() + "." + config.getNameSpace() + "." + CommonClientConfigKey.MaxConnectionsPerHost;
    MaxConnectionsBasedStrategy perHostStrategy = new DynamicPropertyBasedPoolStrategy(perHostLimit, perHostProperty);

    String totalProperty =
            config.getClientName() + "." + config.getNameSpace() + "." + CommonClientConfigKey.MaxTotalConnections;
    globalStrategy = new DynamicPropertyBasedPoolStrategy(totalLimit, totalProperty);

    poolStrategy = new CompositePoolLimitDeterminationStrategy(perHostStrategy, globalStrategy);
    idleConnectionEvictionMills = config.get(Keys.ConnIdleEvictTimeMilliSeconds,
            DefaultClientConfigImpl.DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS);
}
项目:RxNetty
文件:RxClientImpl.java
public RxClientImpl(ServerInfo serverInfo, Bootstrap clientBootstrap,
PipelineConfigurator<O, I> pipelineConfigurator, ClientConfig clientConfig) {
if (null == clientBootstrap) {
throw new NullPointerException("Client bootstrap can not be null.");
}
if (null == serverInfo) {
throw new NullPointerException("Server info can not be null.");
}
if (null == clientConfig) {
throw new NullPointerException("Client config can not be null.");
}
this.clientConfig = clientConfig;
this.serverInfo = serverInfo;
this.clientBootstrap = clientBootstrap;
if (clientConfig.isReadTimeoutSet()) {
ReadTimeoutPipelineConfigurator readTimeoutConfigurator =
new ReadTimeoutPipelineConfigurator(clientConfig.getReadTimeoutInMillis(), TimeUnit.MILLISECONDS);
if (null != pipelineConfigurator) {
pipelineConfigurator = new PipelineConfiguratorComposite<O, I>(pipelineConfigurator,
readTimeoutConfigurator);
} else {
pipelineConfigurator = new PipelineConfiguratorComposite<O, I>(readTimeoutConfigurator);
}
}
incompleteConfigurator = pipelineConfigurator;
}
项目:RxNetty
文件:RxServer.java
/**
 * Creates a server on the given bootstrap and port. Every accepted channel gets an
 * {@link RxRequiredConfigurator} for the connection handler, preceded by the optional
 * user-supplied pipeline configurator when one is given.
 *
 * @param bootstrap server bootstrap; must not be {@code null}
 * @param port port stored for this server
 * @param pipelineConfigurator optional configurator applied before the required one; may be {@code null}
 * @param connectionHandler handler passed to the required configurator of each channel
 * @throws NullPointerException if {@code bootstrap} is {@code null}
 */
public RxServer(ServerBootstrap bootstrap, int port, final PipelineConfigurator<I, O> pipelineConfigurator,
        final ConnectionHandler<I, O> connectionHandler) {
    if (bootstrap == null) {
        throw new NullPointerException("Bootstrap can not be null.");
    }
    this.bootstrap = bootstrap;
    this.port = port;

    ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // A fresh required configurator is built for every new channel.
            RxRequiredConfigurator<I, O> rxStage =
                    new RxRequiredConfigurator<I, O>(connectionHandler, errorHandler);
            PipelineConfigurator<I, O> channelConfigurator;
            if (pipelineConfigurator != null) {
                channelConfigurator = new PipelineConfiguratorComposite<I, O>(pipelineConfigurator, rxStage);
            } else {
                channelConfigurator = rxStage;
            }
            channelConfigurator.configureNewPipeline(ch.pipeline());
        }
    };
    this.bootstrap.childHandler(childInitializer);

    serverStateRef = new AtomicReference<ServerState>(ServerState.Created);
}
项目:RxNetty
文件:RemoteObservable.java
/**
 * Creates a TCP server on {@code port} that serves the given observable through a
 * {@link RemoteObservableConnectionHandler}, then starts it.
 *
 * @param port port the server is created on
 * @param observable source handed to the connection handler
 * @param encoder encoder handed to the connection handler
 * @param startAndWait when {@code true} the server is started with {@code startAndWait()},
 *                     otherwise with {@code start()}
 * @param slottingStrategy slotting strategy handed to the connection handler
 * @param ingressPolicy ingress policy handed to the connection handler
 */
private static <T> void serveMany(int port, final Observable<List<Observable<T>>> observable, final Encoder<T> encoder,
        boolean startAndWait, SlottingStrategy<T> slottingStrategy, IngressPolicy ingressPolicy){
    // Length-prefixed framing: a 4-byte length header on the way out, and a decoder limited to
    // 524288-byte frames on the way in.
    // For wire-level debugging, a LoggingHandler can be added first in this pipeline.
    PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> framing =
            new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
                @Override
                public void configureNewPipeline(ChannelPipeline pipeline) {
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4));
                }
            };
    PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> pipelineStages =
            new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(framing, new RxEventPipelineConfigurator());
    RemoteObservableConnectionHandler<T> connectionHandler =
            new RemoteObservableConnectionHandler<T>(observable, encoder, slottingStrategy, ingressPolicy);

    RxServer<RemoteRxEvent, RemoteRxEvent> server =
            RxNetty.createTcpServer(port, pipelineStages, connectionHandler);

    if (!startAndWait) {
        server.start();
    } else {
        server.startAndWait();
    }
}
项目:triathlon
文件:MarathonClient.java
/**
 * Submits {@code message} as the body of a POST to {@code /v2/apps} with a JSON content type.
 * A new HTTP client is built on every call, targeting {@code networkAddress.getIpAddress()}
 * and {@code port}.
 *
 * @param message request body, written through {@code StringTransformer.DEFAULT_INSTANCE}
 * @return the observable returned by submitting the request
 */
public Observable<HttpClientResponse<ByteBuf>> postMessage(String message) {
    PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> httpConfigurator =
            PipelineConfigurators.httpClientConfigurator();

    HttpClient<ByteBuf, ByteBuf> marathonClient =
            RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(networkAddress.getIpAddress(), port)
                    .pipelineConfigurator(httpConfigurator)
                    .enableWireLogging(LogLevel.ERROR)
                    .build();

    HttpClientRequest<ByteBuf> createAppRequest = HttpClientRequest.createPost("/v2/apps");
    createAppRequest.withRawContentSource(Observable.just(message), StringTransformer.DEFAULT_INSTANCE);
    createAppRequest.withHeader("Content-Type", "application/json");

    return marathonClient.submit(createAppRequest);
}
项目:ribbon
文件:LoadBalancingRxClient.java
/**
 * Creates a client whose load balancer is built from {@code config} through
 * {@code LoadBalancerBuilder...buildLoadBalancerFromConfigWithReflection()}, then delegates to
 * the constructor that takes an explicit load balancer.
 *
 * @param config client configuration, also used to build the load balancer
 * @param defaultRetryHandler retry handler passed through unchanged
 * @param pipelineConfigurator pipeline configurator passed through unchanged
 */
public LoadBalancingRxClient(IClientConfig config, RetryHandler defaultRetryHandler, PipelineConfigurator<O, I> pipelineConfigurator) {
this(LoadBalancerBuilder.newBuilder().withClientConfig(config).buildLoadBalancerFromConfigWithReflection(),
config,
defaultRetryHandler,
pipelineConfigurator
);
}
项目:ribbon
文件:LoadBalancingRxClient.java
/**
 * Creates a client on top of the given load balancer and decides whether a custom SSL context
 * factory is needed.
 *
 * <p>A {@link URLSslContextFactory} is created only when the {@code IsSecure} property is true
 * and enough stores are configured: both a trust store and a key store when client auth is
 * required, at least one of them otherwise. In every other case {@code sslContextFactory} is
 * set to {@code null}.
 *
 * @param lb load balancer used to create the load balancer context
 * @param config client configuration
 * @param defaultRetryHandler retry handler stored on this client and given to the context
 * @param pipelineConfigurator pipeline configurator stored on this client
 * @throws IllegalArgumentException if the SSL context factory cannot be created
 */
public LoadBalancingRxClient(ILoadBalancer lb, IClientConfig config, RetryHandler defaultRetryHandler, PipelineConfigurator<O, I> pipelineConfigurator) {
    this.rxClientCache = new ConcurrentHashMap<Server, T>();
    this.lbContext = new LoadBalancerContext(lb, config, defaultRetryHandler);
    this.defaultRetryHandler = defaultRetryHandler;
    this.pipelineConfigurator = pipelineConfigurator;
    this.clientConfig = config;
    this.listener = createListener(config.getClientName());
    eventSubject = new MetricEventsSubject<ClientMetricsEvent<?>>();

    boolean secure = getProperty(IClientConfigKey.Keys.IsSecure, null, false);
    if (!secure) {
        sslContextFactory = null;
    } else {
        final URL trustStoreLocation = getResourceForOptionalProperty(CommonClientConfigKey.TrustStore);
        final URL keyStoreLocation = getResourceForOptionalProperty(CommonClientConfigKey.KeyStore);
        boolean clientAuthRequired = clientConfig.get(IClientConfigKey.Keys.IsClientAuthRequired, false);

        boolean hasTrustStore = trustStoreLocation != null;
        boolean hasKeyStore = keyStoreLocation != null;
        // Client auth required: both stores are needed. Not required: either store is enough.
        boolean storesWarrantConfiguring = clientAuthRequired
                ? (hasTrustStore && hasKeyStore)
                : (hasTrustStore || hasKeyStore);

        if (!storesWarrantConfiguring) {
            sslContextFactory = null;
        } else {
            try {
                sslContextFactory = new URLSslContextFactory(trustStoreLocation,
                        clientConfig.get(CommonClientConfigKey.TrustStorePassword),
                        keyStoreLocation,
                        clientConfig.get(CommonClientConfigKey.KeyStorePassword));
            } catch (ClientSslSocketFactoryException e) {
                throw new IllegalArgumentException("Unable to configure custom secure socket factory", e);
            }
        }
    }

    addLoadBalancerListener();
}
项目:ribbon
文件:LoadBalancingRxClientWithPoolOptions.java
/**
 * Creates a client whose load balancer is built from {@code config} through
 * {@code LoadBalancerBuilder...buildDynamicServerListLoadBalancer()}, then delegates to the
 * constructor that takes an explicit load balancer.
 *
 * @param config client configuration, also used to build the load balancer
 * @param retryHandler retry handler passed through unchanged
 * @param pipelineConfigurator pipeline configurator passed through unchanged
 * @param poolCleanerScheduler scheduler passed through unchanged
 */
public LoadBalancingRxClientWithPoolOptions(IClientConfig config,
RetryHandler retryHandler,
PipelineConfigurator<O, I> pipelineConfigurator, ScheduledExecutorService poolCleanerScheduler) {
this(LoadBalancerBuilder.newBuilder().withClientConfig(config).buildDynamicServerListLoadBalancer(),
config,
retryHandler,
pipelineConfigurator,
poolCleanerScheduler);
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Builds a {@link LoadBalancingHttpClient} for the given load balancer and client config.
 * The retry handler comes from {@code getDefaultHttpRetryHandlerWithConfig(config)}, and the
 * builder is also given the {@code poolCleanerScheduler} that is in scope here.
 *
 * @param pipelineConfigurator pipeline configurator passed to the builder
 * @param loadBalancer load balancer passed to the builder
 * @param config client configuration passed to the builder and used to derive the retry handler
 * @return the client produced by the builder
 */
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config) {
return LoadBalancingHttpClient.<I, O>builder()
.withLoadBalancer(loadBalancer)
.withClientConfig(config)
.withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
.withPipelineConfigurator(pipelineConfigurator)
.withPoolCleanerScheduler(poolCleanerScheduler)
.build();
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Builds a {@link LoadBalancingHttpClient} from the client config alone; no load balancer is
 * handed to the builder in this overload. The retry handler comes from
 * {@code getDefaultHttpRetryHandlerWithConfig(config)}, and the builder is also given the
 * {@code poolCleanerScheduler} that is in scope here.
 *
 * @param pipelineConfigurator pipeline configurator passed to the builder
 * @param config client configuration passed to the builder and used to derive the retry handler
 * @return the client produced by the builder
 */
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config) {
return LoadBalancingHttpClient.<I, O>builder()
.withClientConfig(config)
.withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
.withPipelineConfigurator(pipelineConfigurator)
.withPoolCleanerScheduler(poolCleanerScheduler)
.build();
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Builds a {@link LoadBalancingHttpClient} from the client config and a caller-supplied retry
 * handler; no load balancer is handed to the builder in this overload. The builder is also
 * given the {@code poolCleanerScheduler} that is in scope here.
 *
 * @param pipelineConfigurator pipeline configurator passed to the builder
 * @param config client configuration passed to the builder
 * @param retryHandler retry handler passed to the builder
 * @return the client produced by the builder
 */
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config, RetryHandler retryHandler) {
return LoadBalancingHttpClient.<I, O>builder()
.withClientConfig(config)
.withRetryHandler(retryHandler)
.withPipelineConfigurator(pipelineConfigurator)
.withPoolCleanerScheduler(poolCleanerScheduler)
.build();
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Builds a {@link LoadBalancingHttpClient} with an explicit load balancer, retry handler and
 * execution listeners. The builder is also given the {@code poolCleanerScheduler} that is in
 * scope here.
 *
 * @param pipelineConfigurator pipeline configurator passed to the builder
 * @param loadBalancer load balancer passed to the builder
 * @param config client configuration passed to the builder
 * @param retryHandler retry handler passed to the builder
 * @param listeners execution listeners passed to the builder
 * @return the client produced by the builder
 */
public static <I, O> LoadBalancingHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler,
List<ExecutionListener<HttpClientRequest<I>, HttpClientResponse<O>>> listeners) {
return LoadBalancingHttpClient.<I, O>builder()
.withLoadBalancer(loadBalancer)
.withClientConfig(config)
.withRetryHandler(retryHandler)
.withPipelineConfigurator(pipelineConfigurator)
.withPoolCleanerScheduler(poolCleanerScheduler)
.withExecutorListeners(listeners)
.build();
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Builds a server-sent-events client through {@code SSEClient.sseClientBuilder()} for the
 * given load balancer and client config. The retry handler comes from
 * {@code getDefaultHttpRetryHandlerWithConfig(config)}.
 * NOTE(review): unlike the newHttpClient builders, no pool cleaner scheduler is passed here;
 * confirm that this is intentional.
 *
 * @param pipelineConfigurator pipeline configurator passed to the builder
 * @param loadBalancer load balancer passed to the builder
 * @param config client configuration passed to the builder and used to derive the retry handler
 * @return the client produced by the builder
 */
public static <I> LoadBalancingHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config) {
return SSEClient.<I>sseClientBuilder()
.withLoadBalancer(loadBalancer)
.withClientConfig(config)
.withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
.withPipelineConfigurator(pipelineConfigurator)
.build();
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Builds a server-sent-events client through {@code SSEClient.sseClientBuilder()} from the
 * client config alone; no load balancer is handed to the builder in this overload. The retry
 * handler comes from {@code getDefaultHttpRetryHandlerWithConfig(config)}.
 *
 * @param pipelineConfigurator pipeline configurator passed to the builder
 * @param config client configuration passed to the builder and used to derive the retry handler
 * @return the client produced by the builder
 */
public static <I> LoadBalancingHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config) {
return SSEClient.<I>sseClientBuilder()
.withClientConfig(config)
.withRetryHandler(getDefaultHttpRetryHandlerWithConfig(config))
.withPipelineConfigurator(pipelineConfigurator)
.build();
}
项目:RxNetty
文件:TcpIntervalClientTakeN.java
/**
 * Connects to {@code localhost:8181}, writes {@code "subscribe:"}, and prints the first three
 * items of the resulting stream (the write acknowledgement followed by trimmed input messages),
 * blocking until they have been printed.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Inbound bytes are turned into Strings by a StringDecoder.
    PipelineConfigurator<String, ByteBuf> stringDecoding = new PipelineConfigurator<String, ByteBuf>() {
        @Override
        public void configureNewPipeline(ChannelPipeline pipeline) {
            pipeline.addLast(new StringDecoder());
        }
    };
    Observable<ObservableConnection<String, ByteBuf>> connectionObservable =
            RxNetty.createTcpClient("localhost", 8181, stringDecoding).connect();

    Func1<ObservableConnection<String, ByteBuf>, Observable<String>> subscribeThenListen =
            new Func1<ObservableConnection<String, ByteBuf>, Observable<String>>() {
                @Override
                public Observable<String> call(ObservableConnection<String, ByteBuf> connection) {
                    ByteBuf subscribeCommand = Unpooled.copiedBuffer("subscribe:".getBytes());
                    // Map the write result to an empty string so it can be concatenated with the input.
                    Observable<String> writeAck =
                            connection.writeAndFlush(subscribeCommand).map(new Func1<Void, String>() {
                                @Override
                                public String call(Void ignored) {
                                    return "";
                                }
                            });
                    Observable<String> trimmedInput = connection.getInput().map(new Func1<String, String>() {
                        @Override
                        public String call(String raw) {
                            return raw.trim();
                        }
                    });
                    return Observable.concat(writeAck, trimmedInput);
                }
            };

    Action1<Object> printItem = new Action1<Object>() {
        @Override
        public void call(Object item) {
            System.out.println("onNext: " + item);
        }
    };

    connectionObservable.flatMap(subscribeThenListen)
            .take(3)
            .toBlockingObservable()
            .forEach(printItem);
}
项目:RxNetty
文件:HttpClientImpl.java
/**
 * Adds the HTTP {@link ClientRequiredConfigurator} after the supplied configurator and hands
 * the result to the superclass implementation.
 *
 * <p>A {@code null} {@code pipelineConfigurator} is left out of the composite rather than
 * passed into it. This matches the null handling in the other composite-building code of this
 * client/server family and does not depend on the composite tolerating {@code null} elements.
 *
 * @param clientConnectionHandler connection handler forwarded to the superclass
 * @param pipelineConfigurator optional configurator applied before the HTTP one; may be {@code null}
 * @return the configurator returned by the superclass for the composed pipeline
 */
@Override
protected PipelineConfigurator<HttpClientRequest<I>, HttpClientResponse<O>> getPipelineConfiguratorForAChannel(ClientConnectionHandler clientConnectionHandler,
        PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator) {
    ClientRequiredConfigurator<I, O> httpRequired = new ClientRequiredConfigurator<I, O>();
    PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> configurator;
    if (null != pipelineConfigurator) {
        configurator = new PipelineConfiguratorComposite<HttpClientResponse<O>, HttpClientRequest<I>>(
                pipelineConfigurator, httpRequired);
    } else {
        // No user configurator: the HTTP-required stage is the only member.
        configurator = new PipelineConfiguratorComposite<HttpClientResponse<O>, HttpClientRequest<I>>(
                httpRequired);
    }
    return super.getPipelineConfiguratorForAChannel(clientConnectionHandler, configurator);
}
项目:RxNetty
文件:RxClientImpl.java
/**
 * Lazily connects to the {@link ServerInfo} of this client. Nothing happens until the returned
 * {@link Observable} is subscribed to, and each subscription opens a fresh connection.
 * Unsubscribing before the connect attempt is done cancels that attempt; an established
 * connection is not closed by unsubscribing.
 *
 * @return Observable for the connect. Every new subscription will create a fresh connection.
 */
@Override
public Observable<ObservableConnection<O, I>> connect() {
    OnSubscribe<ObservableConnection<O, I>> connectOnSubscribe = new OnSubscribe<ObservableConnection<O, I>>() {
        @Override
        public void call(final Subscriber<? super ObservableConnection<O, I>> subscriber) {
            try {
                final ClientConnectionHandler connectionListener = new ClientConnectionHandler(subscriber);

                // Install the pipeline setup for this subscription on the bootstrap.
                ChannelInitializer<SocketChannel> channelSetup = new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        getPipelineConfiguratorForAChannel(connectionListener, incompleteConfigurator)
                                .configureNewPipeline(ch.pipeline());
                    }
                };
                clientBootstrap.handler(channelSetup);

                // Start the connect; the handler is notified when the attempt finishes.
                final ChannelFuture pendingConnect =
                        clientBootstrap.connect(serverInfo.getHost(), serverInfo.getPort())
                                       .addListener(connectionListener);

                Action0 cancelPendingConnect = new Action0() {
                    @Override
                    public void call() {
                        // Unsubscribing means the connection is no longer wanted; a connection that
                        // already exists has to be closed explicitly.
                        if (!pendingConnect.isDone()) {
                            pendingConnect.cancel(true);
                        }
                    }
                };
                subscriber.add(Subscriptions.create(cancelPendingConnect));
            } catch (Throwable e) {
                subscriber.onError(e);
            }
        }
    };
    return Observable.create(connectOnSubscribe);
}
项目:RxNetty
文件:RxClientImpl.java
/**
 * Builds the configurator for one channel: the optional supplied configurator followed by an
 * {@link RxRequiredConfigurator} bound to the given connection handler.
 *
 * @param clientConnectionHandler handler given to the required configurator
 * @param pipelineConfigurator optional configurator placed before the required one; may be {@code null}
 * @return a composite holding the supplied configurator (if any) and the required configurator
 */
protected PipelineConfigurator<I, O> getPipelineConfiguratorForAChannel(ClientConnectionHandler clientConnectionHandler,
        PipelineConfigurator<O, I> pipelineConfigurator) {
    RxRequiredConfigurator<O, I> rxStage = new RxRequiredConfigurator<O, I>(clientConnectionHandler);
    if (null == pipelineConfigurator) {
        return new PipelineConfiguratorComposite<I, O>(rxStage);
    }
    return new PipelineConfiguratorComposite<I, O>(pipelineConfigurator, rxStage);
}
项目:RxNetty
文件:RemoteObservable.java
/**
 * Connects to a remote observable server and exposes its events as an {@code Observable<T>}.
 * After connecting, a "subscribed" event is written to the server and the connection is
 * registered with {@code remoteUnsubscribe}; incoming events are then converted to
 * notifications and dematerialized.
 *
 * @param host server host
 * @param port server port
 * @param decoder decoder applied to the payload of {@code next} events
 * @param remoteUnsubscribe receives the connection once it is established
 * @return observable of decoded values; an event of any type other than next, error or
 *         completed makes the mapping function throw a {@link RuntimeException}
 */
private static <T> Observable<T> createTcpConnectionToServer(String host, int port, final Decoder<T> decoder,
        final RemoteUnsubscribe remoteUnsubscribe){
    // Length-prefixed framing: a 4-byte length header on the way out, and a decoder limited to
    // 524288-byte frames on the way in.
    // For wire-level debugging, a LoggingHandler can be added first in this pipeline.
    PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> framing =
            new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>(){
                @Override
                public void configureNewPipeline(ChannelPipeline pipeline) {
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4));
                }
            };
    PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> pipelineStages =
            new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(framing, new RxEventPipelineConfigurator());

    Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>> subscribeAndRead =
            new Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>>(){
                @Override
                public Observable<RemoteRxEvent> call(final ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
                    // Tell the server we are subscribed, then hand the connection over for unsubscription.
                    connection.writeAndFlush(RemoteRxEvent.subscribed());
                    remoteUnsubscribe.setConnection(connection);
                    return connection.getInput();
                }
            };

    // Converts each event received from the server into the matching notification.
    Func1<RemoteRxEvent, Notification<T>> toNotification = new Func1<RemoteRxEvent,Notification<T>>(){
        @Override
        public Notification<T> call(RemoteRxEvent rxEvent) {
            if (rxEvent.getType() == RemoteRxEvent.Type.next){
                return Notification.createOnNext(decoder.decode(rxEvent.getData()));
            }
            if (rxEvent.getType() == RemoteRxEvent.Type.error){
                return Notification.createOnError(fromBytesToThrowable(rxEvent.getData()));
            }
            if (rxEvent.getType() == RemoteRxEvent.Type.completed){
                return Notification.createOnCompleted();
            }
            throw new RuntimeException("RemoteRxEvent of type:"+rxEvent.getType()+", not supported.");
        }
    };

    return RxNetty.createTcpClient(host, port, pipelineStages)
            .connect()
            .flatMap(subscribeAndRead)
            .map(toNotification)
            .<T>dematerialize();
}
项目:karyon
文件:TcpRxServerProvider.java
/**
 * Prepares the keys this provider uses, all qualified by {@code Names.named(name)}: the
 * connection handler key (built by {@code keyFor} from the I/O types), and the
 * pipeline-configurator, metric-events-listener-factory and server-config keys.
 *
 * @param name name used for the {@code Names.named} annotation on every key
 * @param iType input type passed to {@code keyFor} for the connection handler key
 * @param oType output type passed to {@code keyFor} for the connection handler key
 */
public TcpRxServerProvider(String name, Class<I> iType, Class<O> oType) {
nameAnnotation = Names.named(name);
connectionHandlerKey = keyFor(ConnectionHandler.class, iType, oType, nameAnnotation);
pipelineConfiguratorKey = Key.get(PipelineConfigurator.class, nameAnnotation);
metricEventsListenerFactoryKey = Key.get(MetricEventsListenerFactory.class, nameAnnotation);
serverConfigKey = Key.get(ServerConfig.class, nameAnnotation);
}
项目:karyon
文件:HttpRxServerProvider.java
/**
 * Prepares the keys this provider uses, all qualified by {@code Names.named(name)}: the
 * router and interceptor-support keys (built by {@code keyFor} from the I/O types), and the
 * pipeline-configurator, metric-events-listener-factory and server-config keys.
 *
 * @param name name used for the {@code Names.named} annotation on every key
 * @param iType input type passed to {@code keyFor}
 * @param oType output type passed to {@code keyFor}
 */
public HttpRxServerProvider(String name, Class<I> iType, Class<O> oType) {
nameAnnotation = Names.named(name);
routerKey = keyFor(RequestHandler.class, iType, oType, nameAnnotation);
interceptorSupportKey = keyFor(GovernatorHttpInterceptorSupport.class, iType, oType, nameAnnotation);
pipelineConfiguratorKey = Key.get(PipelineConfigurator.class, nameAnnotation);
metricEventsListenerFactoryKey = Key.get(MetricEventsListenerFactory.class, nameAnnotation);
serverConfigKey = Key.get(ServerConfig.class, nameAnnotation);
}
项目:karyon
文件:WebSocketsRxServerProvider.java
/**
 * Prepares the keys this provider uses, all qualified by {@code Names.named(name)}: the
 * connection handler key (built by {@code keyFor} from the I/O types), and the
 * pipeline-configurator, metric-events-listener-factory and
 * {@code AbstractServerModule.ServerConfig} keys.
 *
 * @param name name used for the {@code Names.named} annotation on every key
 * @param iType input type passed to {@code keyFor} for the connection handler key
 * @param oType output type passed to {@code keyFor} for the connection handler key
 */
public WebSocketsRxServerProvider(String name, Class<I> iType, Class<O> oType) {
nameAnnotation = Names.named(name);
connectionHandlerKey = keyFor(ConnectionHandler.class, iType, oType, nameAnnotation);
pipelineConfiguratorKey = Key.get(PipelineConfigurator.class, nameAnnotation);
metricEventsListenerFactoryKey = Key.get(MetricEventsListenerFactory.class, nameAnnotation);
serverConfigKey = Key.get(AbstractServerModule.ServerConfig.class, nameAnnotation);
}
项目:karyon
文件:AbstractServerModule.java
/**
 * Stores the I/O types, prepares the pipeline-configurator and server-config keys qualified by
 * {@code Names.named(moduleName)}, and obtains the server config builder from
 * {@code newServerConfigBuilder()}.
 *
 * @param moduleName name used for the {@code Names.named} annotation on the keys
 * @param iType input type stored on this module
 * @param oType output type stored on this module
 */
protected AbstractServerModule(String moduleName, Class<I> iType, Class<O> oType) {
nameAnnotation = Names.named(moduleName);
this.iType = iType;
this.oType = oType;
pipelineConfiguratorKey = Key.get(PipelineConfigurator.class, nameAnnotation);
serverConfigKey = Key.get(ServerConfig.class, nameAnnotation);
// NOTE(review): newServerConfigBuilder() is called from the constructor; if subclasses
// override it, it runs before their own fields are initialized — confirm this is safe.
serverConfigBuilder = newServerConfigBuilder();
}
项目:ribbon
文件:LoadBalancingUdpClient.java
/**
 * Creates a UDP client without an explicit load balancer; all arguments are forwarded
 * unchanged to the superclass constructor.
 *
 * @param config client configuration
 * @param retryHandler retry handler
 * @param pipelineConfigurator pipeline configurator
 */
public LoadBalancingUdpClient(IClientConfig config,
RetryHandler retryHandler,
PipelineConfigurator<O, I> pipelineConfigurator) {
super(config, retryHandler, pipelineConfigurator);
}
项目:ribbon
文件:LoadBalancingUdpClient.java
/**
 * Creates a UDP client with an explicit load balancer; all arguments are forwarded unchanged
 * to the superclass constructor.
 *
 * @param lb load balancer
 * @param config client configuration
 * @param retryHandler retry handler
 * @param pipelineConfigurator pipeline configurator
 */
public LoadBalancingUdpClient(ILoadBalancer lb, IClientConfig config,
RetryHandler retryHandler,
PipelineConfigurator<O, I> pipelineConfigurator) {
super(lb, config, retryHandler, pipelineConfigurator);
}
项目:ribbon
文件:LoadBalancingTcpClient.java
/**
 * Creates a TCP client with an explicit load balancer; all arguments are forwarded unchanged
 * to the superclass constructor.
 *
 * @param lb load balancer
 * @param config client configuration
 * @param retryHandler retry handler
 * @param pipelineConfigurator pipeline configurator
 * @param poolCleanerScheduler scheduler forwarded to the superclass
 */
public LoadBalancingTcpClient(ILoadBalancer lb, IClientConfig config,
RetryHandler retryHandler,
PipelineConfigurator<O, I> pipelineConfigurator, ScheduledExecutorService poolCleanerScheduler) {
super(lb, config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
}
项目:ribbon
文件:LoadBalancingTcpClient.java
/**
 * Creates a TCP client without an explicit load balancer; all arguments are forwarded
 * unchanged to the superclass constructor.
 *
 * @param config client configuration
 * @param retryHandler retry handler
 * @param pipelineConfigurator pipeline configurator
 * @param poolCleanerScheduler scheduler forwarded to the superclass
 */
public LoadBalancingTcpClient(IClientConfig config,
RetryHandler retryHandler,
PipelineConfigurator<O, I> pipelineConfigurator,
ScheduledExecutorService poolCleanerScheduler) {
super(config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
}
项目:ribbon
文件:LoadBalancingHttpClient.java
/**
 * Sets the pipeline configurator on this builder.
 *
 * @param pipelineConfigurator configurator to store; it is stored as given, without checks
 * @return this builder, to allow call chaining
 */
public Builder<I, O> withPipelineConfigurator(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator) {
this.pipelineConfigurator = pipelineConfigurator;
return this;
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Creates a {@link LoadBalancingTcpClient} with an explicit load balancer and retry handler,
 * using the {@code poolCleanerScheduler} that is in scope here.
 *
 * @param loadBalancer load balancer for the client
 * @param pipelineConfigurator pipeline configurator for the client
 * @param config client configuration
 * @param retryHandler retry handler for the client
 * @return the new TCP client
 */
public static <I, O> RxClient<I, O> newTcpClient(ILoadBalancer loadBalancer, PipelineConfigurator<O, I> pipelineConfigurator,
IClientConfig config, RetryHandler retryHandler) {
return new LoadBalancingTcpClient<I, O>(loadBalancer, config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Creates a {@link LoadBalancingTcpClient} from the client config alone. The retry handler
 * comes from {@code getDefaultRetryHandlerWithConfig(config)}, and the
 * {@code poolCleanerScheduler} that is in scope here is passed along.
 *
 * @param pipelineConfigurator pipeline configurator for the client
 * @param config client configuration, also used to derive the retry handler
 * @return the new TCP client
 */
public static <I, O> RxClient<I, O> newTcpClient(PipelineConfigurator<O, I> pipelineConfigurator,
IClientConfig config) {
return new LoadBalancingTcpClient<I, O>(config, getDefaultRetryHandlerWithConfig(config), pipelineConfigurator, poolCleanerScheduler);
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Creates a {@link LoadBalancingUdpClient} with an explicit load balancer and retry handler.
 *
 * @param loadBalancer load balancer for the client
 * @param pipelineConfigurator pipeline configurator for the client
 * @param config client configuration
 * @param retryHandler retry handler for the client
 * @return the new UDP client
 */
public static <I, O> RxClient<I, O> newUdpClient(ILoadBalancer loadBalancer, PipelineConfigurator<O, I> pipelineConfigurator,
IClientConfig config, RetryHandler retryHandler) {
return new LoadBalancingUdpClient<I, O>(loadBalancer, config, retryHandler, pipelineConfigurator);
}
项目:ribbon
文件:RibbonTransport.java
/**
 * Creates a {@link LoadBalancingUdpClient} from the client config alone. The retry handler
 * comes from {@code getDefaultRetryHandlerWithConfig(config)}.
 *
 * @param pipelineConfigurator pipeline configurator for the client
 * @param config client configuration, also used to derive the retry handler
 * @return the new UDP client
 */
public static <I, O> RxClient<I, O> newUdpClient(PipelineConfigurator<O, I> pipelineConfigurator, IClientConfig config) {
return new LoadBalancingUdpClient<I, O>(config, getDefaultRetryHandlerWithConfig(config), pipelineConfigurator);
}
项目:ribbon
文件:MyUDPClient.java
/**
 * Creates the client with a {@code MyRetryHandler} built from {@code config} as its retry
 * handler; the config and pipeline configurator are forwarded to the superclass constructor.
 *
 * @param config client configuration, also given to the retry handler
 * @param pipelineConfigurator pipeline configurator for datagram packets
 */
public MyUDPClient(IClientConfig config, PipelineConfigurator<DatagramPacket, DatagramPacket> pipelineConfigurator) {
super(config, new MyRetryHandler(config), pipelineConfigurator);
}
项目:RxNetty
文件:HttpClientImpl.java
/**
 * Creates an HTTP client; all arguments are forwarded unchanged to the superclass constructor.
 *
 * @param serverInfo target server
 * @param clientBootstrap bootstrap used for connecting
 * @param pipelineConfigurator pipeline configurator for HTTP responses/requests
 * @param clientConfig client configuration
 */
public HttpClientImpl(ServerInfo serverInfo, Bootstrap clientBootstrap,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator, ClientConfig clientConfig) {
super(serverInfo, clientBootstrap, pipelineConfigurator, clientConfig);
}
项目:RxNetty
文件:HttpServer.java
/**
 * Creates an HTTP server for the given request handler by wrapping it in a new
 * {@code HttpConnectionHandler} and delegating to the constructor that takes one.
 *
 * @param bootstrap server bootstrap
 * @param port server port
 * @param pipelineConfigurator pipeline configurator for HTTP requests/responses
 * @param requestHandler handler wrapped in the connection handler
 */
public HttpServer(ServerBootstrap bootstrap, int port,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> pipelineConfigurator,
RequestHandler<I, O> requestHandler) {
this(bootstrap, port, pipelineConfigurator, new HttpConnectionHandler<I, O>(requestHandler));
}
项目:RxNetty
文件:HttpServer.java
/**
 * Creates an HTTP server with an explicit connection handler. The pipeline configurator is
 * first passed through {@code addRequiredConfigurator} before it reaches the superclass, and
 * the connection handler is kept on this instance.
 *
 * @param bootstrap server bootstrap
 * @param port server port
 * @param pipelineConfigurator pipeline configurator for HTTP requests/responses
 * @param connectionHandler connection handler passed to the superclass and stored here
 */
protected HttpServer(ServerBootstrap bootstrap, int port,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> pipelineConfigurator,
HttpConnectionHandler<I, O> connectionHandler) {
super(bootstrap, port, addRequiredConfigurator(pipelineConfigurator), connectionHandler);
this.connectionHandler = connectionHandler;
}
项目:RxNetty
文件:HttpServer.java
/**
 * Returns a composite that applies the supplied configurator followed by a new
 * {@link ServerRequiredConfigurator}.
 *
 * <p>A {@code null} {@code pipelineConfigurator} is left out of the composite rather than
 * passed into it. This matches the null handling in the other composite-building code of this
 * client/server family and does not depend on the composite tolerating {@code null} elements.
 *
 * @param pipelineConfigurator optional configurator applied before the required one; may be {@code null}
 * @return composite of the supplied configurator (if any) and the server-required configurator
 */
private static <I, O> PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> addRequiredConfigurator(
        PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> pipelineConfigurator) {
    ServerRequiredConfigurator<I, O> httpRequired = new ServerRequiredConfigurator<I, O>();
    if (null == pipelineConfigurator) {
        // No user configurator: the server-required stage is the only member.
        return new PipelineConfiguratorComposite<HttpServerRequest<I>, HttpServerResponse<O>>(httpRequired);
    }
    return new PipelineConfiguratorComposite<HttpServerRequest<I>, HttpServerResponse<O>>(pipelineConfigurator,
            httpRequired);
}
项目:RxNetty
文件:RxNetty.java
/**
 * Builds a TCP server through {@code ServerBuilder} for the given port and connection handler,
 * with the given pipeline configurator applied to the builder.
 *
 * @param port port passed to the builder
 * @param pipelineConfigurator pipeline configurator set on the builder
 * @param connectionHandler connection handler passed to the builder
 * @return the server produced by the builder
 */
public static <I, O> RxServer<I, O> createTcpServer(final int port, PipelineConfigurator<I, O> pipelineConfigurator,
ConnectionHandler<I, O> connectionHandler) {
return new ServerBuilder<I, O>(port, connectionHandler).pipelineConfigurator(pipelineConfigurator).build();
}
项目:RxNetty
文件:RxNetty.java
/**
 * Builds a TCP client through {@code ClientBuilder} for the given host and port, with the
 * given pipeline configurator applied to the builder.
 *
 * @param host host passed to the builder
 * @param port port passed to the builder
 * @param configurator pipeline configurator set on the builder
 * @return the client produced by the builder
 */
public static <I, O> RxClient<I, O> createTcpClient(String host, int port, PipelineConfigurator<O, I> configurator) {
return new ClientBuilder<I, O>(host, port).pipelineConfigurator(configurator).build();
}
项目:RxNetty
文件:RxNetty.java
/**
 * Builds an HTTP server through {@code HttpServerBuilder} for the given port and request
 * handler, with the given pipeline configurator applied to the builder.
 *
 * @param port port passed to the builder
 * @param requestHandler request handler passed to the builder
 * @param configurator pipeline configurator set on the builder
 * @return the server produced by the builder
 */
public static <I, O> HttpServer<I, O> createHttpServer(int port,
RequestHandler<I, O> requestHandler,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> configurator) {
return new HttpServerBuilder<I, O>(port, requestHandler).pipelineConfigurator(configurator).build();
}