Java 类io.reactivex.netty.server.RxServer 实例源码
项目:RxNetty
文件:RemoteObservable.java
/**
 * Builds a TCP server on {@code port} whose connections are handled by a
 * {@link RemoteObservableConnectionHandler} wrapping the supplied observable, then starts it.
 *
 * @param port             port handed to {@link RxNetty#createTcpServer}
 * @param observable       source passed through to the connection handler
 * @param encoder          encoder passed through to the connection handler
 * @param startAndWait     when {@code true} the server is started with {@code startAndWait()},
 *                         otherwise with {@code start()}
 * @param slottingStrategy slotting strategy passed through to the connection handler
 * @param ingressPolicy    ingress policy passed through to the connection handler
 */
private static <T> void serveMany(int port, final Observable<List<Observable<T>>> observable, final Encoder<T> encoder,
                                  boolean startAndWait, SlottingStrategy<T> slottingStrategy, IngressPolicy ingressPolicy) {
    // Length-prefixed framing: every outbound message is preceded by a 4-byte length field and
    // every inbound frame is cut on that same field.
    PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> framingConfigurator =
            new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>() {
                @Override
                public void configureNewPipeline(ChannelPipeline pipeline) {
                    // To enable debug logging, add as first handler: new LoggingHandler(LogLevel.ERROR)
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    // 524288 bytes (half a MB) is the largest accepted frame; the 4-byte length
                    // field sits at offset 0 and is stripped from the decoded frame.
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(524288, 0, 4, 0, 4));
                }
            };
    PipelineConfigurator<RemoteRxEvent, RemoteRxEvent> pipelineConfigurator =
            new PipelineConfiguratorComposite<RemoteRxEvent, RemoteRxEvent>(
                    framingConfigurator, new RxEventPipelineConfigurator());
    RemoteObservableConnectionHandler<T> connectionHandler =
            new RemoteObservableConnectionHandler<T>(observable, encoder, slottingStrategy, ingressPolicy);

    RxServer<RemoteRxEvent, RemoteRxEvent> server =
            RxNetty.createTcpServer(port, pipelineConfigurator, connectionHandler);

    if (!startAndWait) {
        server.start();
        return;
    }
    server.startAndWait();
}
项目:karyon
文件:KaryonTcpModule.java
/**
 * Configures the server, binds the built server configuration to {@code serverConfigKey}, and
 * registers a {@link TcpRxServerProvider} in the {@code String -> RxServer} map binder under the
 * value of {@code nameAnnotation}, as an eager singleton.
 */
@Override
protected void configure() {
    configureServer();
    bind(serverConfigKey).toInstance(serverConfigBuilder.build());

    MapBinder<String, RxServer> serversByName = MapBinder.newMapBinder(binder(), String.class, RxServer.class);
    serversByName.addBinding(nameAnnotation.value())
            .toProvider(new TcpRxServerProvider<I, O, RxServer<I, O>>(nameAnnotation.value(), iType, oType))
            .asEagerSingleton();
}
项目:karyon
文件:KaryonHttpModule.java
/**
 * Configures the server, binds the built server configuration and the interceptor support
 * instance to their keys, and registers an {@link HttpRxServerProvider} in the
 * {@code String -> RxServer} map binder under the value of {@code nameAnnotation}, as an eager
 * singleton.
 */
@Override
protected void configure() {
    configureServer();
    bind(serverConfigKey).toInstance(serverConfigBuilder.build());
    bind(interceptorSupportKey).toInstance(interceptorSupportInstance);

    MapBinder<String, RxServer> serversByName = MapBinder.newMapBinder(binder(), String.class, RxServer.class);
    serversByName.addBinding(nameAnnotation.value())
            .toProvider(new HttpRxServerProvider<I, O, HttpServer<I, O>>(nameAnnotation.value(), iType, oType))
            .asEagerSingleton();
}
项目:karyon
文件:KaryonWebSocketsModule.java
/**
 * Configures the server, binds the built server configuration to {@code serverConfigKey}, and
 * registers a {@link WebSocketsRxServerProvider} in the {@code String -> RxServer} map binder
 * under the value of {@code nameAnnotation}, as an eager singleton.
 */
@Override
protected void configure() {
    configureServer();
    bind(serverConfigKey).toInstance(serverConfigBuilder.build());

    MapBinder<String, RxServer> serversByName = MapBinder.newMapBinder(binder(), String.class, RxServer.class);
    serversByName.addBinding(nameAnnotation.value())
            .toProvider(new WebSocketsRxServerProvider<I, O, RxServer<I, O>>(nameAnnotation.value(), iType, oType))
            .asEagerSingleton();
}
项目:karyon
文件:KaryonHttpModuleTest.java
/**
 * Sends an HTTP GET for {@code path} to {@code server} on localhost and returns the status of
 * the single response, waiting at most 60 seconds for it.
 *
 * @param path   request path appended directly after the port (callers supply the leading slash)
 * @param server server whose port is targeted
 * @return status of the HTTP response
 * @throws Exception if the response does not arrive in time or the request fails
 */
private HttpResponseStatus sendRequest(String path, RxServer<?, ?> server) throws Exception {
    // Mapping straight to the status keeps the stream typed as Observable<HttpResponseStatus>,
    // so no downcast is needed on the blocking result.
    return RxNetty.createHttpGet("http://localhost:" + server.getServerPort() + path)
            .map(new Func1<HttpClientResponse<ByteBuf>, HttpResponseStatus>() {
                @Override
                public HttpResponseStatus call(HttpClientResponse<ByteBuf> httpClientResponse) {
                    return httpClientResponse.getStatus();
                }
            })
            .single()
            .toBlocking()
            .toFuture()
            .get(60, TimeUnit.SECONDS);
}
项目:karyon
文件:WebSocketEchoServer.java
/**
 * Starts a WebSocket server on port 8888 that logs each received text frame to stdout and
 * writes its upper-cased text back, wrapped in a Karyon server that runs until shutdown.
 *
 * @param args command-line arguments (unused)
 */
public static void main(final String[] args) {
    final ConnectionHandler<TextWebSocketFrame, TextWebSocketFrame> echoHandler =
            new ConnectionHandler<TextWebSocketFrame, TextWebSocketFrame>() {
                @Override
                public Observable<Void> handle(
                        final ObservableConnection<TextWebSocketFrame, TextWebSocketFrame> connection) {
                    Func1<WebSocketFrame, Observable<Void>> echoUpperCased =
                            new Func1<WebSocketFrame, Observable<Void>>() {
                                @Override
                                public Observable<Void> call(WebSocketFrame wsFrame) {
                                    String message = ((TextWebSocketFrame) wsFrame).text();
                                    System.out.println("Got message: " + message);
                                    return connection.writeAndFlush(new TextWebSocketFrame(message.toUpperCase()));
                                }
                            };
                    return connection.getInput().flatMap(echoUpperCased);
                }
            };

    RxServer<TextWebSocketFrame, TextWebSocketFrame> webSocketServer =
            RxNetty.newWebSocketServerBuilder(8888, echoHandler).build();

    Karyon.forWebSocketServer(
                    webSocketServer,
                    new KaryonBootstrapModule(),
                    new ArchaiusBootstrapModule("websocket-echo-server"),
                    // KaryonEurekaModule.asBootstrapModule(), /* Uncomment if you need eureka */
                    Karyon.toBootstrapModule(KaryonWebAdminModule.class),
                    ShutdownModule.asBootstrapModule(),
                    KaryonServoModule.asBootstrapModule())
            .startAndWaitTillShutdown();
}
项目:RxNetty
文件:RxNetty.java
/**
 * Creates a TCP server from a {@link ServerBuilder} configured with the given pipeline
 * configurator.
 *
 * @param port                 port passed to the builder
 * @param pipelineConfigurator pipeline configurator applied to the builder
 * @param connectionHandler    handler passed to the builder
 * @return the server produced by the builder
 */
public static <I, O> RxServer<I, O> createTcpServer(final int port, PipelineConfigurator<I, O> pipelineConfigurator,
                                                    ConnectionHandler<I, O> connectionHandler) {
    final ServerBuilder<I, O> builder = new ServerBuilder<I, O>(port, connectionHandler);
    return builder.pipelineConfigurator(pipelineConfigurator).build();
}
项目:RxNetty
文件:RxNetty.java
/**
 * Creates a {@code ByteBuf}-typed TCP server from a {@link ServerBuilder} with no further
 * configuration.
 *
 * @param port              port passed to the builder
 * @param connectionHandler handler passed to the builder
 * @return the server produced by the builder
 */
public static RxServer<ByteBuf, ByteBuf> createTcpServer(final int port,
                                                         ConnectionHandler<ByteBuf, ByteBuf> connectionHandler) {
    final ServerBuilder<ByteBuf, ByteBuf> builder = new ServerBuilder<ByteBuf, ByteBuf>(port, connectionHandler);
    return builder.build();
}
项目:karyon
文件:KaryonTcpModule.java
/**
 * Initialises the module and derives the {@link ConnectionHandler} and {@link RxServer} keys
 * from the input/output types and the module's name annotation.
 *
 * @param moduleName name forwarded to the superclass constructor
 * @param iType      input type forwarded to the superclass and used for both keys
 * @param oType      output type forwarded to the superclass and used for both keys
 */
protected KaryonTcpModule(String moduleName, Class<I> iType, Class<O> oType) {
    super(moduleName, iType, oType);
    this.connectionHandlerKey = keyFor(ConnectionHandler.class, iType, oType, nameAnnotation);
    this.serverKey = keyFor(RxServer.class, iType, oType, nameAnnotation);
}
项目:karyon
文件:KaryonWebSocketsModule.java
/**
 * Initialises the module and derives the {@link ConnectionHandler} and {@link RxServer} keys
 * from the input/output types and the module's name annotation.
 *
 * @param moduleName name forwarded to the superclass constructor
 * @param iType      input type forwarded to the superclass and used for both keys
 * @param oType      output type forwarded to the superclass and used for both keys
 */
protected KaryonWebSocketsModule(String moduleName, Class<I> iType, Class<O> oType) {
    super(moduleName, iType, oType);
    this.connectionHandlerKey = keyFor(ConnectionHandler.class, iType, oType, nameAnnotation);
    this.serverKey = keyFor(RxServer.class, iType, oType, nameAnnotation);
}
项目:karyon
文件:Karyon.java
/**
 * Builds a {@link KaryonServer} around a single TCP server whose connections are all handled by
 * the supplied {@link ConnectionHandler}.
 * The underlying {@link RxServer} comes from {@link RxNetty#newTcpServerBuilder(int, ConnectionHandler)}.
 *
 * @param port Port the TCP server is built for.
 * @param handler Handler that receives every connection.
 * @param bootstrapModules Optional additional bootstrap modules.
 *
 * @return {@link KaryonServer} used to start the created server.
 */
public static KaryonServer forTcpConnectionHandler(int port, ConnectionHandler<ByteBuf, ByteBuf> handler,
                                                   BootstrapModule... bootstrapModules) {
    return new RxNettyServerBackedServer(RxNetty.newTcpServerBuilder(port, handler).build(), bootstrapModules);
}
项目:karyon
文件:Karyon.java
/**
 * Builds a {@link KaryonServer} that ties the lifecycle of the given {@link RxServer} to its own.
 * The modules are wrapped into a single bootstrap module and handed to the
 * {@link BootstrapModule}-based overload.
 *
 * @param server TCP server
 * @param modules Optional additional modules.
 *
 * @return {@link KaryonServer} used to start the supplied server.
 */
public static KaryonServer forTcpServer(RxServer<?, ?> server, Module... modules) {
    final BootstrapModule wrappedModules = toBootstrapModule(modules);
    return forTcpServer(server, wrappedModules);
}
项目:karyon
文件:Karyon.java
/**
 * Builds a {@link KaryonServer} that ties the lifecycle of the given {@link RxServer} to its own.
 *
 * @param server TCP server
 * @param bootstrapModules Optional additional bootstrap modules.
 *
 * @return {@link KaryonServer} used to start the supplied server.
 */
public static KaryonServer forTcpServer(RxServer<?, ?> server, BootstrapModule... bootstrapModules) {
    final KaryonServer karyonServer = new RxNettyServerBackedServer(server, bootstrapModules);
    return karyonServer;
}
项目:karyon
文件:Karyon.java
/**
 * Builds a {@link KaryonServer} that ties the lifecycle of the given WebSockets {@link RxServer}
 * to its own. The modules are wrapped into a single bootstrap module and handed to the
 * {@link BootstrapModule}-based overload.
 *
 * @param server WebSocket server
 * @param modules Optional additional modules.
 *
 * @return {@link KaryonServer} used to start the supplied server.
 */
public static KaryonServer forWebSocketServer(RxServer<? extends WebSocketFrame, ? extends WebSocketFrame> server, Module... modules) {
    final BootstrapModule wrappedModules = toBootstrapModule(modules);
    return forWebSocketServer(server, wrappedModules);
}
项目:karyon
文件:Karyon.java
/**
 * Builds a {@link KaryonServer} that ties the lifecycle of the given WebSockets {@link RxServer}
 * to its own.
 *
 * @param server WebSocket server
 * @param bootstrapModules Optional additional bootstrap modules.
 *
 * @return {@link KaryonServer} used to start the supplied server.
 */
public static KaryonServer forWebSocketServer(RxServer<? extends WebSocketFrame, ? extends WebSocketFrame> server,
                                              BootstrapModule... bootstrapModules) {
    final KaryonServer karyonServer = new RxNettyServerBackedServer(server, bootstrapModules);
    return karyonServer;
}