@Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); SslHandler sslHandler = null; if (sslHandlerProvider != null) { sslHandler = sslHandlerProvider.getSslHandler(); pipeline.addLast(sslHandler); } pipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE)); pipeline.addLast("encoder", MqttEncoder.INSTANCE); MqttTransportHandler handler = new MqttTransportHandler(msgProducer, deviceService, authService, assetService, assetAuthService, relationService, sslHandler); pipeline.addLast(handler); // ch.closeFuture().addListener(handler); }
/**
 * Binds an MQTT server to {@code port} and blocks until its server channel is closed.
 * Both event loop groups are asked to shut down gracefully on the way out, whether
 * the method returns normally or throws.
 *
 * @param port the TCP port to bind
 * @throws Exception if binding or waiting on the channel fails
 */
public void start(int port) throws Exception {
    EventLoopGroup acceptors = new NioEventLoopGroup(1);
    EventLoopGroup workers = new NioEventLoopGroup();
    try {
        ServerBootstrap server = new ServerBootstrap();
        server.group(acceptors, workers);
        server.channel(NioServerSocketChannel.class);
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new MqttDecoder())
                        .addLast(MqttEncoder.INSTANCE)
                        .addLast(new MqttInBoundHandler());
            }
        });
        server.option(ChannelOption.SO_BACKLOG, 1024);
        server.childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture bound = server.bind(port).sync();
        // Park the calling thread for the lifetime of the server channel.
        bound.channel().closeFuture().sync();
    } finally {
        acceptors.shutdownGracefully();
        workers.shutdownGracefully();
    }
}
/**
 * Creates the hardware MQTT server and prepares the initializer used for its channels.
 * The login handler and the channel state handler are created once here and the same
 * instances are added to every channel's pipeline.
 *
 * @param holder source of the listen address, port, limits and DAOs read by this constructor
 */
public MQTTHardwareServer(Holder holder) {
    super(holder.props.getProperty("listen.address"),
            holder.props.getIntProperty("hardware.mqtt.port"),
            holder.transportTypeHolder);

    final int idleTimeoutSeconds = holder.limits.hardwareIdleTimeout;
    final MqttHardwareLoginHandler loginHandler = new MqttHardwareLoginHandler(holder);
    final HardwareChannelStateHandler stateHandler =
            new HardwareChannelStateHandler(holder.sessionDao, holder.gcmWrapper);

    channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // The same timeout is applied to reader and writer idleness; all-idle is disabled (0).
            ch.pipeline().addLast("MqttIdleStateHandler",
                    new IdleStateHandler(idleTimeoutSeconds, idleTimeoutSeconds, 0));
            ch.pipeline().addLast(stateHandler);
            ch.pipeline().addLast(new MqttDecoder());
            ch.pipeline().addLast(MqttEncoder.INSTANCE);
            ch.pipeline().addLast(loginHandler);
            ch.pipeline().addLast(new HardwareNotLoggedHandler());
        }
    };

    log.debug("hard.socket.idle.timeout = {}", idleTimeoutSeconds);
}
/**
 * Populates a new channel's pipeline with a 90-second reader-idle detector, the MQTT
 * codec and the MQTT handler backed by a fresh {@code MqttProcessor} for {@code server}.
 *
 * @param channel the channel being initialised
 * @throws Exception if a handler cannot be added
 */
protected void initChannel(SocketChannel channel) throws Exception {
    channel.pipeline()
            // Only reader idleness is tracked; writer and all-idle timeouts are off (0).
            .addLast("idleState", new IdleStateHandler(90, 0, 0, TimeUnit.SECONDS))
            .addLast("mqttDecoder", new MqttDecoder())
            .addLast("mqttEncoder", MqttEncoder.INSTANCE)
            .addLast("mqttHandler", new MqttHandler(new MqttProcessor(server)));
}
private void initChannel(ChannelPipeline pipeline) { pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE); if (this.options.getMaxMessageSize() > 0) { pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize())); } else { // max message size not set, so the default from Netty MQTT codec is used pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder()); } // adding the idle state handler for timeout on CONNECT packet pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0)); pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { // as MQTT 3.1.1 describes, if no packet is sent after a "reasonable" time (here CONNECT timeout) // the connection is closed ctx.channel().close(); } } } }); }
private void initChannel(ChannelPipeline pipeline) { // add into pipeline netty's (en/de)coder pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE); if (this.options.getMaxMessageSize() > 0) { pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize())); } else { // max message size not set, so the default from Netty MQTT codec is used pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder()); } if (this.options.isAutoKeepAlive() && this.options.getKeepAliveTimeSeconds() != 0) { pipeline.addBefore("handler", "idle", new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0)); pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.WRITER_IDLE) { ping(); } } } }); } }
@Test public void noConnectTest(TestContext context) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE); } }); // Start the client. ChannelFuture f = bootstrap.connect(MQTT_SERVER_HOST, MQTT_SERVER_PORT).sync(); f.channel().writeAndFlush(createPublishMessage()); // Wait until the connection is closed. f.channel().closeFuture().sync(); context.assertTrue(true); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }
/**
 * Assembles the pipeline of a new channel: the SSL handler (only when a provider is
 * configured), the MQTT decoder/encoder pair, and the transport handler, which is also
 * registered as a listener on the channel's close future.
 *
 * @param ch the channel whose pipeline is populated
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline channelPipeline = ch.pipeline();

    // Without a provider the transport handler is constructed with a null SSL handler.
    final boolean secured = sslHandlerProvider != null;
    final SslHandler sslHandler = secured ? sslHandlerProvider.getSslHandler() : null;
    if (secured) {
        channelPipeline.addLast(sslHandler);
    }

    channelPipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);

    MqttTransportHandler transportHandler = new MqttTransportHandler(
            processor,
            deviceService,
            authService,
            relationService,
            adaptor,
            sslHandler);
    channelPipeline.addLast(transportHandler);

    // The handler is notified when the channel closes.
    ch.closeFuture().addListener(transportHandler);
}
@Test public void noConnectMessage(TestContext context) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE); } }); // Start the client. ChannelFuture f = bootstrap.connect(MQTT_BIND_ADDRESS, MQTT_LISTEN_PORT).sync(); f.channel().writeAndFlush(createPublishMessage()); // Wait until the connection is closed. f.channel().closeFuture().sync(); context.assertTrue(true); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }
@Override protected void customizePipeline(EventExecutorGroup eventExecutorGroup, ChannelPipeline pipeline) { pipeline.addLast("decoder", new MqttDecoder()); pipeline.addLast("encoder", new MqttEncoder()); // we finally have the chance to add some business logic. pipeline.addLast(eventExecutorGroup, "iotracah-mqtt", new MqttServerHandler((MqttServerImpl) getServerImpl())); }
/**
 * Appends the MQTT encoder, a decoder capped at {@code MQTTUtil.MAX_MESSAGE_SIZE}, and a
 * new protocol handler to the given pipeline, in that order.
 *
 * @param pipeline the pipeline to extend
 */
@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
    pipeline
            .addLast(MqttEncoder.INSTANCE)
            .addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE))
            .addLast(new MQTTProtocolHandler(server, this));
}
void init() { String host = "0.0.0.0"; int port = 1883; m_bossGroup = new NioEventLoopGroup(); m_workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(m_bossGroup, m_workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); try { pipeline.addLast("decoder", new MqttDecoder()); pipeline.addLast("encoder", MqttEncoder.INSTANCE); pipeline.addLast("handler", new PublishReceiverHandler()); // pipeline.addLast("decoder", new MqttDecoder()); // pipeline.addLast("encoder", MqttEncoder.INSTANCE); // pipeline.addLast("handler", new NettyPublishReceiverHandler()); } catch (Throwable th) { LOG.error("Severe error during pipeline creation", th); throw th; } } }) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true); try { // Bind and start to accept incoming connections. ChannelFuture f = b.bind(host, port); LOG.info("Server binded host: {}, port: {}", host, port); f.sync(); } catch (InterruptedException ex) { LOG.error(null, ex); } }
void init() { String host = "0.0.0.0"; int port = 1883; m_bossGroup = new NioEventLoopGroup(); m_workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(m_bossGroup, m_workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); try { pipeline.addFirst("idleStateHandler", new IdleStateHandler(2, 0, 0)); pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimeoutHandler()); pipeline.addLast("decoder", new MqttDecoder()); pipeline.addLast("encoder", MqttEncoder.INSTANCE); pipeline.addLast("handler", new LoopMQTTHandler(state)); } catch (Throwable th) { LOG.error("Severe error during pipeline creation", th); throw th; } } }) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true); try { // Bind and start to accept incoming connections. ChannelFuture f = b.bind(host, port); LOG.info("Server binded host: {}, port: {}", host, port); f.sync(); } catch (InterruptedException ex) { LOG.error(null, ex); } }
private void initializePlainTCPTransport(final NettyMQTTHandler handler, IConfig props) throws IOException { LOG.info("Configuring TCP MQTT transport"); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME); String tcpPortProp = props.getProperty(PORT_PROPERTY_NAME, DISABLED_PORT_BIND); if (DISABLED_PORT_BIND.equals(tcpPortProp)) { LOG.info("Property {} has been set to {}. TCP MQTT will be disabled", BrokerConstants.PORT_PROPERTY_NAME, DISABLED_PORT_BIND); return; } int port = Integer.parseInt(tcpPortProp); initFactory(host, port, "TCP MQTT", new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) { pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0)); pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler); // pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR)); if (errorsCather.isPresent()) { pipeline.addLast("bugsnagCatcher", errorsCather.get()); } pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector)); pipeline.addLast("decoder", new MqttDecoder()); pipeline.addLast("encoder", MqttEncoder.INSTANCE); pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector)); pipeline.addLast("messageLogger", new MQTTMessageLogger()); if (metrics.isPresent()) { pipeline.addLast("wizardMetrics", metrics.get()); } pipeline.addLast("handler", handler); } }); }
private void initializeWebSocketTransport(final NettyMQTTHandler handler, IConfig props) throws IOException { LOG.info("Configuring Websocket MQTT transport"); String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND); if (DISABLED_PORT_BIND.equals(webSocketPortProp)) { // Do nothing no WebSocket configured LOG.info("Property {} has been setted to {}. Websocket MQTT will be disabled", BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND); return; } int port = Integer.parseInt(webSocketPortProp); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME); initFactory(host, port, "Websocket MQTT", new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) { pipeline.addLast(new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST)); pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder()); pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder()); pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0)); pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler); pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector)); pipeline.addLast("decoder", new MqttDecoder()); pipeline.addLast("encoder", MqttEncoder.INSTANCE); pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector)); pipeline.addLast("messageLogger", new MQTTMessageLogger()); pipeline.addLast("handler", handler); } }); }
private void initializeSSLTCPTransport(final NettyMQTTHandler handler, IConfig props, final SSLContext sslContext) throws IOException { LOG.info("Configuring SSL MQTT transport"); String sslPortProp = props.getProperty(SSL_PORT_PROPERTY_NAME, DISABLED_PORT_BIND); if (DISABLED_PORT_BIND.equals(sslPortProp)) { // Do nothing no SSL configured LOG.info("Property {} has been set to {}. SSL MQTT will be disabled", BrokerConstants.SSL_PORT_PROPERTY_NAME, DISABLED_PORT_BIND); return; } int sslPort = Integer.parseInt(sslPortProp); LOG.info("Starting SSL on port {}", sslPort); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME); String sNeedsClientAuth = props.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false"); final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth); initFactory(host, sslPort, "SSL MQTT", new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) throws Exception { pipeline.addLast("ssl", createSslHandler(sslContext, needsClientAuth)); pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0)); pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler); // pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR)); pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector)); pipeline.addLast("decoder", new MqttDecoder()); pipeline.addLast("encoder", MqttEncoder.INSTANCE); pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector)); pipeline.addLast("messageLogger", new MQTTMessageLogger()); pipeline.addLast("handler", handler); } }); }
private void initializeWSSTransport(final NettyMQTTHandler handler, IConfig props, final SSLContext sslContext) throws IOException { LOG.info("Configuring secure websocket MQTT transport"); String sslPortProp = props.getProperty(WSS_PORT_PROPERTY_NAME, DISABLED_PORT_BIND); if (DISABLED_PORT_BIND.equals(sslPortProp)) { // Do nothing no SSL configured LOG.info("Property {} has been set to {}. Secure websocket MQTT will be disabled", BrokerConstants.WSS_PORT_PROPERTY_NAME, DISABLED_PORT_BIND); return; } int sslPort = Integer.parseInt(sslPortProp); final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler(); String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME); String sNeedsClientAuth = props.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false"); final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth); initFactory(host, sslPort, "Secure websocket", new PipelineInitializer() { @Override void init(ChannelPipeline pipeline) throws Exception { pipeline.addLast("ssl", createSslHandler(sslContext, needsClientAuth)); pipeline.addLast("httpEncoder", new HttpResponseEncoder()); pipeline.addLast("httpDecoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST)); pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder()); pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder()); pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0)); pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler); pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector)); pipeline.addLast("decoder", new MqttDecoder()); pipeline.addLast("encoder", MqttEncoder.INSTANCE); pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector)); pipeline.addLast("messageLogger", new 
MQTTMessageLogger()); pipeline.addLast("handler", handler); } }); }
/**
 * Entry point: starts the dispatcher on a single-thread executor, binds the MQTT
 * server to {@code resources.port} and blocks until the server channel closes.
 *
 * <p>On exit, normal or exceptional, both the Netty event loop group and the
 * dispatcher executor are shut down, so the executor's worker thread cannot keep the
 * JVM alive after the server has stopped.
 *
 * @param args unused
 * @throws Exception if setup, binding or waiting on the channel fails
 */
public static void main(String[] args) throws Exception {
    Resources resources = new Resources();
    C3P0NativeJdbcExtractor cp30NativeJdbcExtractor = new C3P0NativeJdbcExtractor();
    Dispatcher dispatcher = new Dispatcher(
            cp30NativeJdbcExtractor.getNativeConnection(resources.postgres.getConnection()));
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(dispatcher);
    OnlineState state = new OnlineState();
    ServerBootstrap bootstrap = new ServerBootstrap();
    EventLoopGroup worker = new NioEventLoopGroup();
    try {
        bootstrap.group(worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MqttDecoder());
                        ch.pipeline().addLast(MqttEncoder.INSTANCE);
                        // Reader-idle timeout only; writer and all-idle are disabled (0).
                        ch.pipeline().addLast(new IdleStateHandler(resources.maxIdleTime, 0, 0));
                        ch.pipeline().addLast(new MqttHandler(resources.postgres, dispatcher, resources.mongo, state));
                    }
                });
        ChannelFuture future = bootstrap.bind(resources.port).sync();
        future.channel().closeFuture().sync();
    } finally {
        worker.shutdownGracefully();
        // shutdownNow() interrupts the running dispatcher task; plain shutdown() would
        // only stop new submissions. NOTE(review): assumes Dispatcher exits on interrupt.
        executor.shutdownNow();
    }
}
/**
 * Connects to the broker at {@code uri}, sends the CONNECT packet built from
 * {@code options}, and waits on the shared locker for up to
 * {@code mqttclient.responseTimeoutSeconds} (default 15) seconds.
 *
 * @return the return code of the received CONNACK, or {@code null} if no message was
 *         received by the time the wait ended
 * @throws InterruptedException if interrupted while connecting or waiting
 */
public MqttConnectReturnCode connect() throws InterruptedException {
    // Pick the native epoll transport when configured, otherwise NIO.
    final boolean useEpoll = Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode());
    final Class<? extends SocketChannel> socketChannelClass;
    if (useEpoll) {
        group = new EpollEventLoopGroup(1, new DefaultThreadFactory("client"));
        socketChannelClass = EpollSocketChannel.class;
    } else {
        group = new NioEventLoopGroup(1, new DefaultThreadFactory("client"));
        socketChannelClass = NioSocketChannel.class;
    }

    bootstrap.group(group);
    bootstrap.channel(socketChannelClass);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // TLS is enabled only for the "mqtts" scheme (case-insensitive).
            if ("mqtts".equalsIgnoreCase(uri.getScheme())) {
                SslContext sslCtx = SslContextBuilder.forClient().trustManager(trustManagerFactory).build();
                ch.pipeline().addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), uri.getPort()));
            }

            ch.pipeline()
                    .addLast(MqttDecoder.class.getName(), new MqttDecoder())
                    .addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE)
                    .addLast(MqttPacketReceiver.class.getName(),
                            new MqttPacketReceiver(MqttClient.this, receiver, sharedObject));
        }
    });

    channel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();

    normalizeMessage(options.will());
    send(MqttMessageFactory.connect(options));

    // NOTE(review): the wait is not guarded by a condition loop; presumably the packet
    // receiver notifies this locker when a response arrives — confirm against SharedObject.
    synchronized (sharedObject.locker()) {
        int timeoutSeconds = Settings.INSTANCE.getInt("mqttclient.responseTimeoutSeconds", 15);
        sharedObject.locker().wait(timeoutSeconds * 1000);
    }

    if (sharedObject.receivedMessage() == null) {
        return null;
    }
    MqttConnAckMessage connAck = (MqttConnAckMessage) sharedObject.receivedMessage();
    return connAck.variableHeader().connectReturnCode();
}
@Override protected void initChannel(SocketChannel ch) throws Exception { logger.debug("Initializaing channels..."); ch.pipeline().addLast(ByteCounterCodec.class.getName(), new ByteCounterCodec()); if ("true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"))) { ch.pipeline().addLast(LoggingHandler.class.getName(), new LoggingHandler(LogLevel.DEBUG)); } if (useSsl) { SslContext sslCtx = SslContextBuilder .forServer(Settings.INSTANCE.certChainFile(), Settings.INSTANCE.privateKeyFile()).build(); logger.debug("SSL Provider : {}", SslContext.defaultServerProvider()); ch.pipeline().addLast(sslCtx.newHandler(ch.alloc())); } if (useWebSocket) { String websocketPath = Settings.INSTANCE.getProperty("mqttserver.websocket.path", "/"); ch.pipeline().addLast(HttpServerCodec.class.getName(), new HttpServerCodec()); ch.pipeline().addLast(HttpObjectAggregator.class.getName(), new HttpObjectAggregator(1048576)); ch.pipeline().addLast(HttpContentCompressor.class.getName(), new HttpContentCompressor()); ch.pipeline().addLast(WebSocketServerProtocolHandler.class.getName(), new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536)); // [MQTT-6.0.0-3] ch.pipeline().addLast(new MqttWebSocketCodec()); } int maxBytesInMessage = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage", 8092); ch.pipeline().addLast(MqttDecoder.class.getName(), new MqttDecoder(maxBytesInMessage)); ch.pipeline().addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE); ch.pipeline().addLast(ConnectReceiver.class.getName(), ConnectReceiver.INSTANCE); ch.pipeline().addLast(PubAckReceiver.class.getName(), PubAckReceiver.INSTANCE); ch.pipeline().addLast(PublishReceiver.class.getName(), PublishReceiver.INSTANCE); ch.pipeline().addLast(SubscribeReceiver.class.getName(), SubscribeReceiver.INSTANCE); ch.pipeline().addLast(UnsubscribeReceiver.class.getName(), UnsubscribeReceiver.INSTANCE); ch.pipeline().addLast(GenericReceiver.class.getName(), GenericReceiver.INSTANCE); }
@Test public void multipleConnect(TestContext context) throws InterruptedException { // There are should not be any exceptions during the test mqttServer.exceptionHandler(t -> { context.assertTrue(false); }); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("mqttEncoder", MqttEncoder.INSTANCE); } }); // Start the client. ChannelFuture f = bootstrap.connect(MQTT_SERVER_HOST, MQTT_SERVER_PORT).sync(); long tick = System.currentTimeMillis(); MqttClientOptions options = new MqttClientOptions(); f.channel().writeAndFlush(createConnectPacket(options)).sync(); f.channel().writeAndFlush(createConnectPacket(options)).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); long tock = System.currentTimeMillis(); // Default timeout is 90 seconds // If connection was closed earlier that means that it was a server context.assertTrue((tock - tick) / 1000 < 90); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } }