@PostConstruct public void init() throws Exception { log.info("Setting resource leak detector level to {}", leakDetectorLevel); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); log.info("Starting MQTT transport..."); log.info("Lookup MQTT transport adaptor {}", adaptorName); // this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName); log.info("Starting MQTT transport server"); bossGroup = new NioEventLoopGroup(bossGroupThreadCount); workerGroup = new NioEventLoopGroup(workerGroupThreadCount); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).option(ChannelOption.SO_BACKLOG, 1000).option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true).channel(NioServerSocketChannel.class) .childHandler(new MqttTransportServerInitializer(msgProducer, deviceService, authService, assetService, assetAuthService, relationService, sslHandlerProvider)); serverChannel = b.bind(host, port).sync().channel(); log.info("Mqtt transport started: {}:{}!", host, port); }
@Test public void verify_essential_behavior() throws Exception { // given setAppAndEnvironment("typesafeconfigserver", "compiletimetest"); int port = findFreePort(); TypesafeConfigServer server = generateTypesafeConfigServer(port); assertThat(System.getProperty("org.jboss.logging.provider")).isNull(); // when server.launchServer(null); ExtractableResponse<Response> response = given() .baseUri("http://localhost") .port(port) .when() .get(SomeEndpoint.MATCHING_PATH) .then() .extract(); // then assertThat(response.statusCode()).isEqualTo(200); assertThat(response.asString()).isEqualTo("overridevalue"); assertThat(System.getProperty("org.jboss.logging.provider")).isEqualTo("slf4j"); assertThat(ResourceLeakDetector.getLevel()).isEqualTo(ResourceLeakDetector.Level.PARANOID); }
@Test public void verify_essential_behavior() throws Exception { // given setAppAndEnvironment("archaiusserver", "compiletimetest"); int port = findFreePort(); ArchaiusServer server = generateArchaiusServer(port); assertThat(System.getProperty("org.jboss.logging.provider")).isNull(); // when server.launchServer(null); ExtractableResponse<Response> response = given() .baseUri("http://localhost") .port(port) .when() .get(SomeEndpoint.MATCHING_PATH) .then() .extract(); // then assertThat(response.statusCode()).isEqualTo(200); assertThat(response.asString()).isEqualTo("overridevalue"); assertThat(System.getProperty("org.jboss.logging.provider")).isEqualTo("slf4j"); assertThat(ResourceLeakDetector.getLevel()).isEqualTo(ResourceLeakDetector.Level.PARANOID); }
public NettyTransport(ChannelType channelType, ExecutorService executor) { executorService = executor; bootstrap = new Bootstrap(); //Make sure we have a type set if (channelType == null) throw new IllegalStateException("No channel type has been specified"); //Pick the proper event loop group if (eventLoopGroup == null) { eventLoopGroup = createEventLoopGroup(channelType); } //Default Channel Options addChannelOption(ChannelOption.ALLOCATOR, allocator); addChannelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, WriteBufferWaterMark.DEFAULT); addChannelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000); //Set resource leak detection if debugging is enabled if (log.isDebugEnabled()) ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); //Initialize bootstrap bootstrap.group(eventLoopGroup).channel(channelType.getChannelClass()); }
@PostConstruct public void init() throws Exception { log.info("Setting resource leak detector level to {}", leakDetectorLevel); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); log.info("Starting MQTT transport..."); log.info("Lookup MQTT transport adaptor {}", adaptorName); this.adaptor = (MqttTransportAdaptor) appContext.getBean(adaptorName); log.info("Starting MQTT transport server"); bossGroup = new NioEventLoopGroup(bossGroupThreadCount); workerGroup = new NioEventLoopGroup(workerGroupThreadCount); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MqttTransportServerInitializer(processor, deviceService, authService, relationService, adaptor, sslHandlerProvider)); serverChannel = b.bind(host, port).sync().channel(); log.info("Mqtt transport started!"); }
public static void main(String... args) throws Exception { // RakNet doesn't really like IPv6 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); System.setProperty("java.net.preferIPv4Stack", "true"); // Load native libraries early. boolean partiallySupportedLinux = Epoll.isAvailable(); boolean fullySupportedLinux = NativeCodeFactory.cipher.load(); if (partiallySupportedLinux) { NativeCodeFactory.zlib.load(); if (fullySupportedLinux) { NativeCodeFactory.hash.load(); } else { LOGGER.warn("You are running x64 Linux, but you are not using a fully-supported distribution. Server throughput and performance will be affected. Visit https://wiki.voxelwind.com/why_linux for more information."); } } else { LOGGER.warn("You are not running x64 Linux. Server throughput and performance will be affected. Visit https://wiki.voxelwind.com/why_linux for more information."); } VoxelwindServer server = new VoxelwindServer(); server.boot(); }
/** * Init timeouts and the connection registry and start the netty IO server synchronously */ @Override public void init(Container container) { super.init(container); try { // Configure netty InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory() { @Override public InternalLogger newInstance(String name) { return new NettyInternalLogger(name); } }); ResourceLeakDetector.setLevel(CoreConstants.NettyConstants.RESOURCE_LEAK_DETECTION); // Start server startServer(); } catch (InterruptedException e) { throw new StartupException("Could not start netty server", e); } }
/** * Configure netty and initialize related Components. * Afterwards call {@link #initClient()} method to start the netty IO client asynchronously. */ @Override public void init(Container container) { super.init(container); // Configure netty InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory() { @Override public InternalLogger newInstance(String name) { return new NettyInternalLogger(name); } }); ResourceLeakDetector.setLevel(CoreConstants.NettyConstants.RESOURCE_LEAK_DETECTION); // And try to connect isActive = true; initClient(); // register BroadcastListener IntentFilter filter = new IntentFilter(); filter.addAction(WifiManager.SUPPLICANT_CONNECTION_CHANGE_ACTION); filter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION); filter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION); filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION); requireComponent(ContainerService.KEY_CONTEXT).registerReceiver(broadcastReceiver, filter); }
/** * tcpdump udp port 2225 -x -vv -s0 -w 1112.pcap * * @param args * @throws java.lang.InterruptedException */ public static void main(String[] args) throws InterruptedException { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); TestClient tc = new TestClient(); tc.noDelay(1, 20, 2, 1); tc.setMinRto(10); tc.wndSize(32, 32); tc.setTimeout(10 * 1000); tc.setMtu(512); // tc.setConv(121106);//默认conv随机 tc.connect(new InetSocketAddress("localhost", 2222)); tc.start(); String content = "sdfkasd你好。。。。。。。"; ByteBuf bb = PooledByteBufAllocator.DEFAULT.buffer(1500); bb.writeBytes(content.getBytes(Charset.forName("utf-8"))); tc.send(bb); }
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) { ResourceLeak leak; switch (ResourceLeakDetector.getLevel()) { case SIMPLE: leak = AbstractByteBuf.leakDetector.open(buf); if (leak != null) { buf = new SimpleLeakAwareByteBuf(buf, leak); } break; case ADVANCED: case PARANOID: leak = AbstractByteBuf.leakDetector.open(buf); if (leak != null) { buf = new AdvancedLeakAwareByteBuf(buf, leak); } break; } return buf; }
@Test public void testExecuteMultiThreadRpc() { Random rand = new Random(System.currentTimeMillis()); ResourceLeakDetector.setLevel(Level.PARANOID); GridConfiguration config = GridConfigFactory.configure(this.getClass().getResourceAsStream("/grid-config.xml")); GridRuntime.initialize(config); RpcExecutor.registerMethod("print", RemoteObject.class, new RemoteObject()); RpcExecutor.registerMethod("add", RemoteObject.class, new RemoteObject()); ThreadUtils.threadSleep(5000); for (int i = 1; i <= 100; i++) { int a = rand.nextInt(100); int b = rand.nextInt(100); long st = System.currentTimeMillis(); RpcResult result = RpcExecutor.callMethod("add", new Object[]{a, b}, new ExecuteConfig()); System.out.println("==========>" + i + " " + result + " cost:" + (System.currentTimeMillis() - st)); ThreadUtils.threadSleep(10); } while(true) { ThreadUtils.threadSleep(10000); } }
public static void main(String... args) throws Exception { ResourceLeakDetector.setEnabled(true); final int port = Integer.parseInt(args[0]); int iscsiPort = 3260 + port; SocketAddress iscsiSocketAddress = new InetSocketAddress(iscsiPort); File basePath = new File("data" + port); final BlockStoreServer server = new BlockStoreServer(iscsiSocketAddress, basePath); server.start(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { server.stop(); } catch (Exception e) { log.error("Error stopping server", e); } } }); }
@Override public void afterPropertiesSet() throws Exception { ACTIVE = active; if (isDev()) { VIEW_SERVER_PORT = viewServerPort; ResourceLeakDetector.setLevel(Level.ADVANCED); } else { VIEW_SERVER_PORT = OsUtil.getFreePort(); } }
@Override public boolean execute(ViaCommandSender sender, String[] args) { if (ResourceLeakDetector.getLevel() != ResourceLeakDetector.Level.ADVANCED) ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); else ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); sendMessage(sender, "&6Leak detector is now %s", (ResourceLeakDetector.getLevel() == ResourceLeakDetector.Level.ADVANCED ? "&aenabled" : "&cdisabled")); return true; }
/** * Builds the network by creating the netty server bootstrap and binding to a specified port. * * @return The instance of this bootstrap. */ public Bootstrap bind() throws InterruptedException { logger.info("Building network"); ResourceLeakDetector.setLevel(Level.DISABLED); EventLoopGroup loopGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(loopGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelPiplineInitializer()).bind(43593 + world.getId()).syncUninterruptibly(); Server.serverStarted = true; logger.info(String.format("World %d has been bound to port %d", world.getId(), world.getPort())); return this; }
private static ByteBuf toLeakAwareBuffer(DoubleByteBuf buf) { try { ResourceLeakTracker<DoubleByteBuf> leak; switch (ResourceLeakDetector.getLevel()) { case DISABLED: break; case SIMPLE: leak = leakDetector.track(buf); if (leak != null) { return simpleLeakAwareByteBufConstructor.newInstance(buf, leak); } break; case ADVANCED: case PARANOID: leak = leakDetector.track(buf); if (leak != null) { return advancedLeakAwareByteBufConstructor.newInstance(buf, leak); } break; } return buf; } catch (Throwable t) { // Catch reflection exception throw new RuntimeException(t); } }
@Test public void releaseInboundChannelOnNonKeepAliveRequest() throws Exception { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); NettyContext c = HttpServer.create(0) .newHandler((req, resp) -> resp.status(200).send()) .block(); Flux<ByteBuf> src = Flux.range(0, 3) .map(n -> Unpooled.wrappedBuffer(Integer.toString(n) .getBytes())); Flux.range(0, 100) .concatMap(n -> HttpClient.create(c.address() .getPort()) .post("/return", r -> r.keepAlive(false) .send(src)) .map(resp -> { resp.dispose(); return resp.status() .code(); })) .collectList() .block(); c.dispose(); }
@Before public void setup() throws Exception { originalLevel = ResourceLeakDetector.getLevel(); ResourceLeakDetector.setLevel(Level.PARANOID); InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); this.serviceBuilder = ServiceBuilder.newInMemoryBuilder(ServiceBuilderConfig.getDefaultConfig()); this.serviceBuilder.initialize(); }
/** * Initializes this network handler effectively preparing the server to * listen for connections and handle network events. * * @param port * the port that this network will be bound to. * @throws Exception * if any issues occur while starting the network. */ public void initialize(int port) throws IOException { if (port != 43594 && port != 5555 && port != 43595) logger.warning("The preferred ports for Runescape servers are 43594, 5555, and 43595!"); ResourceLeakDetector.setLevel(Server.DEBUG ? Level.PARANOID : NetworkConstants.RESOURCE_DETECTION); bootstrap.group(loopGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(channelInitializer); bootstrap.bind(port).syncUninterruptibly(); }
@Test public void testStartup() { ResourceLeakDetector.setLevel(Level.PARANOID); GridConfiguration config = GridConfigFactory.configure(this.getClass().getResourceAsStream("/grid-config.xml")); GridRuntime.initialize(config); while(true) { ThreadUtils.threadSleep(10000); } }
@Test public void testStartRpcNode() { ResourceLeakDetector.setLevel(Level.PARANOID); GridConfiguration config = GridConfigFactory.configure(this.getClass().getResourceAsStream("/grid-config.xml")); GridRuntime.initialize(config); RpcExecutor.registerMethod("print", RemoteObject.class, new RemoteObject()); RpcExecutor.registerMethod("add", RemoteObject.class, new RemoteObject()); while(true) { ThreadUtils.threadSleep(10000); } }
@Override public boolean handle(CommandSender sender, String[] args) { if (ResourceLeakDetector.isEnabled()) { ResourceLeakDetector.setLevel(Level.DISABLED); sender.sendMessage(ChatColor.YELLOW + "Disabled leak detector"); } else { ResourceLeakDetector.setLevel(Level.PARANOID); sender.sendMessage(ChatColor.YELLOW + "Enabled leak detector"); } return true; }
@Test public void testAsyncSocketServer() throws Exception { ResourceLeakDetector.setLevel(Level.ADVANCED); TransientMockNetworkOfNodes mockNetworkOfNodes=new TransientMockNetworkOfNodes(); final CountDownLatch serverDoneBarrier = new CountDownLatch(NB_CLIENTS*NUMBER_OF_MESSAGE); MessageEchoApp serverSideCountingHandler=new MessageEchoApp(mockNetworkOfNodes.server1, serverDoneBarrier); final CountDownLatch clientsDoneBarrier = new CountDownLatch(NB_CLIENTS); for(int i=0; i<NB_CLIENTS; i++){ new Thread(){ @Override public void run() { try { doNettyClientWrite(mockNetworkOfNodes.client1ToServer1Connection); clientsDoneBarrier.countDown(); } catch (Exception e) { e.printStackTrace(); } } }.start(); } clientsDoneBarrier.await(); mockNetworkOfNodes.client1ToServer1Connection.close(); serverDoneBarrier.await(); mockNetworkOfNodes.server1.networkServer.stopAcceptingConnections(); assertEquals(NB_CLIENTS*NUMBER_OF_MESSAGE, serverSideCountingHandler.numberOfMessagesReceived.intValue()); }
@Ignore("Used for checking for transport level leaks, my be unstable on CI.") @Test(timeout = 60 * 1000) public void testSendToClosedTransportFailsButDoesNotLeak() throws Exception { Transport transport = null; ResourceLeakDetector.setLevel(Level.PARANOID); try (NettyEchoServer server = createEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); for (int i = 0; i < 256; ++i) { transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(null); LOG.info("Connected to server:{} as expected.", serverLocation); } catch (Exception e) { fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); } assertTrue(transport.isConnected()); ByteBuf sendBuffer = transport.allocateSendBuffer(10 * 1024 * 1024); sendBuffer.writeBytes(new byte[] {0, 1, 2, 3, 4}); transport.close(); try { transport.send(sendBuffer); fail("Should throw on send of closed transport"); } catch (IOException ex) { } } System.gc(); } }
public static LeakCounter installLeakTrap(){ try { Field loggerField = ResourceLeakDetector.class.getDeclaredField("logger"); loggerField.setAccessible(true); Field modifiersField = Field.class.getDeclaredField("modifiers"); modifiersField.setAccessible(true); modifiersField.setInt(loggerField, loggerField.getModifiers() & ~Modifier.FINAL); InternalLogger leakLogger = (InternalLogger)loggerField.get(null); if (leakLogger instanceof LeakCounter) { logger.debug("leak counter already installed."); return (LeakCounter) leakLogger; }else if (leakLogger != null){ logger.debug("monkeypatching leak counter into Netty leak detection."); LeakTrap trap = new LeakTrap(leakLogger); loggerField.set(null, trap); return trap; } else { throw new NullPointerException("Netty ResourceLeakDetector had null logger reference?"); } } catch (Throwable t){ t.printStackTrace(); logger.warn("Couldn't monkeypatch leak counter into netty leak detection, returning noop counter to tests."); return new NoopLeakTrap(); } }
/** * Start this proxy instance by loading the configuration, plugins and * starting the connect thread. * * @throws Exception */ @Override @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE") public void start() throws Exception { System.setProperty( "java.net.preferIPv4Stack", "true" ); // Minecraft does not support IPv6 System.setProperty( "io.netty.selectorAutoRebuildThreshold", "0" ); // Seems to cause Bungee to stop accepting connections if ( System.getProperty( "io.netty.leakDetectionLevel" ) == null ) { ResourceLeakDetector.setLevel( ResourceLeakDetector.Level.DISABLED ); // Eats performance } bossEventLoopGroup = PipelineUtils.newEventLoopGroup( 0, new ThreadFactoryBuilder().setNameFormat( "Netty Boss IO Thread #%1$d" ).build() ); workerEventLoopGroup = PipelineUtils.newEventLoopGroup( 0, new ThreadFactoryBuilder().setNameFormat( "Netty Worker IO Thread #%1$d" ).build() ); File moduleDirectory = new File( "modules" ); moduleManager.load( this, moduleDirectory ); pluginManager.detectPlugins( moduleDirectory ); pluginsFolder.mkdir(); pluginManager.detectPlugins( pluginsFolder ); pluginManager.loadPlugins(); config.load(); registerChannel( ForgeConstants.FML_TAG ); registerChannel( ForgeConstants.FML_HANDSHAKE_TAG ); registerChannel( ForgeConstants.FORGE_REGISTER ); isRunning = true; pluginManager.enablePlugins(); if ( config.getJoinThrottle() > 0 ) { joinThrottle = new ConnectionThrottle( config.getJoinThrottle() ); } startListeners(); saveThread.scheduleAtFixedRate( new TimerTask() { @Override public void run() { if ( getReconnectHandler() != null ) { getReconnectHandler().save(); } } }, 0, TimeUnit.MINUTES.toMillis( 5 ) ); if (config.isMetrics()) { metricsThread.scheduleAtFixedRate(new Metrics(), 0, TimeUnit.MINUTES.toMillis(Metrics.PING_INTERVAL)); } }
private void resetNettyLeakDetectionLevel() { System.clearProperty(NETTY_LEAK_DETECTION_LEVEL_SYSTEM_PROP_KEY); ResourceLeakDetector.setLevel(Level.SIMPLE); }
@DataProvider(value = { // no-op case "null | null | null | null", // cases showing that system property takes precedence over everything "PARANOID | null | null | PARANOID", "disabled | PARANOID | null | DISABLED", // also - lowercase works "aDvAnCeD | PARANOID | DISABLED | ADVANCED", // also - mixed case works // cases showing that NETTY_LEAK_DETECTION_LEVEL_SYSTEM_PROP_KEY takes precedence // over NETTY_LEAK_DETECTION_LEVEL_APP_PROP_KEY if the system property is absent "null | ADVANCED | null | ADVANCED", "null | aDvAnCeD | PARANOID | ADVANCED", // yes, lower/mixed case still works here too // cases showing NETTY_LEAK_DETECTION_LEVEL_APP_PROP_KEY will be used if the other // options are not available "null | null | DISABLED | DISABLED", "null | null | pArAnOiD | PARANOID", // yes, lower/mixed case still works here too }, splitBy = "\\|") @Test public void setupNettyLeakDetectionLevel_works_as_expected( String systemPropValue, String configValueForSystemPropKey, String configValueForAppPropKey, Level expectedFinalLevel ) { // given assertThat(ResourceLeakDetector.getLevel()).isEqualTo(Level.SIMPLE); assertThat(expectedFinalLevel).isNotEqualTo(Level.SIMPLE); setSystemPropWithNullSupport(NETTY_LEAK_DETECTION_LEVEL_SYSTEM_PROP_KEY, systemPropValue); Function<String, String> propertyExtractionFunction = (key) -> { switch(key) { case NETTY_LEAK_DETECTION_LEVEL_SYSTEM_PROP_KEY: return configValueForSystemPropKey; case NETTY_LEAK_DETECTION_LEVEL_APP_PROP_KEY: return configValueForAppPropKey; default: throw new IllegalArgumentException("Unhandled config key: " + key); } }; Function<String, Boolean> hasPropertyFunction = (key) -> (propertyExtractionFunction.apply(key) != null); // when MainClassUtils.setupNettyLeakDetectionLevel(hasPropertyFunction, propertyExtractionFunction); // then if (expectedFinalLevel == null) { // We expect that the method did nothing since it couldn't find anything to set assertThat(System.getProperty(NETTY_LEAK_DETECTION_LEVEL_SYSTEM_PROP_KEY)).isNull(); assertThat(ResourceLeakDetector.getLevel()).isEqualTo(Level.SIMPLE); } else { assertThat(System.getProperty(NETTY_LEAK_DETECTION_LEVEL_SYSTEM_PROP_KEY)) .isEqualTo(expectedFinalLevel.name()); assertThat(ResourceLeakDetector.getLevel()).isEqualTo(expectedFinalLevel); } }
@SneakyThrows(InterruptedException.class) public void start() { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); ServerBootstrap bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(proxyProperties.getBoss()); workerGroup = new NioEventLoopGroup(proxyProperties.getWorker()); clientGroup = new NioEventLoopGroup(proxyProperties.getClient()); try { bootstrap .group(bossGroup, workerGroup) .channel(getChannelClass()) .option(ChannelOption.SO_BACKLOG, proxyProperties.getBackLog()) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, proxyProperties.getConnectTimeout()) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //channel time out handler pipeline.addLast(new IdleStateHandler(0, 0, 30)); pipeline.addLast(new IdleEventHandler()); //logging pipeline.addLast(new LoggingHandler()); if (isRouter()) { pipeline.addLast(getProxyHandler(proxyProperties)); } else { pipeline.addLast(getCustomChannelHandlers(clientGroup)); } pipeline.addLast(ExceptionHandler.INSTANCE); } }); //start server ChannelFuture future = bootstrap.bind(proxyProperties.getPort()).sync(); log.debug("Starting proxy server , port is {}", proxyProperties.getPort()); future.channel().closeFuture().sync(); } finally { stop(); } }
@Test public void testByteBufsReleasedWhenTimeout() { ResourceLeakDetector.setLevel(Level.PARANOID); byte[] content = new byte[1024*8]; Random rndm = new Random(); rndm.nextBytes(content); NettyContext server1 = HttpServer.create(0) .newRouter(routes -> routes.get("/target", (req, res) -> res.sendByteArray(Flux.just(content) .delayElements(Duration.ofMillis(100))))) .block(Duration.ofSeconds(30)); NettyContext server2 = HttpServer.create(0) .newRouter(routes -> routes.get("/forward", (req, res) -> HttpClient.create(server1.address().getPort()) .get("/target") .log() .delayElement(Duration.ofMillis(50)) .flatMap(response -> response.receive().aggregate().asString()) .timeout(Duration.ofMillis(50)) .then())) .block(Duration.ofSeconds(30)); Flux.range(0, 50) .flatMap(i -> HttpClient.create(server2.address().getPort()) .get("/forward") .log() .onErrorResume(t -> Mono.empty())) .blockLast(Duration.ofSeconds(30)); server1.dispose(); server2.dispose(); ResourceLeakDetector.setLevel(Level.SIMPLE); }
@Override public void initChannel(SocketChannel ch) throws Exception { IConfig cfg = Config.getInstance(); //if we need to check for ByteBuf leaks. if(cfg.isLeakDetector()){ ResourceLeakDetector.setLevel(Level.ADVANCED); } //so we get enough data to build our pipeline ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(1024)); ChannelPipeline pipeline = ch.pipeline(); int incomingPort = ch.localAddress().getPort(); //if users are coming in on a different port than the proxy port we need to redirect them. if(cfg.isProxy() && cfg.getPort() != incomingPort){ redirectBuilder.apply(pipeline); return; } if (cfg.isEncrypted()) { SslContext sslContext = factory.createSslContext(Config.getInstance()); SSLEngine engine = sslContext.newEngine(ch.alloc()); engine.setUseClientMode(false); engine.setNeedClientAuth(cfg.isCertAuth()); ch.pipeline().addFirst("ssl",new SslHandler(engine)); } if(cfg.isProxy()){ pipeline.channel().config().setAutoRead(false); pipeline.addLast(guicer.inject(new ProxyFrontendHandler(cfg.getProxyBackendHost(),cfg.getProxyBackendPort()))); }else{ websocketBuilder.apply(pipeline); } }
@Before public void setup() { origionalLogLevel = ResourceLeakDetector.getLevel(); ResourceLeakDetector.setLevel(Level.PARANOID); }
@After public void teardown() { ResourceLeakDetector.setLevel(origionalLogLevel); }
@After public void teardown() { this.serviceBuilder.close(); ResourceLeakDetector.setLevel(originalLevel); }