Java class io.netty.util.concurrent.DefaultEventExecutor instance source code

Project: tsdblite    File: Server.java
    /**
     * Creates a new Server
     */
    private Server() {
        log.info("Configuring Netty Server....");

        String serverLevel = ConfigurationHelper.getSystemThenEnvProperty(Constants.CONF_NETTY_SERVER_LOGLEVEL, Constants.DEFAULT_NETTY_SERVER_LOGLEVEL);
        loggingHandler = new LoggingHandler(getClass(), LogLevel.valueOf(serverLevel.trim().toUpperCase()));
        iface = ConfigurationHelper.getSystemThenEnvProperty(Constants.CONF_NETTY_IFACE, Constants.DEFAULT_NETTY_IFACE);
        port = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_PORT, Constants.DEFAULT_NETTY_PORT);
        int bossThreads = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_BOSS_THREADS, Constants.DEFAULT_NETTY_BOSS_THREADS);
        int workerThreads = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_WORKER_THREADS, Constants.DEFAULT_NETTY_WORKER_THREADS);
        int groupThreads = ConfigurationHelper.getIntSystemThenEnvProperty(Constants.CONF_NETTY_CGROUP_THREADS, Constants.DEFAULT_NETTY_CGROUP_THREADS);
        bossPool = new ManagedDefaultExecutorServiceFactory("bossPool").newExecutorService(bossThreads);
//      ForkJoinPoolManager.register(bossPool, BOSS_POOL_ON);
        workerPool = new ManagedDefaultExecutorServiceFactory("workerPool").newExecutorService(workerThreads);
//      ForkJoinPoolManager.register(workerPool, WORKER_POOL_ON);
        channelGroupPool = new ManagedDefaultExecutorServiceFactory("groupPool").newExecutorService(groupThreads);
//      ForkJoinPoolManager.register(channelGroupPool, CGROUP_POOL_ON);
        bossGroup = new NioEventLoopGroup(bossThreads, bossPool, selectorProvider);
        workerGroup = new NioEventLoopGroup(workerThreads, workerPool, selectorProvider);
        bootStrap = new ServerBootstrap();
        groupExecutor = new DefaultEventExecutor(channelGroupPool);
        channelGroup = new DefaultChannelGroup("TSDBLite", groupExecutor);
        MetricCache.getInstance(); // fire up the metric cache before we start taking calls 
        log.info("Selector: {}", selectorProvider.getClass().getName());
        bootStrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(loggingHandler)
            .childHandler(this);
        try {
            serverChannel = (NioServerSocketChannel)bootStrap.bind(iface, port).sync().channel();
        } catch (Exception ex) {            
            stop();
            log.error("Failed to bind Netty server on [{}:{}]", iface, port, ex);
            throw new RuntimeException("Failed to bind Netty server", ex);
        }
        JMXHelper.registerMBean(this, OBJECT_NAME);
        log.info("\n\t======================================\n\tNetty Server started on [{}:{}]\n\t======================================", iface, port);
    }
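Note: the constructor above wraps a managed thread pool in a DefaultEventExecutor and uses it as the notification executor of a DefaultChannelGroup. A minimal, self-contained sketch of that pattern follows; the pool size, class name, and the plain Executors pool are illustrative stand-ins, not taken from the project.

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultEventExecutor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChannelGroupExecutorSketch {
    public static void main(String[] args) {
        // Backing pool; the project uses a ManagedDefaultExecutorServiceFactory instead.
        ExecutorService groupPool = Executors.newFixedThreadPool(2);
        // Single-threaded EventExecutor that delivers ChannelGroup future notifications.
        DefaultEventExecutor groupExecutor = new DefaultEventExecutor(groupPool);
        ChannelGroup channelGroup = new DefaultChannelGroup("example-group", groupExecutor);

        // Accepted channels would be added here, e.g. channelGroup.add(channel).
        System.out.println("Channels in group: " + channelGroup.size());

        // Shut down in reverse order when the server stops.
        groupExecutor.shutdownGracefully();
        groupPool.shutdown();
    }
}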
Project: drift    File: TestDriftNettyMethodInvoker.java
@Override
public Future<Channel> getConnection(HostAndPort address)
{
    return new DefaultEventExecutor().newPromise();
}
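Note: this test stub hands back a promise from a throwaway DefaultEventExecutor that is never completed, so callers see a connection future that never finishes. A minimal sketch of what a caller waiting on such a future observes (the timeout value is illustrative):

import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

public class NeverCompletingFutureSketch {
    public static void main(String[] args) throws InterruptedException {
        // Same trick as the test above: a fresh executor hands out a promise nobody completes.
        Future<Object> connection = new DefaultEventExecutor().newPromise();

        // Any caller waiting on this "connection" simply times out, which is what the test exercises.
        boolean done = connection.await(200, TimeUnit.MILLISECONDS);
        System.out.println("connection completed: " + done); // prints false
    }
}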
Project: HeliosStreams    File: HubManager.java
private HubManager(final Properties properties) {
        Runtime.getRuntime().addShutdownHook(new Thread(){
            public void run() { try { close(); } catch (Exception x) {/* No Op */} }
        });
        log.info(">>>>> Initializing HubManager...");
        metricMetaService = new MetricsMetaAPIImpl(properties);
        tsdbEndpoint = TSDBEndpoint.getEndpoint(metricMetaService.getSqlWorker());
        for(String url: tsdbEndpoint.getUpServers()) {
            final URL tsdbUrl = URLHelper.toURL(url);
            tsdbAddresses.add(new InetSocketAddress(tsdbUrl.getHost(), tsdbUrl.getPort()));
        }
        endpointCount = tsdbAddresses.size(); 
        endpointSequence = new AtomicInteger(endpointCount);
        group = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, metricMetaService.getForkJoinPool());
        bootstrap = new Bootstrap();
        bootstrap
            .handler(channelInitializer)
            .group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
            .option(ChannelOption.ALLOCATOR, BufferManager.getInstance());
        final ChannelPoolHandler poolHandler = this;
        poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(final InetSocketAddress key) {
                final Bootstrap b = new Bootstrap().handler(channelInitializer)
                .group(group)
                .remoteAddress(key)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
                .option(ChannelOption.ALLOCATOR, BufferManager.getInstance());              
                return new SimpleChannelPool(b, poolHandler);
            }
        };
        eventExecutor = new DefaultEventExecutor(metricMetaService.getForkJoinPool());
        channelGroup = new DefaultChannelGroup("MetricHubChannelGroup", eventExecutor);

//      tsdbAddresses.parallelStream().forEach(addr -> {
//          final Set<Channel> channels = Collections.synchronizedSet(new HashSet<Channel>(3));
//          IntStream.of(1,2,3).parallel().forEach(i -> {
//              final ChannelPool pool = poolMap.get(addr); 
//              try {channels.add(pool.acquire().awaitUninterruptibly().get());
//              } catch (Exception e) {}
//              log.info("Acquired [{}] Channels", channels.size());
//              channels.parallelStream().forEach(ch -> pool.release(ch));
//          });
//      });

        log.info("<<<<< HubManager Initialized.");
    }
Project: nettyRpc    File: RpcClient.java
/**
 * Creates an eventExecutor used to run callbacks
 *
 * @return a new single-threaded DefaultEventExecutor
 */
public static EventExecutor createEventExecutor() {
    return new DefaultEventExecutor();
}
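Note: as the Javadoc says, the executor returned by this factory method is meant to run callbacks off the I/O threads. A minimal, hypothetical sketch of that usage: promises created from the executor deliver their listeners on it. The pending-response promise and payload below are illustrative, not taken from the project.

import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;

public class RpcCallbackSketch {
    public static void main(String[] args) {
        // Stand-in for RpcClient.createEventExecutor().
        EventExecutor callbackExecutor = new DefaultEventExecutor();

        // Hypothetical pending-response promise; a real client would key these by request id.
        Promise<String> pendingResponse = callbackExecutor.newPromise();
        pendingResponse.addListener(f ->
                System.out.println("callback ran on " + Thread.currentThread().getName()));

        // When the I/O thread decodes a response it completes the promise;
        // the listener above then runs on callbackExecutor, not on the I/O thread.
        pendingResponse.setSuccess("response-payload");

        callbackExecutor.shutdownGracefully();
    }
}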