public void init(ChannelPipelineFactory pipeline, int workerNum) { ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); DefaultChannelFuture.setUseDeadLockChecker(false); pipelineFactory = pipeline; bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(pipelineFactory); // TODO - should be configurable bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); bootstrap.setOption("child.connectTimeoutMillis", 10000); bootstrap.setOption("child.connectResponseTimeoutMillis", 10000); bootstrap.setOption("child.receiveBufferSize", 1048576 * 10); }
@Test public void shouldResolveAddressAndForwardEventWhenConnectIsCalledWithAnUnresolvedInetSocketAddress() throws Exception { DefaultChannelFuture originalEventFuture = new DefaultChannelFuture(channel, false); ChannelStateEvent event = new DownstreamChannelStateEvent( channel, originalEventFuture, ChannelState.CONNECTED, createUnresolvedRemoteAddress() ); handler.connectRequested(ctx, event); ArgumentCaptor<DownstreamChannelStateEvent> eventCaptor = ArgumentCaptor.forClass(DownstreamChannelStateEvent.class); verify(ctx).sendDownstream(eventCaptor.capture()); DownstreamChannelStateEvent forwardedEvent = eventCaptor.getValue(); assertThat(forwardedEvent.getChannel(), equalTo(channel)); assertThat(forwardedEvent.getState(), equalTo(ChannelState.CONNECTED)); assertThat(forwardedEvent.getFuture(), Matchers.<ChannelFuture>equalTo(originalEventFuture)); InetSocketAddress forwardedAddress = (InetSocketAddress) forwardedEvent.getValue(); assertThat(forwardedAddress.isUnresolved(), equalTo(false)); }
@Test public void shouldFireExceptionIfConnectEventCannotBeForwarded() throws Exception { setupPipelineToRunFireEventLaterInline(); // setup the ctx to throw an exception when we attempt to send the connected event downstream IllegalStateException resolveFailureCause = new IllegalStateException("downstream event failed"); doThrow(resolveFailureCause).when(ctx).sendDownstream(any(DownstreamChannelStateEvent.class)); // initiate the connect request DefaultChannelFuture originalEventFuture = new DefaultChannelFuture(channel, false); ChannelStateEvent event = new DownstreamChannelStateEvent( channel, originalEventFuture, ChannelState.CONNECTED, createUnresolvedRemoteAddress() ); handler.connectRequested(ctx, event); // firing the connect downstream should cause a "fireExceptionCaughtLater" call ArgumentCaptor<DefaultExceptionEvent> exceptionEventCaptor = ArgumentCaptor.forClass(DefaultExceptionEvent.class); verify(ctx).sendUpstream(exceptionEventCaptor.capture()); DefaultExceptionEvent exceptionEvent = exceptionEventCaptor.getValue(); assertThat(exceptionEvent.getCause(), Matchers.<Throwable>is(resolveFailureCause)); }
@Test public void shouldSimplyForwardEventWhenConnectIsCalledWithAResolvedInetSocketAddress() throws Exception { ChannelStateEvent event = new DownstreamChannelStateEvent( channel, new DefaultChannelFuture(channel, false), ChannelState.CONNECTED, new InetSocketAddress("localhost", 9999) ); handler.connectRequested(ctx, event); verify(ctx).sendDownstream(refEq(event)); }
@Test public void shouldSimplyForwardEventWhenConnectIsCalledWithNonInetSocketAddress() throws Exception { ChannelStateEvent event = new DownstreamChannelStateEvent( channel, new DefaultChannelFuture(channel, false), ChannelState.CONNECTED, new LocalAddress(LocalAddress.EPHEMERAL) ); handler.connectRequested(ctx, event); verify(ctx).sendDownstream(refEq(event)); }
@Test public void shouldFireExceptionIfAddressResolutionTaskFails() throws Exception { setupPipelineToRunFireEventLaterInline(); // setup the address resolver to throw an exception final IllegalStateException resolveFailureCause = new IllegalStateException("simulate resolve failure"); AddressResolverHandler.ResolvedAddressProvider failingResolver = new AddressResolverHandler.ResolvedAddressProvider() { @Override public InetSocketAddress createResolved(InetSocketAddress unresolvedAddress) throws Exception { throw resolveFailureCause; } }; // fire a connect request with an unresolved address AddressResolverHandler failingHandler = new AddressResolverHandler(MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()), failingResolver); DefaultChannelFuture originalEventFuture = new DefaultChannelFuture(channel, false); ChannelStateEvent event = new DownstreamChannelStateEvent( channel, originalEventFuture, ChannelState.CONNECTED, createUnresolvedRemoteAddress() ); failingHandler.connectRequested(ctx, event); // this should cause a "fireExceptionCaughtLater" call ArgumentCaptor<DefaultExceptionEvent> exceptionEventCaptor = ArgumentCaptor.forClass(DefaultExceptionEvent.class); verify(ctx).sendUpstream(exceptionEventCaptor.capture()); DefaultExceptionEvent exceptionEvent = exceptionEventCaptor.getValue(); assertThat(exceptionEvent.getCause(), Matchers.<Throwable>is(resolveFailureCause)); }
@Override public ChannelFuture getFuture() { return new DefaultChannelFuture(channel, false); }