@Bean(name = "pool-server") public TServer poolServer() throws Exception { TServerTransport transport = new TServerSocket(this.port()); TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport); args.transportFactory(new TTransportFactory()); args.protocolFactory(new TBinaryProtocol.Factory()); args.processor(this.processor()); args.executorService(new ThreadPoolExecutor(env.getProperty( "rpc.server.min.worker.threads", Integer.class, 512), env .getProperty("rpc.server.max.worker.threads", Integer.class, 65535), env.getProperty( "rpc.server.thread.keep.alive.time", Long.class, 600l), TimeUnit.SECONDS, new SynchronousQueue<Runnable>())); return new TThreadPoolServer(args); }
private static TServerSocket createServerTransport(boolean secure) throws TTransportException { if (!secure) { return new TServerSocket(0); } try { SSLContext serverSslContext = ClientTestUtils.getServerSslContext(); SSLServerSocket serverSocket = (SSLServerSocket) serverSslContext.getServerSocketFactory().createServerSocket(0); return new TServerSocket(serverSocket); } catch (Exception e) { throw new TTransportException("Error initializing secure socket", e); } }
private TServerTransport getSSLServerTransport() { try { TServerTransport transport; TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(); params.setKeyStore(keystore, keystorePassword, getkeyManagerAlgorithm(), keystoreType); transport = TSSLTransportFactory.getServerSocket( port, 120000, InetAddress.getByName(bindAddress), params); ServerSocket serverSock = ((TServerSocket) transport).getServerSocket(); if (serverSock instanceof SSLServerSocket) { SSLServerSocket sslServerSock = (SSLServerSocket) serverSock; List<String> enabledProtocols = new ArrayList<String>(); for (String protocol : sslServerSock.getEnabledProtocols()) { if (!excludeProtocols.contains(protocol)) { enabledProtocols.add(protocol); } } sslServerSock.setEnabledProtocols(enabledProtocols.toArray(new String[0])); } return transport; } catch (Throwable throwable) { throw new FlumeException("Cannot start Thrift source.", throwable); } }
@SuppressWarnings("deprecation") @Override public void start() { try { InetSocketAddress bindAddr = new InetSocketAddress(host, port); serverTransport = new TServerSocket(bindAddr); ThriftFlumeEventServer.Processor processor = new ThriftFlumeEventServer.Processor(new ThriftFlumeEventServerImpl()); server = new TThreadPoolServer( new TThreadPoolServer.Args(serverTransport).processor(processor)); } catch (TTransportException e) { throw new FlumeException("Failed starting source", e); } ThriftHandler thriftHandler = new ThriftHandler(server); thriftHandlerThread = new Thread(thriftHandler); thriftHandlerThread.start(); super.start(); }
/** * @Title: startSchedulerThriftService * @Description: 开启scheduler 同步、异步调用服务 * @return void 返回类型 */ private static void startSchedulerThriftService() { LOG.info("start scheduler thrift service...."); new Thread() { @Override public void run(){ try { SchedulerServiceImpl schedulerServiceImpl = SpringUtil.getBean(SchedulerServiceImpl.class); TProcessor tprocessor = new SchedulerService.Processor<SchedulerService.Iface>(schedulerServiceImpl); TServerSocket serverTransport = new TServerSocket(PropertyLoader.THRIFT_SCHEDULER_PORT); TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport); ttpsArgs.processor(tprocessor); ttpsArgs.protocolFactory(new TBinaryProtocol.Factory()); //线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求。 TServer server = new TThreadPoolServer(ttpsArgs); server.serve(); } catch (Exception e) { LOG.error("start scheduler thrift service error,msg:"+ExceptionUtil.getStackTraceAsString(e)); } } }.start(); LOG.info("start scheduler thrift server success!"); }
public void startServer() { try { logger.info("TSimpleServer start ...."); // TMultiplexedProcessor TMultiplexedProcessor processor = new TMultiplexedProcessor(); processor.registerProcessor("Algorithm", new AlgorithmService.Processor<>(new AlgorithmServiceImpl())); TServerSocket serverTransport = new TServerSocket(SERVER_PORT); TServer.Args args = new TServer.Args(serverTransport); args.processor(processor); args.protocolFactory(new TBinaryProtocol.Factory()); // args.protocolFactory(new TJSONProtocol.Factory()); TServer server = new TSimpleServer(args); server.serve(); } catch (Exception e) { logger.error("Server start error!!!"); e.printStackTrace(); } }
private TServerSocket createServerSocket(boolean useSSL, int port) throws IOException, TTransportException { TServerSocket serverSocket = null; // enable SSL support for HMS List<String> sslVersionBlacklist = new ArrayList<>(); for (String sslVersion : hiveConf.getVar(ConfVars.HIVE_SSL_PROTOCOL_BLACKLIST).split(",")) { sslVersionBlacklist.add(sslVersion); } if (!useSSL) { serverSocket = HiveAuthUtils.getServerSocket(null, port); } else { String keyStorePath = hiveConf.getVar(ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PATH).trim(); if (keyStorePath.isEmpty()) { throw new IllegalArgumentException( ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname + " Not configured for SSL connection"); } String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf, HiveConf.ConfVars.HIVE_METASTORE_SSL_KEYSTORE_PASSWORD.varname); serverSocket = HiveAuthUtils.getServerSSLSocket(null, port, keyStorePath, keyStorePassword, sslVersionBlacklist); } return serverSocket; }
public static void startRPCServer(leafServer leafserver , String ip , int port) throws Exception { ServerSocket serverSocket = new ServerSocket(port,10000, InetAddress.getByName(ip)); TServerSocket serverTransport = new TServerSocket(serverSocket); //设置协议工厂为TBinaryProtocolFactory Factory proFactory = new TBinaryProtocol.Factory(); //关联处理器leafrpc的实现 TProcessor processor = new leafrpc.Processor<leafrpc.Iface>(new RPCService(leafserver)); TThreadPoolServer.Args args2 = new TThreadPoolServer.Args(serverTransport); args2.processor(processor); args2.protocolFactory(proFactory); TServer server = new TThreadPoolServer(args2); LOG.info("leaf RPCServer(type:TThreadPoolServer) start at ip:port : "+ ip +":" + port ); server.serve(); }
ServerThread() throws TTransportException { TMultiplexedProcessor processor = new TMultiplexedProcessor(); for (String beanName : serviceMap.keySet()) { IThriftServerService serverService = (IThriftServerService) serviceMap.getService(beanName); String processorName = serverService.getName(); TProcessor tProcessor = serverService.getProcessor(serverService); processor.registerProcessor(processorName, tProcessor); logger.info("Register a processorName {} processorImpl {}", processorName, tProcessor); } logger.info("init default TServerTransport in addr {} port {}", applicationProperties.getAddr(), applicationProperties.getPort()); TServerTransport tServerTransport = new TServerSocket(new InetSocketAddress(applicationProperties.getAddr(), applicationProperties.getPort())); TThreadPoolServer.Args args = new TThreadPoolServer.Args(tServerTransport); args.processor(processor); args.protocolFactory(tProtocolFactory); server = new TThreadPoolServer(args); }
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, int workerThreads, InetSocketAddress inetSocketAddress, int backlog, int clientTimeout) throws TTransportException { TServerTransport serverTransport = new TServerSocket( new TServerSocket.ServerSocketTransportArgs(). bindAddr(inetSocketAddress).backlog(backlog). clientTimeout(clientTimeout)); log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); serverArgs.processor(processor); serverArgs.transportFactory(transportFactory); serverArgs.protocolFactory(protocolFactory); if (workerThreads > 0) { serverArgs.maxWorkerThreads(workerThreads); } return new TThreadPoolServer(serverArgs); }
/** * Server initialization. * * @throws Exception error */ public void start() throws Exception { log.info("initializing thrift server {}", getServerName()); final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(threadPoolNameFormat) .setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread: {}", t.getName(), e)) .build(); final ExecutorService executorService = new ThreadPoolExecutor( Math.min(2, config.getThriftServerMaxWorkerThreads()), config.getThriftServerMaxWorkerThreads(), 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), threadFactory ); RegistryUtil.registerThreadPool(registry, threadPoolNameFormat, (ThreadPoolExecutor) executorService); final int timeout = config.getThriftServerSocketClientTimeoutInSeconds() * 1000; final TServerTransport serverTransport = new TServerSocket(portNumber, timeout); startServing(executorService, serverTransport); }
public void start() throws TTransportException, UnknownHostException { InetAddress inetAddress = InetAddress.getByName(hostName); TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(); params.setKeyStore(keyStore, keyStorePassword); TServerSocket serverTransport; serverTransport = TSSLTransportFactory.getServerSocket(port, clientTimeout, inetAddress, params); AuthenticatorService.Processor<AuthenticatorServiceImpl> processor = new AuthenticatorService.Processor<AuthenticatorServiceImpl>( new AuthenticatorServiceImpl(thriftAuthenticatorService)); authenticationServer = new TThreadPoolServer( new TThreadPoolServer.Args(serverTransport).processor(processor)); Thread thread = new Thread(new ServerRunnable(authenticationServer)); if (log.isDebugEnabled()) { log.debug("Thrift Authentication Service started at ssl://" + hostName + ":" + port); } thread.start(); }
public static void main(String[] args) { try { userProfileServerHandler = new UserProfileServerHandler(); userProfileProcessor = new UserProfileService.Processor(userProfileServerHandler); TMultiplexedProcessor airavataServerProcessor = new TMultiplexedProcessor(); airavataServerProcessor.registerProcessor("UserProfileService", userProfileProcessor); TServerTransport serverTransport = new TServerSocket(9190); TServer server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(airavataServerProcessor)); System.out.println("Starting User Profile server..."); server.serve(); } catch (Exception x) { x.printStackTrace(); } }
/** * Start the SessionDriver callback server * @param conf the corona configuration for this session * @return the server socket of the callback server * @throws IOException */ private ServerSocket initializeServer(CoronaConf conf) throws IOException { // Choose any free port. ServerSocket sessionServerSocket = new ServerSocket(0, 0, getLocalAddress()); TServerSocket tServerSocket = new TServerSocket(sessionServerSocket, conf.getCMSoTimeout()); TFactoryBasedThreadPoolServer.Args args = new TFactoryBasedThreadPoolServer.Args(tServerSocket); args.processor(new SessionDriverServiceProcessor(incoming)); args.transportFactory(new TTransportFactory()); args.protocolFactory(new TBinaryProtocol.Factory(true, true)); args.stopTimeoutVal = 0; server = new TFactoryBasedThreadPoolServer( args, new TFactoryBasedThreadPoolServer.DaemonThreadFactory()); return sessionServerSocket; }
private synchronized void initializeClusterManagerCallbackServer() throws IOException { // Create thrift RPC to serve ClusterManager int soTimeout = fConf.getInt( CORONA_TASK_TRACKER_SERVER_CLIENTTIMEOUT_KEY, 30 * 1000); ServerSocket serverSocket = new ServerSocket(); serverSocket.setReuseAddress(true); serverSocket.bind(new InetSocketAddress(0)); TServerSocket tSocket = new TServerSocket(serverSocket, soTimeout); CoronaTaskTrackerService.Processor proc = new CoronaTaskTrackerService.Processor(this); TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory(true, true); TThreadPoolServer.Args args = new TThreadPoolServer.Args(tSocket); args.processor(proc); args.protocolFactory(protocolFactory); clusterManagerCallbackServer = new TThreadPoolServer(args); clusterManagerCallbackServerThread = new TServerThread(clusterManagerCallbackServer); clusterManagerCallbackServerThread.start(); clusterManagerCallbackServerAddr = new InetAddress( getLocalHostname(), serverSocket.getLocalPort()); LOG.info("SessionServer up at " + serverSocket.getLocalSocketAddress()); }
/** * Constrcts a server object */ public HadoopThriftServer(String [] args) { if (args.length > 0) { serverPort = new Integer(args[0]); } try { ServerSocket ssock = createServerSocket(serverPort); TServerTransport serverTransport = new TServerSocket(ssock); Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba"); ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler); TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); serverArgs.minWorkerThreads = 10; serverArgs.processor(processor); serverArgs.transportFactory(new TTransportFactory()); serverArgs.protocolFactory(new TBinaryProtocol.Factory()); server = new TThreadPoolServer(serverArgs); System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]..."); HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]..."); System.out.flush(); } catch (Exception x) { x.printStackTrace(); } }
public void startServer2() throws Exception { AwesomeService.Processor<AwesomeService.Iface> processor = new AwesomeService.Processor<>(referenceServer); TServerTransport serverTransport = new TServerSocket(9090); TServer server = new TSimpleServer(new TSimpleServer.Args(serverTransport).processor(processor)); ServerRunner serverRunner = new ServerRunner(server); Thread serverThread = new Thread(serverRunner); serverThread.start(); logger.info("Started binary interface"); joinMethods.add(() -> { try { serverThread.join(); } catch (InterruptedException ignored) { } }); }
@Before public void setUp() throws Exception { Log.setLog(new NoLogging()); rc = copyResourceTo("/pvdrc", temp.getRoot()); copyResourceTo("/test.thrift", temp.getRoot()); impl = Mockito.mock(MyService.Iface.class); TServerSocket transport = new TServerSocket(0); server = new TSimpleServer( new TServer.Args(transport) .protocolFactory(new TBinaryProtocol.Factory()) .processor(new MyService.Processor<>(impl))); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(server::serve); Thread.sleep(1); port = transport.getServerSocket().getLocalPort(); exitCode = 0; rpc = new RPC(console.tty()) { @Override protected void exit(int i) { exitCode = i; } }; }
@BeforeClass public static void setUpServer() throws Exception { Awaitility.setDefaultPollDelay(2, TimeUnit.MILLISECONDS); port = findFreePort(); impl = Mockito.mock(Iface.class); TServerSocket transport = new TServerSocket(port); server = new TSimpleServer( new TServer.Args(transport) .protocolFactory(new TBinaryProtocol.Factory()) .processor(new Processor<>(impl))); executor = Executors.newSingleThreadExecutor(); executor.submit(server::serve); serializer = new BinarySerializer(); address = new InetSocketAddress("localhost", port); }
private static TServerSocket createServer(SSLServerSocketFactory factory, int port, int timeout, boolean clientAuth, InetAddress ifAddress, EzSSLTransportParameters params) throws TTransportException { try { SSLServerSocket serverSocket = (SSLServerSocket) factory.createServerSocket(port, 100, ifAddress); serverSocket.setSoTimeout(timeout); serverSocket.setNeedClientAuth(clientAuth); if (params != null && params.cipherSuites != null) { serverSocket.setEnabledCipherSuites(params.cipherSuites); } return new TServerSocket(serverSocket); } catch (Exception e) { throw new TTransportException("Could not bind to port " + port, e); } }
@Override public TServer getServer(TProcessor processor) throws IOException, TTransportException { int port = type.getPort(storm_conf); TTransportFactory serverTransportFactory = getServerTransportFactory(); TServerSocket serverTransport = new TServerSocket(port); int numWorkerThreads = type.getNumThreads(storm_conf); Integer queueSize = type.getQueueSize(storm_conf); TThreadPoolServer.Args server_args = new TThreadPoolServer.Args(serverTransport).processor(new TUGIWrapProcessor(processor)).minWorkerThreads(numWorkerThreads) .maxWorkerThreads(numWorkerThreads).protocolFactory(new TBinaryProtocol.Factory(false, true)); if (serverTransportFactory != null) { server_args.transportFactory(serverTransportFactory); } BlockingQueue workQueue = new SynchronousQueue(); if (queueSize != null) { workQueue = new ArrayBlockingQueue(queueSize); } ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 60, TimeUnit.SECONDS, workQueue); server_args.executorService(executorService); return new TThreadPoolServer(server_args); }
public static void main(String[] args) { try { handler = new CalculatorHandler(); processor = new Calculator.Processor(handler); try { TServerTransport serverTransport = new TServerSocket(9090); TServer server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(processor)); System.out.println("Starting the server..."); server.serve(); } catch (TTransportException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (Exception x) { x.printStackTrace(); } }
public static void main(String[] args) throws TTransportException, UnsupportedEncodingException { final String msg = "Hello Thrift!\n"; final String stop_cmd = "STOP"; final int buf_size = 1024*8; byte[] buf = new byte[buf_size]; final int port = 9090; TServerTransport acceptor = new TServerSocket(9090); acceptor.listen(); System.out.println("[Server] listening on port: " + port); String input; do { TTransport trans = acceptor.accept(); int len = trans.read(buf, 0, buf_size); input = new String(buf, 0, len,"UTF-8"); System.out.println("[Server] handling request: " + input); trans.write(msg.getBytes()); trans.flush(); trans.close(); } while (! stop_cmd.regionMatches(0, input, 0, 4)); System.out.println("[Server] exiting"); acceptor.close(); }
public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException { TTransportFactory serverTransportFactory = getServerTransportFactory(); //define THsHaServer args //original: THsHaServer + TNonblockingServerSocket //option: TThreadPoolServer + TServerSocket TServerSocket serverTransport = new TServerSocket(port); TThreadPoolServer.Args server_args = new TThreadPoolServer.Args(serverTransport). processor(new TUGIWrapProcessor(processor)). minWorkerThreads(64). maxWorkerThreads(64). protocolFactory(new TBinaryProtocol.Factory()); if (serverTransportFactory != null) server_args.transportFactory(serverTransportFactory); //construct THsHaServer return new TThreadPoolServer(server_args); }
public static void main(String[] args) { int port = 9090; try { TServerTransport serverTransport = new TServerSocket(port); Args processor = new TThreadPoolServer.Args(serverTransport) .inputTransportFactory(new TFramedTransport.Factory()) .outputTransportFactory(new TFramedTransport.Factory()) .processor(new Processor<>(new TestThriftServiceHandler())); // processor.maxWorkerThreads = 20; TThreadPoolServer server = new TThreadPoolServer(processor); System.out.println("Starting the server..."); server.serve(); } catch (Exception e) { e.printStackTrace(); } }
@BeforeClass public static void setUp() { int port = 9090; try { TServerTransport serverTransport = new TServerSocket(port); Args processor = new TThreadPoolServer.Args(serverTransport) .inputTransportFactory(new TFramedTransport.Factory()) .outputTransportFactory(new TFramedTransport.Factory()) .processor(new Processor<>(new TestThriftServiceHandler())); // processor.maxWorkerThreads = 20; TThreadPoolServer server = new TThreadPoolServer(processor); logger.info("Starting test server..."); new Thread(server::serve).start(); Thread.sleep(1000); // waiting server init } catch (Exception e) { e.printStackTrace(); } }
/** * Creates the server socket. * * @return the t server transport * @throws TTransportException the t transport exception */ public TServerTransport createServerSocket() throws TTransportException { return new TServerSocket( new InetSocketAddress(getNodeConfig().getThriftHost(), getNodeConfig().getThriftPort())) { @Override protected TSocket acceptImpl() throws TTransportException { ServerSocket serverSocket = getServerSocket(); if (serverSocket == null) { throw new TTransportException( TTransportException.NOT_OPEN, "No underlying server socket." ); } try { Socket result = serverSocket.accept(); TSocketWrapper result2 = new TSocketWrapper(result); result2.setTimeout(0); openedSockets.add(result2); return result2; } catch (IOException iox) { throw new TTransportException(iox); } } }; }
public static SyncEchoTestServer<TSimpleServer> simpleServer(final TestEnvironment environment) throws TTransportException { TSimpleServer server = new TSimpleServer(new TSimpleServer.Args(new TServerSocket(environment.getPort())) .processor(getProcessor()).inputProtocolFactory(environment.getProtocolFactory()) .outputProtocolFactory(environment.getProtocolFactory())); return new SyncEchoTestServer<TSimpleServer>(server, environment) { @Override public SyncEchoTestClient getSynchronousClient() throws TTransportException { return new SyncEchoTestClient.Client(environment); } @Override public AsyncEchoTestClient getAsynchronousClient() throws IOException { return new AsyncEchoTestClient.Client(environment); } }; }
public static SyncEchoTestServer<TThreadPoolServer> threadedPoolServer(final TestEnvironment environment) throws TTransportException { TThreadPoolServer server = new TThreadPoolServer(new TThreadPoolServer.Args(new TServerSocket( environment.getPort())).processor(getProcessor()) .inputProtocolFactory(environment.getProtocolFactory()) .outputProtocolFactory(environment.getProtocolFactory())); return new SyncEchoTestServer<TThreadPoolServer>(server, environment) { @Override public SyncEchoTestClient getSynchronousClient() throws TTransportException { return new SyncEchoTestClient.Client(environment); } @Override public AsyncEchoTestClient getAsynchronousClient() throws IOException { return new AsyncEchoTestClient.Client(environment); } }; }
@Override protected TServer getServer(TProcessor processor) throws TTransportException { LOGGER.debug("Setting Secured Server on port {} and keystore", remotePort, keystoreFile); TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(); params.setKeyStore(keystoreFile, keystorePass); TServerSocket serverTransport; try { serverTransport = TSSLTransportFactory.getServerSocket(remotePort, 1000, InetAddress.getByName("localhost"), params); } catch (UnknownHostException e) { throw new TTransportException(e); } return new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(processor)); }
public int run(String[] args) throws Exception { Configuration conf = getConf(); int port = conf.getInt("wmr.server.bind.port", 50100); SubmissionDatabase.connect(conf); JobServiceHandler service = new JobServiceHandler(new Configuration()); JobService.Processor processor = new JobService.Processor(service); TServerTransport transport = new TServerSocket(port); TServer server = new TSimpleServer(new Args(transport).processor(processor)); server.serve(); return 0; }
protected void init() throws Exception { TServerTransport serverTransport = new TServerSocket( PORT ); TBinaryProtocol.Factory bFactory = new TBinaryProtocol.Factory(); server = new TThreadPoolServer( new TThreadPoolServer.Args( serverTransport ) .inputProtocolFactory( bFactory ) .outputProtocolFactory( bFactory ) .inputTransportFactory( getTransportFactory() ) .outputTransportFactory( getTransportFactory() ) .processor( getProcessor() ) ); Thread startTread = new Thread() { @Override public void run() { server.serve(); } }; startTread.setName( "thrift-server" ); startTread.start(); while( !server.isServing() ) { Thread.sleep( 100 ); } protocol = ExtensionLoader.getExtensionLoader(Protocol.class) .getExtension( ThriftProtocol.NAME ); invoker = protocol.refer( getInterface(), getUrl() ); }
private static int testProcessor(TProcessor processor, List<ToIntFunction<HostAndPort>> clients) throws Exception { try (TServerSocket serverTransport = new TServerSocket(0)) { TProtocolFactory protocolFactory = new Factory(); TTransportFactory transportFactory = new TFramedTransport.Factory(); TServer server = new TSimpleServer(new Args(serverTransport) .protocolFactory(protocolFactory) .transportFactory(transportFactory) .processor(processor)); Thread serverThread = new Thread(server::serve); try { serverThread.start(); int localPort = serverTransport.getServerSocket().getLocalPort(); HostAndPort address = HostAndPort.fromParts("localhost", localPort); int sum = 0; for (ToIntFunction<HostAndPort> client : clients) { sum += client.applyAsInt(address); } return sum; } finally { server.stop(); serverThread.interrupt(); } } }
private static int testProcessor(TProcessor processor, List<ToIntFunction<HostAndPort>> clients) throws Exception { try (TServerSocket serverTransport = new TServerSocket(0)) { TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); TTransportFactory transportFactory = new TFramedTransport.Factory(); TServer server = new TSimpleServer(new Args(serverTransport) .protocolFactory(protocolFactory) .transportFactory(transportFactory) .processor(processor)); Thread serverThread = new Thread(server::serve); try { serverThread.start(); int localPort = serverTransport.getServerSocket().getLocalPort(); HostAndPort address = HostAndPort.fromParts("localhost", localPort); int sum = 0; for (ToIntFunction<HostAndPort> client : clients) { sum += client.applyAsInt(address); } return sum; } finally { server.stop(); serverThread.interrupt(); } } }