/** * Creates a new Server */ private Server() { log.info("Configuring Netty Server...."); String serverLevel = ConfigurationHelper.getSystemThenEnvProperty(Constants.CONF_NETTY_SERVER_LOGLEVEL, Constants.DEFAULT_NETTY_SERVER_LOGLEVEL); loggingHandler = new LoggingHandler(getClass(), LogLevel.valueOf(serverLevel.trim().toUpperCase())); iface = ConfigurationHelper.getSystemThenEnvProperty(Constants.CONF_NETTY_IFACE, Constants.DEFAULT_NETTY_IFACE); port = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_PORT, Constants.DEFAULT_NETTY_PORT); int bossThreads = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_BOSS_THREADS, Constants.DEFAULT_NETTY_BOSS_THREADS); int workerThreads = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_WORKER_THREADS, Constants.DEFAULT_NETTY_WORKER_THREADS); int groupThreads = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_CGROUP_THREADS, Constants.DEFAULT_NETTY_CGROUP_THREADS); bossPool = new ManagedDefaultExecutorServiceFactory("bossPool").newExecutorService(bossThreads); // ForkJoinPoolManager.register(bossPool, BOSS_POOL_ON); workerPool = new ManagedDefaultExecutorServiceFactory("workerPool").newExecutorService(workerThreads); // ForkJoinPoolManager.register(workerPool, WORKER_POOL_ON); channelGroupPool = new ManagedDefaultExecutorServiceFactory("groupPool").newExecutorService(groupThreads); // ForkJoinPoolManager.register(channelGroupPool, CGROUP_POOL_ON); bossGroup = new NioEventLoopGroup(bossThreads, bossPool, selectorProvider); workerGroup = new NioEventLoopGroup(bossThreads, workerPool, selectorProvider); bootStrap = new ServerBootstrap(); groupExecutor = new DefaultEventExecutor(channelGroupPool); channelGroup = new DefaultChannelGroup("TSDBLite", groupExecutor); MetricCache.getInstance(); // fire up the metric cache before we start taking calls log.info("Selector: {}", selectorProvider.getClass().getName()); bootStrap.group(bossGroup, workerGroup) 
.channel(NioServerSocketChannel.class) .handler(loggingHandler) .childHandler(this); try { serverChannel = (NioServerSocketChannel)bootStrap.bind(iface, port).sync().channel(); } catch (Exception ex) { stop(); log.error("Failed to bind Netty server on [{}:{}]", iface, port, ex); throw new RuntimeException("Failed to bind Netty server", ex); } JMXHelper.registerMBean(this, OBJECT_NAME); log.info("\n\t======================================\n\tNetty Server started on [{}:{}]\n\t======================================", iface, port); }
/**
 * Hands back a brand-new promise obtained from a freshly constructed
 * {@link DefaultEventExecutor}. The promise is not completed by this method and
 * the {@code address} argument is not consulted.
 *
 * @param address the requested endpoint (unused here)
 * @return a new promise created by a new executor
 */
@Override
public Future<Channel> getConnection(HostAndPort address) {
    final DefaultEventExecutor executor = new DefaultEventExecutor();
    final Future<Channel> pending = executor.newPromise();
    return pending;
}
private HubManager(final Properties properties) { Runtime.getRuntime().addShutdownHook(new Thread(){ public void run() { try { close(); } catch (Exception x) {/* No Op */} } }); log.info(">>>>> Initializing HubManager..."); metricMetaService = new MetricsMetaAPIImpl(properties); tsdbEndpoint = TSDBEndpoint.getEndpoint(metricMetaService.getSqlWorker()); for(String url: tsdbEndpoint.getUpServers()) { final URL tsdbUrl = URLHelper.toURL(url); tsdbAddresses.add(new InetSocketAddress(tsdbUrl.getHost(), tsdbUrl.getPort())); } endpointCount = tsdbAddresses.size(); endpointSequence = new AtomicInteger(endpointCount); group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, metricMetaService.getForkJoinPool()); bootstrap = new Bootstrap(); bootstrap .handler(channelInitializer) .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, BufferManager.getInstance()); final ChannelPoolHandler poolHandler = this; poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() { @Override protected SimpleChannelPool newPool(final InetSocketAddress key) { final Bootstrap b = new Bootstrap().handler(channelInitializer) .group(group) .remoteAddress(key) .channel(NioSocketChannel.class) .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, BufferManager.getInstance()); return new SimpleChannelPool(b, poolHandler); } }; eventExecutor = new DefaultEventExecutor(metricMetaService.getForkJoinPool()); channelGroup = new DefaultChannelGroup("MetricHubChannelGroup", eventExecutor); // tsdbAddresses.parallelStream().forEach(addr -> { // final Set<Channel> channels = Collections.synchronizedSet(new HashSet<Channel>(3)); // IntStream.of(1,2,3).parallel().forEach(i -> { // final ChannelPool pool = poolMap.get(addr); // try {channels.add(pool.acquire().awaitUninterruptibly().get()); // } catch (Exception e) {} 
// log.info("Acquired [{}] Channels", channels.size()); // channels.parallelStream().forEach(ch -> pool.release(ch)); // }); // }); log.info("<<<<< HubManager Initialized."); }
/**
 * Creates an event executor used for running callbacks.
 *
 * @return a newly constructed {@link DefaultEventExecutor}
 */
public static EventExecutor createEventExecutor() {
    final EventExecutor callbackExecutor = new DefaultEventExecutor();
    return callbackExecutor;
}