@Override public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception { if (e.getState() == IdleState.WRITER_IDLE) { Channel channel = e.getChannel(); if (channel != null) { //写空闲 则发送心跳数据 channel.write(Heartbeat.getSingleton()); } else { logger.warn("writer idle on channel({}), but hsfChannel is not managed.", e.getChannel()); } } else if (e.getState() == IdleState.READER_IDLE) { //读空闲 则断开连接 logger.error("channel:{} is time out.", e.getChannel()); handleUpstream(ctx, new DefaultExceptionEvent(e.getChannel(), new SocketTimeoutException( "force to close channel(" + ctx.getChannel().getRemoteAddress() + "), reason: time out."))); e.getChannel().close(); } super.channelIdle(ctx, e); }
@Test public void shouldFireExceptionIfConnectEventCannotBeForwarded() throws Exception { setupPipelineToRunFireEventLaterInline(); // setup the ctx to throw an exception when we attempt to send the connected event downstream IllegalStateException resolveFailureCause = new IllegalStateException("downstream event failed"); doThrow(resolveFailureCause).when(ctx).sendDownstream(any(DownstreamChannelStateEvent.class)); // initiate the connect request DefaultChannelFuture originalEventFuture = new DefaultChannelFuture(channel, false); ChannelStateEvent event = new DownstreamChannelStateEvent( channel, originalEventFuture, ChannelState.CONNECTED, createUnresolvedRemoteAddress() ); handler.connectRequested(ctx, event); // firing the connect downstream should cause a "fireExceptionCaughtLater" call ArgumentCaptor<DefaultExceptionEvent> exceptionEventCaptor = ArgumentCaptor.forClass(DefaultExceptionEvent.class); verify(ctx).sendUpstream(exceptionEventCaptor.capture()); DefaultExceptionEvent exceptionEvent = exceptionEventCaptor.getValue(); assertThat(exceptionEvent.getCause(), Matchers.<Throwable>is(resolveFailureCause)); }
@Override public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) throws Exception { if (((e instanceof DefaultExceptionEvent) || (e instanceof ConnectException)) && (e.getCause() != null) && (e.getCause().getMessage() != null)) { log.warn("Connection timed out"); log.warn("Exception caught: ", e.getCause()); } else { log.warn("Exception caught: ", e.getCause()); } ctx.sendUpstream(e); }
@Test public void shouldFireExceptionIfAddressResolutionTaskFails() throws Exception { setupPipelineToRunFireEventLaterInline(); // setup the address resolver to throw an exception final IllegalStateException resolveFailureCause = new IllegalStateException("simulate resolve failure"); AddressResolverHandler.ResolvedAddressProvider failingResolver = new AddressResolverHandler.ResolvedAddressProvider() { @Override public InetSocketAddress createResolved(InetSocketAddress unresolvedAddress) throws Exception { throw resolveFailureCause; } }; // fire a connect request with an unresolved address AddressResolverHandler failingHandler = new AddressResolverHandler(MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()), failingResolver); DefaultChannelFuture originalEventFuture = new DefaultChannelFuture(channel, false); ChannelStateEvent event = new DownstreamChannelStateEvent( channel, originalEventFuture, ChannelState.CONNECTED, createUnresolvedRemoteAddress() ); failingHandler.connectRequested(ctx, event); // this should cause a "fireExceptionCaughtLater" call ArgumentCaptor<DefaultExceptionEvent> exceptionEventCaptor = ArgumentCaptor.forClass(DefaultExceptionEvent.class); verify(ctx).sendUpstream(exceptionEventCaptor.capture()); DefaultExceptionEvent exceptionEvent = exceptionEventCaptor.getValue(); assertThat(exceptionEvent.getCause(), Matchers.<Throwable>is(resolveFailureCause)); }
@Test public void shouldCloseChannelAndNotForwardEventWhenReceiveExceptionCaughtEvent() throws Exception { ExceptionEvent exceptionEvent = new DefaultExceptionEvent(channel, new Exception("test exception")); handler.exceptionCaught(ctx, exceptionEvent); verifyNoMoreInteractions(ctx); verify(channel).close(); }