Java 类io.netty.channel.ChannelConfig 实例源码
项目:netty4.0.27Learn
文件:ServerBootstrap.java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
项目:netty4study
文件:ServerBootstrap.java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
项目:netty-netty-5.0.0.Alpha1
文件:ServerBootstrap.java
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
项目:jooby
文件:NettyWebSocketTest.java
@SuppressWarnings("unchecked")
@Test
public void resume() throws Exception {
new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class)
.expect(unit -> {
ChannelConfig chconf = unit.mock(ChannelConfig.class);
expect(chconf.isAutoRead()).andReturn(false);
expect(chconf.setAutoRead(true)).andReturn(chconf);
Channel ch = unit.mock(Channel.class);
expect(ch.config()).andReturn(chconf);
ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class);
expect(ctx.channel()).andReturn(ch);
})
.run(unit -> {
new NettyWebSocket(
unit.get(ChannelHandlerContext.class),
unit.get(WebSocketServerHandshaker.class),
unit.get(Consumer.class)).resume();
});
}
项目:jooby
文件:NettyWebSocketTest.java
@SuppressWarnings("unchecked")
@Test
public void resumeIgnored() throws Exception {
new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class)
.expect(unit -> {
ChannelConfig chconf = unit.mock(ChannelConfig.class);
expect(chconf.isAutoRead()).andReturn(true);
Channel ch = unit.mock(Channel.class);
expect(ch.config()).andReturn(chconf);
ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class);
expect(ctx.channel()).andReturn(ch);
})
.run(unit -> {
new NettyWebSocket(
unit.get(ChannelHandlerContext.class),
unit.get(WebSocketServerHandshaker.class),
unit.get(Consumer.class)).resume();
});
}
项目:jooby
文件:NettyWebSocketTest.java
@SuppressWarnings("unchecked")
@Test
public void pause() throws Exception {
new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class)
.expect(unit -> {
ChannelConfig chconf = unit.mock(ChannelConfig.class);
expect(chconf.isAutoRead()).andReturn(true);
expect(chconf.setAutoRead(false)).andReturn(chconf);
Channel ch = unit.mock(Channel.class);
expect(ch.config()).andReturn(chconf);
ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class);
expect(ctx.channel()).andReturn(ch);
})
.run(unit -> {
new NettyWebSocket(
unit.get(ChannelHandlerContext.class),
unit.get(WebSocketServerHandshaker.class),
unit.get(Consumer.class)).pause();
});
}
项目:jooby
文件:NettyWebSocketTest.java
@SuppressWarnings("unchecked")
@Test
public void pauseIgnored() throws Exception {
new MockUnit(ChannelHandlerContext.class, WebSocketServerHandshaker.class, Consumer.class)
.expect(unit -> {
ChannelConfig chconf = unit.mock(ChannelConfig.class);
expect(chconf.isAutoRead()).andReturn(false);
Channel ch = unit.mock(Channel.class);
expect(ch.config()).andReturn(chconf);
ChannelHandlerContext ctx = unit.get(ChannelHandlerContext.class);
expect(ctx.channel()).andReturn(ch);
})
.run(unit -> {
new NettyWebSocket(
unit.get(ChannelHandlerContext.class),
unit.get(WebSocketServerHandshaker.class),
unit.get(Consumer.class)).pause();
});
}
项目:grpc-java
文件:NettyClientTransportTest.java
@Test
public void setSoLingerChannelOption() throws IOException {
startServer();
Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();
// set SO_LINGER option
int soLinger = 123;
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
tooManyPingsRunnable, new TransportTracer());
transports.add(transport);
callMeMaybe(transport.start(clientTransportListener));
// verify SO_LINGER has been set
ChannelConfig config = transport.channel().config();
assertTrue(config instanceof SocketChannelConfig);
assertEquals(soLinger, ((SocketChannelConfig) config).getSoLinger());
}
项目:ServiceCOLDCache
文件:NettyRequestProxyFilter.java
private void setBufferSizeIfConfigIsSocketChannelConfig(
ChannelConfig config, long contentLength) {
if (config instanceof SocketChannelConfig) {
int sendBufferSize = contentLength < m_maxSendBufferSize ? (int) contentLength
: m_maxSendBufferSize;
((SocketChannelConfig) config).setSendBufferSize(sendBufferSize);
}
}
项目:netty4.0.27Learn
文件:AbstractTrafficShapingHandler.java
@Override
public void run() {
ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
ctx.attr(READ_SUSPENDED).set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
ctx.attr(READ_SUSPENDED).set(false);
config.setAutoRead(true);
ctx.channel().read();
}
if (logger.isDebugEnabled()) {
logger.debug("Unsupsend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
项目:reactive-ipc-jvm
文件:CodecSample.java
private static void runLineBasedFrameDecoder() {
TcpServer<String, String> transport = Netty4TcpServer.<String, String>create(
0,
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
int bufferSize = 1;
ChannelConfig config = channel.config();
config.setOption(ChannelOption.SO_RCVBUF, bufferSize);
config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize));
channel.pipeline().addFirst(
new LineBasedFrameDecoder(256),
new StringDecoder(CharsetUtil.UTF_8),
new StringEncoder(CharsetUtil.UTF_8));
}
});
ReactorTcpServer.create(transport).start(connection -> {
connection.log("input")
.observeComplete(v -> LOG.info("Connection input complete"))
.capacity(1)
.consume(line -> {
String response = "Hello " + line + "\n";
Streams.wrap(connection.writeWith(Streams.just(response))).consume();
});
return Streams.never();
});
}
项目:docker-java
文件:NettyDockerCmdExecFactory.java
private <T extends Channel> T configure(T channel) {
ChannelConfig channelConfig = channel.config();
if (connectTimeout != null) {
channelConfig.setConnectTimeoutMillis(connectTimeout);
}
return channel;
}
项目:docker-plugin
文件:NettyDockerCmdExecFactory.java
private <T extends Channel> T configure(T channel) {
ChannelConfig channelConfig = channel.config();
if (connectTimeout != null) {
channelConfig.setConnectTimeoutMillis(connectTimeout);
}
// START of new readTimeout code
if (readTimeout != null) {
channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler());
}
// END of new readTimeout code
return channel;
}
项目:jooby
文件:NettyWebSocket.java
@Override
public void resume() {
ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead()) {
config.setAutoRead(true);
}
}
项目:jooby
文件:NettyWebSocket.java
@Override
public void pause() {
ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
config.setAutoRead(false);
}
}
项目:elasticsearch_my
文件:Netty4HttpChannelTests.java
@Override
public ChannelConfig config() {
return null;
}
项目:UdpServerSocketChannel
文件:UdpChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:HeliosStreams
文件:InvocationChannel.java
@Override
public ChannelConfig config() {
// TODO Auto-generated method stub
return null;
}
项目:AnnihilationPro
文件:NullChannel.java
@Override
public ChannelConfig config() {
return null;
}
项目:AnnihilationPro
文件:NullChannel.java
@Override
public ChannelConfig config() {
return null;
}
项目:netty4.0.27Learn
文件:LocalChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty4.0.27Learn
文件:LocalServerChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty4.0.27Learn
文件:EmbeddedChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty4.0.27Learn
文件:AbstractOioMessageChannel.java
@Override
protected void doRead() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
Throwable exception = null;
int localRead = 0;
int totalRead = 0;
try {
for (;;) {
// Perform a read.
localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// Notify with the received messages and clear the buffer.
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// Do not read beyond maxMessagesPerRead.
// Do not continue reading if autoRead has been turned off.
totalRead += localRead;
if (totalRead >= maxMessagesPerRead || !config.isAutoRead()) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
closed = true;
}
pipeline().fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
unsafe().close(unsafe().voidPromise());
}
} else if (localRead == 0 && isActive()) {
// If the read amount was 0 and the channel is still active we need to trigger a new read()
// as otherwise we will never try to read again and the user will never know.
// Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are
// able to process the rest of the tasks in the queue first.
//
// See https://github.com/netty/netty/issues/2404
read();
}
}
项目:netty4study
文件:LocalChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty4study
文件:LocalServerChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty4study
文件:EmbeddedChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty4study
文件:AbstractNioByteChannel.java
@Override
public void read() {
//得到配置信息
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
if (!config.isAutoRead()) {
removeReadOp();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
do {
byteBuf = allocator.ioBuffer(byteBufCapacity);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
//触发fireChannelRead事件
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
项目:bgpcep
文件:BGPPeerAcceptorImpl.java
PeerRegistryListenerImpl(final ChannelConfig channelConfig) {
this.channelConfig = channelConfig;
this.keys = KeyMapping.getKeyMapping();
}
项目:netty-netty-5.0.0.Alpha1
文件:LocalChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty-netty-5.0.0.Alpha1
文件:LocalServerChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty-netty-5.0.0.Alpha1
文件:EmbeddedChannel.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty-netty-5.0.0.Alpha1
文件:AbstractNioByteChannel.java
@Override
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
if (!config.isAutoRead()) {
removeReadOp();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
do {
byteBuf = allocator.ioBuffer(byteBufCapacity);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
项目:onos
文件:ChannelAdapter.java
@Override
public ChannelConfig config() {
return null;
}
项目:onos
文件:ChannelAdapter.java
@Override
public ChannelConfig config() {
return null;
}
项目:ambry
文件:NettyRequestTest.java
@Override
public ChannelConfig config() {
return config;
}
项目:netty-xnio-transport
文件:AbstractXnioSocketChannel.java
@Override
public void handleEvent(ConduitStreamSourceChannel channel) {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
do {
byteBuf = allocator.ioBuffer(byteBufCapacity);
int writable = byteBuf.writableBytes();
int localReadAmount = byteBuf.writeBytes(channel, byteBuf.writableBytes());
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
close = localReadAmount < 0;
break;
}
((AbstractXnioUnsafe) unsafe()).readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
// stop reading
if (!config.isAutoRead()) {
break;
}
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead();
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!config.isAutoRead() && !((AbstractXnioUnsafe) unsafe()).readPending) {
removeReadOp(channel);
}
}
}