BookKeeperClient(DistributedLogConfiguration conf, String name, String zkServers, ZooKeeperClient zkc, String ledgersPath, ClientSocketChannelFactory channelFactory, HashedWheelTimer requestTimer, StatsLogger statsLogger, Optional<FeatureProvider> featureProvider) { this.conf = conf; this.name = name; this.zkServers = zkServers; this.ledgersPath = ledgersPath; this.passwd = conf.getBKDigestPW().getBytes(UTF_8); this.channelFactory = channelFactory; this.requestTimer = requestTimer; this.statsLogger = statsLogger; this.featureProvider = featureProvider; this.ownZK = null == zkc; if (null != zkc) { // reference the passing zookeeper client this.zkc = zkc; } }
public synchronized void start() { final Executor bossPool = Executors.newCachedThreadPool(); final Executor workerPool = Executors.newCachedThreadPool(); bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(bossPool, workerPool)); final ClientSocketChannelFactory clientSocketChannelFactory = new NioClientSocketChannelFactory(bossPool, workerPool); bootstrap.setOption("child.tcpNoDelay", true); allChannels = new DefaultChannelGroup("handler"); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new FrontendHandler(allChannels, clientSocketChannelFactory, serverPool, statistics)); } }); log.info("Starting on port {}", port); acceptor = bootstrap.bind(new InetSocketAddress(port)); if (acceptor.isBound()) { log.info("Server started successfully"); } }
public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) { this.uri = uri; this.file = file; String scheme = uri.getScheme() == null ? "http" : uri.getScheme(); this.host = uri.getHost() == null ? "localhost" : uri.getHost(); this.port = uri.getPort(); if (port == -1) { if (scheme.equalsIgnoreCase("http")) { this.port = 80; } else if (scheme.equalsIgnoreCase("https")) { this.port = 443; } } bootstrap = new ClientBootstrap(factory); bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec bootstrap.setOption("receiveBufferSize", 1048576); // set 1M bootstrap.setOption("tcpNoDelay", true); ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file); bootstrap.setPipelineFactory(pipelineFactory); }
BlockingRpcClient(final Class<?> protocol, final InetSocketAddress addr, ClientSocketChannelFactory factory) throws Exception { this.protocol = protocol; String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service"; Class<?> serviceClass = Class.forName(serviceClassName); stubMethod = serviceClass.getMethod("newBlockingStub", BlockingRpcChannel.class); this.handler = new ClientChannelUpstreamHandler(); pipeFactory = new ProtoPipelineFactory(handler, RpcResponse.getDefaultInstance()); super.init(addr, pipeFactory, factory); rpcChannel = new ProxyRpcChannel(); this.key = new RpcConnectionKey(addr, protocol, false); }
AsyncRpcClient(final Class<?> protocol, final InetSocketAddress addr, ClientSocketChannelFactory factory) throws Exception { this.protocol = protocol; String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service"; Class<?> serviceClass = Class.forName(serviceClassName); stubMethod = serviceClass.getMethod("newStub", RpcChannel.class); this.handler = new ClientChannelUpstreamHandler(); pipeFactory = new ProtoPipelineFactory(handler, RpcResponse.getDefaultInstance()); super.init(addr, pipeFactory, factory); rpcChannel = new ProxyRpcChannel(); this.key = new RpcConnectionKey(addr, protocol, true); }
public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) { name = name + "-" + clientCount.incrementAndGet(); if(LOG.isDebugEnabled()){ LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum); } ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build(); ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build(); NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1, new HashedWheelTimer(), ThreadNameDeterminer.CURRENT); NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, ThreadNameDeterminer.CURRENT); return new NioClientSocketChannelFactory(bossPool, workerPool); }
public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory) throws IOException { try { this.bootstrap = new ClientBootstrap(factory); this.bootstrap.setPipelineFactory(pipeFactory); // TODO - should be configurable this.bootstrap.setOption("connectTimeoutMillis", 10000); this.bootstrap.setOption("connectResponseTimeoutMillis", 10000); this.bootstrap.setOption("receiveBufferSize", 1048576 * 10); this.bootstrap.setOption("tcpNoDelay", true); this.bootstrap.setOption("keepAlive", true); connect(addr); } catch (Throwable t) { close(); throw new IOException(t.getCause()); } }
public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory) throws IOException { try { this.bootstrap = new ClientBootstrap(factory); this.bootstrap.setPipelineFactory(pipeFactory); // TODO - should be configurable this.bootstrap.setOption("connectTimeoutMillis", 10000); this.bootstrap.setOption("connectResponseTimeoutMillis", 10000); this.bootstrap.setOption("receiveBufferSize", 1048576 * 10); this.bootstrap.setOption("tcpNoDelay", true); this.bootstrap.setOption("keepAlive", true); connect(addr); } catch (Throwable t) { close(); throw new IOException("Connect error to " + addr + " cause " + t.getMessage(), t.getCause()); } }
/** * Creates a new HttpClientFactory. * * @param filters the filter chain shared by all Clients created by this factory * @param channelFactory the ClientSocketChannelFactory that all Clients created by this * factory will share * @param shutdownFactory if true, the channelFactory will be shut down when this * factory is shut down * @param executor an executor shared by all Clients created by this factory to schedule * tasks * @param shutdownExecutor if true, the executor will be shut down when this factory is * shut down * @param callbackExecutor an optional executor to invoke user callbacks that otherwise * will be invoked by scheduler executor. * @param shutdownCallbackExecutor if true, the callback executor will be shut down when * this factory is shut down */ public HttpClientFactory(FilterChain filters, ClientSocketChannelFactory channelFactory, boolean shutdownFactory, ScheduledExecutorService executor, boolean shutdownExecutor, ExecutorService callbackExecutor, boolean shutdownCallbackExecutor) { this(filters, channelFactory, shutdownFactory, executor, shutdownExecutor, callbackExecutor, shutdownCallbackExecutor, NULL_JMX_MANAGER); }
public HttpClientFactory(FilterChain filters, ClientSocketChannelFactory channelFactory, boolean shutdownFactory, ScheduledExecutorService executor, boolean shutdownExecutor, ExecutorService callbackExecutor, boolean shutdownCallbackExecutor, AbstractJmxManager jmxManager) { _filters = filters; _channelFactory = channelFactory; _shutdownFactory = shutdownFactory; _executor = executor; _shutdownExecutor = shutdownExecutor; _callbackExecutor = callbackExecutor; _shutdownCallbackExecutor = shutdownCallbackExecutor; _jmxManager = jmxManager; }
/** * Creates a new HttpNettyClient with some default parameters * * @see #HttpNettyClient(ClientSocketChannelFactory,ScheduledExecutorService,int,int,int,int,int,SSLContext,SSLParameters,int,ExecutorService,int) */ public HttpNettyClient(ClientSocketChannelFactory factory, ScheduledExecutorService executor, int poolSize, int requestTimeout, int idleTimeout, int shutdownTimeout, int maxResponseSize) { this(factory, executor, poolSize, requestTimeout, idleTimeout, shutdownTimeout, maxResponseSize, null, null, Integer.MAX_VALUE, executor, Integer.MAX_VALUE); }
/** * Tests that even when the factory is shutdown with a long timeout, it does not occupy * any executors with tasks that might prevent them shutting down properly. * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException */ @Test public void testShutdownTimeoutDoesNotOccupyExecutors() throws InterruptedException, ExecutionException, TimeoutException { ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(boss, worker); HttpClientFactory factory = new HttpClientFactory(FilterChains.empty(), channelFactory, false, scheduler, false); FutureCallback<None> callback = new FutureCallback<None>(); factory.shutdown(callback, 60, TimeUnit.MINUTES); callback.get(60, TimeUnit.SECONDS); scheduler.shutdown(); channelFactory.releaseExternalResources(); Assert.assertTrue(scheduler.awaitTermination(60, TimeUnit.SECONDS)); Assert.assertTrue(boss.awaitTermination(60, TimeUnit.SECONDS)); Assert.assertTrue(worker.awaitTermination(60, TimeUnit.SECONDS)); }
public static void main(String[] args) throws Exception { Executor executor = Executors.newCachedThreadPool(); ChannelFactory factory = new NioServerSocketChannelFactory(executor, executor); ServerBootstrap sb = new ServerBootstrap(factory); ClientSocketChannelFactory cf = new NioClientSocketChannelFactory(executor, executor); sb.setPipelineFactory(new ProxyPipelineFactory(cf, RtmpConfig.PROXY_REMOTE_HOST, RtmpConfig.PROXY_REMOTE_PORT)); InetSocketAddress socketAddress = new InetSocketAddress(RtmpConfig.PROXY_PORT); sb.bind(socketAddress); logger.info("proxy server started, listening on {}", socketAddress); Thread monitor = new StopMonitor(RtmpConfig.PROXY_STOP_PORT); monitor.start(); monitor.join(); ChannelGroupFuture future = ALL_CHANNELS.close(); logger.info("closing channels"); future.awaitUninterruptibly(); logger.info("releasing resources"); factory.releaseExternalResources(); logger.info("server stopped"); }
/** * Create a new client that connects to the supplied host and port. Connection * attempts and non-blocking commands will {@link #setDefaultTimeout timeout} * after 60 seconds. * * @param host Server hostname. * @param port Server port. */ public RedisClient(String host, int port) { ExecutorService connectors = Executors.newFixedThreadPool(1); ExecutorService workers = Executors.newCachedThreadPool(); ClientSocketChannelFactory factory = new NioClientSocketChannelFactory(connectors, workers); InetSocketAddress addr = new InetSocketAddress(host, port); bootstrap = new ClientBootstrap(factory); bootstrap.setOption("remoteAddress", addr); setDefaultTimeout(60, TimeUnit.SECONDS); channels = new DefaultChannelGroup(); timer = new HashedWheelTimer(); }
public static void main(String[] args) throws Exception { // Validate command line options. if (args.length != 3) { System.err.println("Usage: " + HexDumpProxy.class.getSimpleName() + " <local port> <remote host> <remote port>"); return; } // Parse command line options. int localPort = Integer.parseInt(args[0]); String remoteHost = args[1]; int remotePort = Integer.parseInt(args[2]); System.err.println("Proxying *:" + localPort + " to " + remoteHost + ':' + remotePort + " ..."); // Configure the bootstrap. Executor executor = Executors.newCachedThreadPool(); ServerBootstrap sb = new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor)); // Set up the event pipeline factory. ClientSocketChannelFactory cf = new NioClientSocketChannelFactory(executor, executor); sb.setPipelineFactory(new HexDumpProxyPipelineFactory(cf, remoteHost, remotePort)); // Start up the server. sb.bind(new InetSocketAddress(localPort)); }
public static void main(String[] args) throws Exception { // Validate command line options. if (args.length != 3) { System.err.println( "Usage: " + TextDumpProxy.class.getSimpleName() + " <local port> <remote host> <remote port>"); return; } // Parse command line options. int localPort = Integer.parseInt(args[0]); String remoteHost = args[1]; int remotePort = Integer.parseInt(args[2]); System.err.println( "Proxying *:" + localPort + " to " + remoteHost + ':' + remotePort + " ..."); // Configure the bootstrap. Executor executor = Executors.newCachedThreadPool(); ServerBootstrap sb = new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor)); // Set up the event pipeline factory. ClientSocketChannelFactory cf = new NioClientSocketChannelFactory(executor, executor); sb.setPipelineFactory(new TextDumpProxyPipelineFactory(cf, remoteHost, remotePort)); // Start up the server. sb.bind(new InetSocketAddress(localPort)); }
public HttpTunnelSoakTester() { scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); executor = Executors.newCachedThreadPool(); ServerSocketChannelFactory serverChannelFactory = new NioServerSocketChannelFactory( executor, executor); HttpTunnelServerChannelFactory serverTunnelFactory = new HttpTunnelServerChannelFactory( serverChannelFactory); serverBootstrap = new ServerBootstrap(serverTunnelFactory); serverBootstrap.setPipelineFactory(createServerPipelineFactory()); ClientSocketChannelFactory clientChannelFactory = new NioClientSocketChannelFactory( executor, executor); HttpTunnelClientChannelFactory clientTunnelFactory = new HttpTunnelClientChannelFactory( clientChannelFactory); clientBootstrap = new ClientBootstrap(clientTunnelFactory); clientBootstrap.setPipelineFactory(createClientPipelineFactory()); configureProxy(); channels = new DefaultChannelGroup(); }
private static ClientSocketChannelFactory getClientSocketChannelFactory(boolean nio) { if ( nio) return new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); else return new OioClientSocketChannelFactory( Executors.newCachedThreadPool()); }
public ClientConnectionImpl(ExecutorService connectService, ClientSocketChannelFactory socketChannelFactory, Timer timeoutTimer) { super(); this.connectService = connectService; this.socketChannelFactory = socketChannelFactory; this.timeoutTimer = timeoutTimer; }
public ClientConnectionFactoryImpl(Timer timeoutTimer, ClientSocketChannelFactory socketChannelFactory, long connectEstablishTimeout, long sendTimeOut, Protocol protocol) { super(); this.timeoutTimer = timeoutTimer; this.socketChannelFactory = socketChannelFactory; this.connectEstablishTimeout = connectEstablishTimeout; this.sendTimeOut = sendTimeOut; this.protocol = protocol; }
public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding) { this.addr = addr; this.executor = executor; this.totalBytesOutstanding = totalBytesOutstanding; this.channelFactory = channelFactory; connect(channelFactory); }
public NetworkFailureHandler( AtomicBoolean blocked, Consumer<NetworkFailureHandler> onClose, ClientSocketChannelFactory channelFactory, String remoteHost, int remotePort) { this.blocked = blocked; this.onClose = onClose; this.channelFactory = channelFactory; this.remoteHost = remoteHost; this.remotePort = remotePort; }
public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) { // Configure the bootstrap. serverBootstrap = new ServerBootstrap( new NioServerSocketChannelFactory(executor, executor)); // Set up the event pipeline factory. ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(executor, executor); serverBootstrap.setOption("child.tcpNoDelay", true); serverBootstrap.setOption("child.keepAlive", true); serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); // synchronized for a race between blocking and creating new handlers synchronized (networkFailureHandlers) { NetworkFailureHandler failureHandler = new NetworkFailureHandler( blocked, networkFailureHandler -> networkFailureHandlers.remove(networkFailureHandler), channelFactory, remoteHost, remotePort); networkFailureHandlers.add(failureHandler); pipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, failureHandler); } return pipeline; } }); channel = serverBootstrap.bind(new InetSocketAddress(localPort)); LOG.info("Proxying [*:{}] to [{}:{}]", getLocalPort(), remoteHost, remotePort); }
@Test public void testGet() throws IOException { Random rnd = new Random(); FileWriter writer = new FileWriter(INPUT_DIR + "data"); String data; for (int i = 0; i < 100; i++) { data = ""+rnd.nextInt(); writer.write(data); } writer.flush(); writer.close(); DataRetriever ret = new DirectoryRetriever(INPUT_DIR); HttpDataServer server = new HttpDataServer( NetUtils.createSocketAddr("127.0.0.1:0"), ret); server.start(); InetSocketAddress addr = server.getBindAddress(); URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data"); ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1); Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory); fetcher.get(); server.stop(); FileSystem fs = FileSystem.getLocal(new TajoConf()); FileStatus inStatus = fs.getFileStatus(new Path(INPUT_DIR, "data")); FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data")); assertEquals(inStatus.getLen(), outStatus.getLen()); }
/** * make this factory static thus all clients can share its thread pool. * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe */ public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(){ //shared woker and boss pool if(factory == null){ TajoConf conf = new TajoConf(); int workerNum = conf.getIntVar(TajoConf.ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM); factory = createClientChannelFactory("Internal-Client", workerNum); } return factory; }
/** * Creates a new HttpClientFactory. * * @param filters the filter chain shared by all Clients created by this factory * @param channelFactory the ClientSocketChannelFactory that all Clients created by this * factory will share * @param shutdownFactory if true, the channelFactory will be shut down when this * factory is shut down * @param executor an executor shared by all Clients created by this factory to schedule * tasks * @param shutdownExecutor if true, the executor will be shut down when this factory is * shut down */ public HttpClientFactory(FilterChain filters, ClientSocketChannelFactory channelFactory, boolean shutdownFactory, ScheduledExecutorService executor, boolean shutdownExecutor) { this(filters, channelFactory, shutdownFactory, executor, shutdownExecutor, executor, false); }
/** * Creates a new HttpNettyClient * * @param factory The ClientSocketChannelFactory; it is the caller's responsibility to * shut it down * @param executor an executor; it is the caller's responsibility to shut it down * @param poolSize Maximum size of the underlying HTTP connection pool * @param requestTimeout timeout, in ms, to get a connection from the pool or create one * @param idleTimeout interval after which idle connections will be automatically closed * @param shutdownTimeout timeout, in ms, the client should wait after shutdown is * initiated before terminating outstanding requests * @param maxResponseSize * @param sslContext {@link SSLContext} * @param sslParameters {@link SSLParameters}with overloaded construct * @param queryPostThreshold length of query params above which requests will be tunneled as POSTS * @param callbackExecutor an optional executor to invoke user callback * @param poolWaiterSize Maximum waiters waiting on the HTTP connection pool */ public HttpNettyClient(ClientSocketChannelFactory factory, ScheduledExecutorService executor, int poolSize, int requestTimeout, int idleTimeout, int shutdownTimeout, int maxResponseSize, SSLContext sslContext, SSLParameters sslParameters, int queryPostThreshold, ExecutorService callbackExecutor, int poolWaiterSize) { this(factory, executor, poolSize, requestTimeout, idleTimeout, shutdownTimeout, maxResponseSize, sslContext, sslParameters, queryPostThreshold, callbackExecutor, poolWaiterSize, HttpClientFactory.DEFAULT_CLIENT_NAME, HttpClientFactory.NULL_JMX_MANAGER); }
/** * legacy constructor for backward-compatibility purpose. */ public HttpNettyClient(ClientSocketChannelFactory factory, ScheduledExecutorService executor, int poolSize, int requestTimeout, int idleTimeout, int shutdownTimeout, int maxResponseSize, SSLContext sslContext, SSLParameters sslParameters, int queryPostThreshold, ExecutorService callbackExecutor, int poolWaiterSize, String name, AbstractJmxManager jmxManager) { this(factory, executor, poolSize, requestTimeout, idleTimeout, shutdownTimeout, maxResponseSize, sslContext, sslParameters, queryPostThreshold, callbackExecutor, poolWaiterSize, name, jmxManager, AsyncPoolImpl.Strategy.MRU, 0); }
/** * Creates a new HttpNettyClient * * @param factory The ClientSocketChannelFactory; it is the caller's responsibility to * shut it down * @param executor an executor; it is the caller's responsibility to shut it down * @param poolSize Maximum size of the underlying HTTP connection pool * @param requestTimeout timeout, in ms, to get a connection from the pool or create one * @param idleTimeout interval after which idle connections will be automatically closed * @param shutdownTimeout timeout, in ms, the client should wait after shutdown is * initiated before terminating outstanding requests * @param maxResponseSize * @param sslContext {@link SSLContext} * @param sslParameters {@link SSLParameters}with overloaded construct * @param queryPostThreshold length of query params above which requests will be tunneled as POSTS * @param callbackExecutor an optional executor to invoke user callback * @param poolWaiterSize Maximum waiters waiting on the HTTP connection pool * @param name Name of the {@link HttpNettyClient} * @param jmxManager A management class that is aware of the creation/shutdown event * of the underlying {@link ChannelPoolManager} * @param strategy The strategy used to return pool objects. * @param minPoolSize Minimum number of objects in the pool. Set to zero for * no minimum. */ public HttpNettyClient(ClientSocketChannelFactory factory, ScheduledExecutorService executor, int poolSize, int requestTimeout, int idleTimeout, int shutdownTimeout, int maxResponseSize, SSLContext sslContext, SSLParameters sslParameters, int queryPostThreshold, ExecutorService callbackExecutor, int poolWaiterSize, String name, AbstractJmxManager jmxManager, AsyncPoolImpl.Strategy strategy, int minPoolSize) { _maxResponseSize = maxResponseSize; _name = name; _channelPoolManager = new ChannelPoolManager(new ChannelPoolFactoryImpl(new ClientBootstrap(factory), poolSize, idleTimeout, sslContext, sslParameters, poolWaiterSize, strategy, minPoolSize), name + ChannelPoolManager.BASE_NAME); _scheduler = executor; _callbackExecutor = callbackExecutor; _requestTimeout = requestTimeout; _shutdownTimeout = shutdownTimeout; _requestTimeoutMessage = "Exceeded request timeout of " + _requestTimeout + "ms"; _queryPostThreshold = queryPostThreshold; _jmxManager = jmxManager; _jmxManager.onProviderCreate(_channelPoolManager); }
@Test public void testGetRawClient() { ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(boss, worker); HttpClientFactory factory = new HttpClientFactory(FilterChains.empty(), channelFactory, true, scheduler, true); Map<String, String> properties = new HashMap<String, String>(); String requestTimeout = "7000"; String poolSize = "10"; String maxResponse = "3000"; String idleTimeout = "8000"; String shutdownTimeout = "14000"; HttpNettyClient client; //test creation using default values client = factory.getRawClient(properties); Assert.assertEquals(client.getMaxResponseSize(), HttpClientFactory.DEFAULT_MAX_RESPONSE_SIZE); Assert.assertEquals(client.getRequestTimeout(), HttpClientFactory.DEFAULT_REQUEST_TIMEOUT); Assert.assertEquals(client.getShutdownTimeout(), HttpClientFactory.DEFAULT_SHUTDOWN_TIMEOUT); //test using only new config keys properties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, requestTimeout); properties.put(HttpClientFactory.HTTP_POOL_SIZE, poolSize); properties.put(HttpClientFactory.HTTP_IDLE_TIMEOUT, idleTimeout); properties.put(HttpClientFactory.HTTP_MAX_RESPONSE_SIZE, maxResponse); properties.put(HttpClientFactory.HTTP_SHUTDOWN_TIMEOUT, shutdownTimeout); client = factory.getRawClient(properties); Assert.assertEquals(client.getMaxResponseSize(), Integer.parseInt(maxResponse)); Assert.assertEquals(client.getRequestTimeout(), Integer.parseInt(requestTimeout)); Assert.assertEquals(client.getShutdownTimeout(), Integer.parseInt(shutdownTimeout)); }
@Test public void testShutdownTimeout() throws ExecutionException, TimeoutException, InterruptedException { ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(boss, worker); HttpClientFactory factory = new HttpClientFactory(FilterChains.empty(), channelFactory, true, scheduler, true); List<Client> clients = new ArrayList<Client>(); for (int i = 0; i < 100; i++) { clients.add(new TransportClientAdapter(factory.getClient(Collections.<String, String>emptyMap()))); } for (Client c : clients) { RestRequest r = new RestRequestBuilder(_testServer.getRequestURI()).build(); c.restRequest(r).get(30, TimeUnit.SECONDS); } FutureCallback<None> factoryShutdown = new FutureCallback<None>(); factory.shutdown(factoryShutdown, 1, TimeUnit.SECONDS); factoryShutdown.get(30, TimeUnit.SECONDS); Assert.assertTrue(boss.awaitTermination(30, TimeUnit.SECONDS), "Failed to shut down boss"); Assert.assertTrue(worker.awaitTermination(30, TimeUnit.SECONDS), "Failed to shut down worker"); Assert.assertTrue(scheduler.awaitTermination(30, TimeUnit.SECONDS), "Failed to shut down scheduler"); }
@Test public void testShutdownNoTimeout() throws ExecutionException, TimeoutException, InterruptedException { ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(boss, worker); HttpClientFactory factory = new HttpClientFactory(FilterChains.empty(), channelFactory, true, scheduler, true); List<Client> clients = new ArrayList<Client>(); for (int i = 0; i < 100; i++) { clients.add(new TransportClientAdapter(factory.getClient(Collections.<String, String>emptyMap()))); } for (Client c : clients) { RestRequest r = new RestRequestBuilder(_testServer.getRequestURI()).build(); c.restRequest(r).get(30, TimeUnit.SECONDS); } FutureCallback<None> factoryShutdown = new FutureCallback<None>(); factory.shutdown(factoryShutdown); try { factoryShutdown.get(1, TimeUnit.SECONDS); Assert.fail("Factory shutdown should have timed out"); } catch (TimeoutException e) { // Expected } Assert.assertFalse(boss.isShutdown(), "Boss should not be shut down"); Assert.assertFalse(worker.isShutdown(), "Worker should not be shut down"); Assert.assertFalse(scheduler.isShutdown(), "Scheduler should not be shut down"); }
@Test public void testShutdownIOThread() throws ExecutionException, TimeoutException, InterruptedException { ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(boss, worker); HttpClientFactory factory = new HttpClientFactory(FilterChains.empty(), channelFactory, true, scheduler, true); Client client = new TransportClientAdapter(factory.getClient( Collections.<String, Object>emptyMap())); Future<RestResponse> responseFuture = client.restRequest(new RestRequestBuilder(_testServer.resetResponseLatch(1)).build()); FutureCallback<None> factoryShutdown = new FutureCallback<None>(); factory.shutdown(factoryShutdown); FutureCallback<None> clientShutdown = new FutureCallback<None>(); client.shutdown(clientShutdown); // Client and factory shutdowns are now pending. When we release the latch, the response will // be returned, which causes the shutdowns to complete on the Netty IO thread that received the // response. _testServer.releaseResponseLatch(); responseFuture.get(60, TimeUnit.SECONDS); clientShutdown.get(60, TimeUnit.SECONDS); factoryShutdown.get(60, TimeUnit.SECONDS); Assert.assertTrue(boss.awaitTermination(60, TimeUnit.SECONDS)); Assert.assertTrue(worker.awaitTermination(60, TimeUnit.SECONDS)); Assert.assertTrue(scheduler.awaitTermination(60, TimeUnit.SECONDS)); }