Java 类io.netty.handler.codec.mqtt.MqttDecoder 实例源码

项目:iothub    文件:MqttTransportServerInitializer.java   
/**
 * Populates the pipeline of a newly accepted channel: an SSL handler when a provider is
 * configured, then the MQTT decoder and encoder, then the transport handler.
 *
 * @param ch the socket channel whose pipeline is being set up
 */
@Override
  public void initChannel(SocketChannel ch) {
    final ChannelPipeline channelPipeline = ch.pipeline();

    // Stays null when no provider is configured; the transport handler receives the value as is.
    SslHandler ssl = null;
    if (sslHandlerProvider != null) {
      ssl = sslHandlerProvider.getSslHandler();
      channelPipeline.addLast(ssl);
    }

    channelPipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
    channelPipeline.addLast(new MqttTransportHandler(msgProducer, deviceService, authService, assetService,
        assetAuthService, relationService, ssl));
  }
项目:iot-platform    文件:Application.java   
/**
 * Binds an MQTT server to {@code port} and waits on the server channel's close future.
 * Both event loop groups are shut down when the method exits, whether normally or via an exception.
 *
 * @param port the TCP port to bind
 * @throws Exception if binding or waiting fails
 */
public void start(int port) throws Exception {
    EventLoopGroup acceptors = new NioEventLoopGroup(1);
    EventLoopGroup workers = new NioEventLoopGroup();

    try {
        ServerBootstrap server = new ServerBootstrap();
        server.group(acceptors, workers);
        server.channel(NioServerSocketChannel.class);
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel ch) throws Exception {
                // Decoder, encoder, then the inbound business handler.
                ch.pipeline()
                        .addLast(new MqttDecoder())
                        .addLast(MqttEncoder.INSTANCE)
                        .addLast(new MqttInBoundHandler());
            }
        });
        server.option(ChannelOption.SO_BACKLOG, 1024);
        server.childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture bound = server.bind(port).sync();
        bound.channel().closeFuture().sync();
    } finally {
        acceptors.shutdownGracefully();
        workers.shutdownGracefully();
    }

}
项目:iotplatform    文件:MqttTransportServerInitializer.java   
/**
 * Sets up the handlers of a new channel in order: the SSL handler (only when a provider is
 * present), the MQTT decoder, the MQTT encoder and the transport handler.
 *
 * @param ch the socket channel being initialized
 */
@Override
  public void initChannel(SocketChannel ch) {
    final ChannelPipeline handlers = ch.pipeline();

    // Null unless a provider exists; passed to the transport handler unchanged.
    SslHandler sslOrNull = null;
    if (sslHandlerProvider != null) {
      sslOrNull = sslHandlerProvider.getSslHandler();
      handlers.addLast(sslOrNull);
    }

    handlers.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
    handlers.addLast("encoder", MqttEncoder.INSTANCE);
    handlers.addLast(new MqttTransportHandler(msgProducer, deviceService, authService, assetService,
        assetAuthService, relationService, sslOrNull));
  }
项目:blynk-server    文件:MQTTHardwareServer.java   
/**
 * Creates the MQTT hardware server from the values in {@code holder}: the listen address and
 * MQTT port go to the superclass, and a channel initializer is prepared that installs the
 * idle-state, channel-state, MQTT codec and login handlers.
 *
 * @param holder source of properties, limits and the collaborators handed to the handlers
 */
public MQTTHardwareServer(Holder holder) {
    super(holder.props.getProperty("listen.address"),
            holder.props.getIntProperty("hardware.mqtt.port"), holder.transportTypeHolder);

    final int idleTimeoutSecs = holder.limits.hardwareIdleTimeout;
    // These two instances are created once here and added to every channel's pipeline.
    final MqttHardwareLoginHandler loginHandler = new MqttHardwareLoginHandler(holder);
    final HardwareChannelStateHandler stateHandler =
            new HardwareChannelStateHandler(holder.sessionDao, holder.gcmWrapper);

    channelInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // The same timeout is used for the reader and writer idle values; the third value is 0.
            ch.pipeline().addLast("MqttIdleStateHandler",
                    new IdleStateHandler(idleTimeoutSecs, idleTimeoutSecs, 0));
            ch.pipeline().addLast(stateHandler);
            ch.pipeline().addLast(new MqttDecoder());
            ch.pipeline().addLast(MqttEncoder.INSTANCE);
            ch.pipeline().addLast(loginHandler);
            ch.pipeline().addLast(new HardwareNotLoggedHandler());
        }
    };

    log.debug("hard.socket.idle.timeout = {}", idleTimeoutSecs);
}
项目:DovakinMQ    文件:TCPHandlerInitializer.java   
/**
 * Installs the idle-state handler (90 second reader timeout), the MQTT codec pair and the
 * MQTT handler on the channel's pipeline, in that order.
 *
 * @param channel the socket channel being initialized
 * @throws Exception declared by the overridden contract
 */
protected void initChannel(SocketChannel channel) throws Exception {
    channel.pipeline()
            .addLast("idleState", new IdleStateHandler(90, 0, 0, TimeUnit.SECONDS))
            .addLast("mqttDecoder", new MqttDecoder())
            .addLast("mqttEncoder", MqttEncoder.INSTANCE)
            .addLast("mqttHandler", new MqttHandler(new MqttProcessor(server)));
}
项目:vertx-mqtt    文件:MqttServerImpl.java   
/**
 * Inserts the MQTT codec and a CONNECT-timeout guard before the handler named "handler".
 *
 * @param pipeline the pipeline to extend; it must already contain a handler named "handler"
 */
private void initChannel(ChannelPipeline pipeline) {

    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    // A non-positive max message size means "not set": use the Netty MQTT codec default.
    MqttDecoder mqttDecoder = this.options.getMaxMessageSize() > 0
      ? new MqttDecoder(this.options.getMaxMessageSize())
      : new MqttDecoder();
    pipeline.addBefore("handler", "mqttDecoder", mqttDecoder);

    // Idle-state handler that drives the timeout on the CONNECT packet.
    pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
    pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() {

      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (!(evt instanceof IdleStateEvent)) {
          return;
        }
        // As MQTT 3.1.1 describes, if no packet is sent after a "reasonable" time
        // (here the CONNECT timeout) the connection is closed.
        if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
          ctx.channel().close();
        }
      }
    });
  }
项目:vertx-mqtt    文件:MqttClientImpl.java   
/**
 * Inserts the MQTT codec before the handler named "handler" and, when automatic keep-alive is
 * enabled with a non-zero interval, a writer-idle handler that calls {@code ping()}.
 *
 * @param pipeline the pipeline to extend; it must already contain a handler named "handler"
 */
private void initChannel(ChannelPipeline pipeline) {

    // Netty's MQTT encoder and decoder.
    pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);

    // A non-positive max message size means "not set": use the Netty MQTT codec default.
    MqttDecoder mqttDecoder = this.options.getMaxMessageSize() > 0
      ? new MqttDecoder(this.options.getMaxMessageSize())
      : new MqttDecoder();
    pipeline.addBefore("handler", "mqttDecoder", mqttDecoder);

    if (!this.options.isAutoKeepAlive() ||
      this.options.getKeepAliveTimeSeconds() == 0) {
      // Keep-alive is disabled: nothing more to install.
      return;
    }

    pipeline.addBefore("handler", "idle",
      new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0));
    pipeline.addBefore("handler", "keepAliveHandler", new ChannelDuplexHandler() {

      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (!(evt instanceof IdleStateEvent)) {
          return;
        }
        // Nothing was written for the keep-alive interval: send a ping.
        if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
          ping();
        }
      }
    });
  }
项目:thingsboard    文件:MqttTransportServerInitializer.java   
/**
 * Sets up a new channel: an SSL handler when a provider is configured, the MQTT decoder and
 * encoder, and the transport handler, which is also registered as a listener on the channel's
 * close future.
 *
 * @param ch the socket channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline channelPipeline = ch.pipeline();

    // Remains null when there is no provider; handed to the transport handler either way.
    SslHandler ssl = null;
    if (sslHandlerProvider != null) {
        ssl = sslHandlerProvider.getSslHandler();
        channelPipeline.addLast(ssl);
    }

    channelPipeline.addLast("decoder", new MqttDecoder(MAX_PAYLOAD_SIZE));
    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);

    final MqttTransportHandler transportHandler =
            new MqttTransportHandler(processor, deviceService, authService, relationService, adaptor, ssl);
    channelPipeline.addLast(transportHandler);
    ch.closeFuture().addListener(transportHandler);
}
项目:iotracah    文件:MqttServerInitializer.java   
/**
 * Appends the MQTT decoder and encoder, then the server handler, which is registered with the
 * supplied executor group.
 *
 * @param eventExecutorGroup executor group the server handler is registered with
 * @param pipeline           the pipeline to extend
 */
@Override
protected void customizePipeline(EventExecutorGroup eventExecutorGroup, ChannelPipeline pipeline) {
    pipeline.addLast("decoder", new MqttDecoder())
            .addLast("encoder", new MqttEncoder())
            // Business logic comes last, after the codec.
            .addLast(eventExecutorGroup, "iotracah-mqtt",
                    new MqttServerHandler((MqttServerImpl) getServerImpl()));
}
项目:activemq-artemis    文件:MQTTProtocolManager.java   
/**
 * Appends the MQTT encoder, a size-limited MQTT decoder and the protocol handler to the pipeline.
 *
 * @param pipeline the pipeline to extend
 */
@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
   pipeline.addLast(MqttEncoder.INSTANCE)
           .addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE))
           .addLast(new MQTTProtocolHandler(server, this));
}
项目:moquette    文件:ProtocolPublishDecodingServer.java   
/**
 * Creates the boss and worker event loop groups, configures a server bootstrap whose child
 * channels get the MQTT decoder, the MQTT encoder and a {@code PublishReceiverHandler}, and
 * binds it to 0.0.0.0:1883, waiting for the bind to complete.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored and the error is
 * logged; the method then returns normally.
 */
void init() {
    String host = "0.0.0.0";
    int port = 1883;
    m_bossGroup = new NioEventLoopGroup();
    m_workerGroup = new NioEventLoopGroup();

    ServerBootstrap b = new ServerBootstrap();
    b.group(m_bossGroup, m_workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    try {
                        pipeline.addLast("decoder", new MqttDecoder());
                        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
                        pipeline.addLast("handler", new PublishReceiverHandler());
                    } catch (Throwable th) {
                        // Log here so the failure is visible, then let it propagate.
                        LOG.error("Severe error during pipeline creation", th);
                        throw th;
                    }
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    try {
        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(host, port);
        LOG.info("Server binded host: {}, port: {}", host, port);
        f.sync();
    } catch (InterruptedException ex) {
        // Restore the interrupt flag so code further up the stack can still see the interruption.
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while waiting for the server to bind", ex);
    }
}
项目:moquette    文件:ProtocolDecodingServer.java   
/**
 * Creates the boss and worker event loop groups, configures a server bootstrap whose child
 * channels get an idle-state handler (2 second reader timeout), the idle-timeout handler, the
 * MQTT decoder and encoder and a {@code LoopMQTTHandler}, and binds it to 0.0.0.0:1883,
 * waiting for the bind to complete.
 *
 * <p>If the waiting thread is interrupted, the interrupt flag is restored and the error is
 * logged; the method then returns normally.
 */
void init() {
    String host = "0.0.0.0";
    int port = 1883;
    m_bossGroup = new NioEventLoopGroup();
    m_workerGroup = new NioEventLoopGroup();

    ServerBootstrap b = new ServerBootstrap();
    b.group(m_bossGroup, m_workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    try {
                        pipeline.addFirst("idleStateHandler", new IdleStateHandler(2, 0, 0));
                        pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimeoutHandler());
                        pipeline.addLast("decoder", new MqttDecoder());
                        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
                        pipeline.addLast("handler", new LoopMQTTHandler(state));
                    } catch (Throwable th) {
                        // Log here so the failure is visible, then let it propagate.
                        LOG.error("Severe error during pipeline creation", th);
                        throw th;
                    }
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    try {
        // Bind and start to accept incoming connections.
        ChannelFuture f = b.bind(host, port);
        LOG.info("Server binded host: {}, port: {}", host, port);
        f.sync();
    } catch (InterruptedException ex) {
        // Restore the interrupt flag so code further up the stack can still see the interruption.
        Thread.currentThread().interrupt();
        LOG.error("Interrupted while waiting for the server to bind", ex);
    }
}
项目:moquette    文件:NettyAcceptor.java   
/**
 * Sets up the plain TCP MQTT transport unless the port property equals the "disabled" marker.
 *
 * @param handler the MQTT handler placed last in every pipeline
 * @param props   configuration supplying the host and TCP port properties
 * @throws IOException declared for the transport setup performed via {@code initFactory}
 */
private void initializePlainTCPTransport(final NettyMQTTHandler handler,
                                         IConfig props) throws IOException {
    LOG.info("Configuring TCP MQTT transport");
    final MoquetteIdleTimeoutHandler idleTimeoutHandler = new MoquetteIdleTimeoutHandler();
    final String bindHost = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
    final String configuredPort = props.getProperty(PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
    if (DISABLED_PORT_BIND.equals(configuredPort)) {
        LOG.info("Property {} has been set to {}. TCP MQTT will be disabled", BrokerConstants.PORT_PROPERTY_NAME,
            DISABLED_PORT_BIND);
        return;
    }
    final int tcpPort = Integer.parseInt(configuredPort);
    initFactory(bindHost, tcpPort, "TCP MQTT", new PipelineInitializer() {

        @Override
        void init(ChannelPipeline pipeline) {
            pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0))
                    .addAfter("idleStateHandler", "idleEventHandler", idleTimeoutHandler);
            if (errorsCather.isPresent()) {
                pipeline.addLast("bugsnagCatcher", errorsCather.get());
            }
            // The byte metrics handler is added first, so it ends up ahead of the idle handlers.
            pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector))
                    .addLast("decoder", new MqttDecoder())
                    .addLast("encoder", MqttEncoder.INSTANCE)
                    .addLast("metrics", new MessageMetricsHandler(m_metricsCollector))
                    .addLast("messageLogger", new MQTTMessageLogger());
            if (metrics.isPresent()) {
                pipeline.addLast("wizardMetrics", metrics.get());
            }
            pipeline.addLast("handler", handler);
        }
    });
}
项目:moquette    文件:NettyAcceptor.java   
/**
 * Sets up the WebSocket MQTT transport unless the WebSocket port property equals the
 * "disabled" marker.
 *
 * @param handler the MQTT handler placed last in every pipeline
 * @param props   configuration supplying the host and WebSocket port properties
 * @throws IOException declared for the transport setup performed via {@code initFactory}
 */
private void initializeWebSocketTransport(final NettyMQTTHandler handler, IConfig props) throws IOException {
    LOG.info("Configuring Websocket MQTT transport");
    final String configuredPort = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
    if (DISABLED_PORT_BIND.equals(configuredPort)) {
        // No WebSocket port configured: nothing to set up.
        LOG.info("Property {} has been setted to {}. Websocket MQTT will be disabled",
            BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
        return;
    }
    final int wsPort = Integer.parseInt(configuredPort);

    final MoquetteIdleTimeoutHandler idleTimeoutHandler = new MoquetteIdleTimeoutHandler();

    final String bindHost = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
    initFactory(bindHost, wsPort, "Websocket MQTT", new PipelineInitializer() {

        @Override
        void init(ChannelPipeline pipeline) {
            // HTTP and WebSocket layers, then the frame <-> ByteBuf bridges.
            pipeline.addLast(new HttpServerCodec())
                    .addLast("aggregator", new HttpObjectAggregator(65536))
                    .addLast("webSocketHandler",
                            new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST))
                    .addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder())
                    .addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
            // Idle handling and byte metrics are added at the front of the pipeline.
            pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0))
                    .addAfter("idleStateHandler", "idleEventHandler", idleTimeoutHandler)
                    .addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
            // MQTT codec, message metrics and logging, then the business handler.
            pipeline.addLast("decoder", new MqttDecoder())
                    .addLast("encoder", MqttEncoder.INSTANCE)
                    .addLast("metrics", new MessageMetricsHandler(m_metricsCollector))
                    .addLast("messageLogger", new MQTTMessageLogger())
                    .addLast("handler", handler);
        }
    });
}
项目:moquette    文件:NettyAcceptor.java   
/**
 * Sets up the SSL-protected TCP MQTT transport unless the SSL port property equals the
 * "disabled" marker.
 *
 * @param handler    the MQTT handler placed last in every pipeline
 * @param props      configuration supplying host, SSL port and client-auth properties
 * @param sslContext context passed to {@code createSslHandler} for each pipeline
 * @throws IOException declared for the transport setup performed via {@code initFactory}
 */
private void initializeSSLTCPTransport(final NettyMQTTHandler handler, IConfig props, final SSLContext sslContext)
        throws IOException {
    LOG.info("Configuring SSL MQTT transport");
    final String configuredPort = props.getProperty(SSL_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
    if (DISABLED_PORT_BIND.equals(configuredPort)) {
        // No SSL port configured: nothing to set up.
        LOG.info("Property {} has been set to {}. SSL MQTT will be disabled",
            BrokerConstants.SSL_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
        return;
    }

    final int securePort = Integer.parseInt(configuredPort);
    LOG.info("Starting SSL on port {}", securePort);

    final MoquetteIdleTimeoutHandler idleTimeoutHandler = new MoquetteIdleTimeoutHandler();
    final String bindHost = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
    // Client authentication defaults to "false" when the property is absent.
    final boolean needsClientAuth =
            Boolean.parseBoolean(props.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false"));
    initFactory(bindHost, securePort, "SSL MQTT", new PipelineInitializer() {

        @Override
        void init(ChannelPipeline pipeline) throws Exception {
            pipeline.addLast("ssl", createSslHandler(sslContext, needsClientAuth))
                    .addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0))
                    .addAfter("idleStateHandler", "idleEventHandler", idleTimeoutHandler)
                    .addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
            pipeline.addLast("decoder", new MqttDecoder())
                    .addLast("encoder", MqttEncoder.INSTANCE)
                    .addLast("metrics", new MessageMetricsHandler(m_metricsCollector))
                    .addLast("messageLogger", new MQTTMessageLogger())
                    .addLast("handler", handler);
        }
    });
}
项目:moquette    文件:NettyAcceptor.java   
/**
 * Sets up the secure WebSocket MQTT transport unless the WSS port property equals the
 * "disabled" marker.
 *
 * @param handler    the MQTT handler placed last in every pipeline
 * @param props      configuration supplying host, WSS port and client-auth properties
 * @param sslContext context passed to {@code createSslHandler} for each pipeline
 * @throws IOException declared for the transport setup performed via {@code initFactory}
 */
private void initializeWSSTransport(final NettyMQTTHandler handler, IConfig props, final SSLContext sslContext)
        throws IOException {
    LOG.info("Configuring secure websocket MQTT transport");
    final String configuredPort = props.getProperty(WSS_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
    if (DISABLED_PORT_BIND.equals(configuredPort)) {
        // No WSS port configured: nothing to set up.
        LOG.info("Property {} has been set to {}. Secure websocket MQTT will be disabled",
            BrokerConstants.WSS_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
        return;
    }
    final int securePort = Integer.parseInt(configuredPort);
    final MoquetteIdleTimeoutHandler idleTimeoutHandler = new MoquetteIdleTimeoutHandler();
    final String bindHost = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
    // Client authentication defaults to "false" when the property is absent.
    final boolean needsClientAuth =
            Boolean.parseBoolean(props.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false"));
    initFactory(bindHost, securePort, "Secure websocket", new PipelineInitializer() {

        @Override
        void init(ChannelPipeline pipeline) throws Exception {
            // SSL, HTTP and WebSocket layers, then the frame <-> ByteBuf bridges.
            pipeline.addLast("ssl", createSslHandler(sslContext, needsClientAuth))
                    .addLast("httpEncoder", new HttpResponseEncoder())
                    .addLast("httpDecoder", new HttpRequestDecoder())
                    .addLast("aggregator", new HttpObjectAggregator(65536))
                    .addLast("webSocketHandler",
                            new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST))
                    .addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder())
                    .addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
            // Idle handling and byte metrics are added at the front of the pipeline.
            pipeline.addFirst("idleStateHandler", new IdleStateHandler(nettyChannelTimeoutSeconds, 0, 0))
                    .addAfter("idleStateHandler", "idleEventHandler", idleTimeoutHandler)
                    .addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
            // MQTT codec, message metrics and logging, then the business handler.
            pipeline.addLast("decoder", new MqttDecoder())
                    .addLast("encoder", MqttEncoder.INSTANCE)
                    .addLast("metrics", new MessageMetricsHandler(m_metricsCollector))
                    .addLast("messageLogger", new MQTTMessageLogger())
                    .addLast("handler", handler);
        }
    });
}
项目:london    文件:MqttServer.java   
/**
 * Entry point: starts the dispatcher on a single-thread executor, binds the MQTT server to the
 * configured port and waits on the server channel's close future.
 *
 * <p>On exit, whether normal or exceptional, the event loop group is shut down and the
 * dispatcher's executor is stopped, so its worker thread cannot keep the JVM alive after the
 * server has gone away.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if resource setup, binding or waiting fails
 */
public static void main(String[] args) throws Exception {
    Resources resources = new Resources();
    C3P0NativeJdbcExtractor cp30NativeJdbcExtractor = new C3P0NativeJdbcExtractor();
    Dispatcher dispatcher = new Dispatcher(cp30NativeJdbcExtractor.getNativeConnection(resources.postgres.getConnection()));
    ExecutorService executor = Executors.newSingleThreadExecutor();
    EventLoopGroup worker = new NioEventLoopGroup();
    try {
        executor.execute(dispatcher);

        OnlineState state = new OnlineState();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MqttDecoder());
                        ch.pipeline().addLast(MqttEncoder.INSTANCE);
                        ch.pipeline().addLast(new IdleStateHandler(resources.maxIdleTime, 0, 0));
                        ch.pipeline().addLast(new MqttHandler(resources.postgres, dispatcher, resources.mongo, state));
                    }
                });
        ChannelFuture future = bootstrap.bind(resources.port).sync();
        future.channel().closeFuture().sync();
    } finally {
        worker.shutdownGracefully();
        // Interrupt the dispatcher task; without this its non-daemon thread would outlive main().
        executor.shutdownNow();
    }
}
项目:lannister    文件:MqttClient.java   
/**
 * Connects to the broker at {@code uri}, sends a CONNECT message built from {@code options}
 * and waits on the shared locker for a response.
 *
 * @return the return code of the received CONNACK, or {@code null} when no message has been
 *         received by the time the wait ends
 * @throws InterruptedException if interrupted while connecting or waiting
 */
public MqttConnectReturnCode connect() throws InterruptedException {

    final boolean useEpoll = Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode());
    final Class<? extends SocketChannel> channelType;

    if (useEpoll) {
        group = new EpollEventLoopGroup(1, new DefaultThreadFactory("client"));
        channelType = EpollSocketChannel.class;
    }
    else {
        group = new NioEventLoopGroup(1, new DefaultThreadFactory("client"));
        channelType = NioSocketChannel.class;
    }

    bootstrap.group(group);
    bootstrap.channel(channelType);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // An SSL handler is installed only for the "mqtts" scheme.
            if ("mqtts".equalsIgnoreCase(uri.getScheme())) {
                SslContext clientSsl = SslContextBuilder.forClient().trustManager(trustManagerFactory).build();

                ch.pipeline().addLast(clientSsl.newHandler(ch.alloc(), uri.getHost(), uri.getPort()));
            }

            ch.pipeline()
                    .addLast(MqttDecoder.class.getName(), new MqttDecoder())
                    .addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE)
                    .addLast(MqttPacketReceiver.class.getName(),
                            new MqttPacketReceiver(MqttClient.this, receiver, sharedObject));
        }
    });

    channel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();

    normalizeMessage(options.will());
    send(MqttMessageFactory.connect(options));

    // NOTE(review): this is a single timed wait, not a loop on a condition; if the wait ends
    // with no message recorded, the method returns null below.
    synchronized (sharedObject.locker()) {
        int responseTimeoutSeconds = Settings.INSTANCE.getInt("mqttclient.responseTimeoutSeconds", 15);

        sharedObject.locker().wait(responseTimeoutSeconds * 1000);
    }

    if (sharedObject.receivedMessage() == null) {
        return null;
    }

    return ((MqttConnAckMessage) sharedObject.receivedMessage()).variableHeader().connectReturnCode();
}
项目:lannister    文件:MqttChannelInitializer.java   
/**
 * Builds the server-side pipeline for a new channel: byte counter, optional Netty logging,
 * optional SSL, optional HTTP/WebSocket layers, the MQTT codec with a configurable size limit,
 * and finally the message receivers.
 *
 * @param ch the socket channel being initialized
 * @throws Exception if building the SSL context or any handler fails
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    logger.debug("Initializaing channels...");

    ch.pipeline().addLast(ByteCounterCodec.class.getName(), new ByteCounterCodec());

    final boolean nettyLoggerEnabled = "true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"));
    if (nettyLoggerEnabled) {
        ch.pipeline().addLast(LoggingHandler.class.getName(), new LoggingHandler(LogLevel.DEBUG));
    }

    if (useSsl) {
        // NOTE(review): the SSL context is built on every call of this method.
        SslContext serverSsl = SslContextBuilder
                .forServer(Settings.INSTANCE.certChainFile(), Settings.INSTANCE.privateKeyFile()).build();

        logger.debug("SSL Provider : {}", SslContext.defaultServerProvider());

        ch.pipeline().addLast(serverSsl.newHandler(ch.alloc()));
    }

    if (useWebSocket) {
        String path = Settings.INSTANCE.getProperty("mqttserver.websocket.path", "/");

        ch.pipeline()
                .addLast(HttpServerCodec.class.getName(), new HttpServerCodec())
                .addLast(HttpObjectAggregator.class.getName(), new HttpObjectAggregator(1048576))
                .addLast(HttpContentCompressor.class.getName(), new HttpContentCompressor())
                .addLast(WebSocketServerProtocolHandler.class.getName(),
                        new WebSocketServerProtocolHandler(path, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536)) // [MQTT-6.0.0-3]
                .addLast(new MqttWebSocketCodec());
    }

    int messageByteLimit = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage", 8092);

    ch.pipeline()
            .addLast(MqttDecoder.class.getName(), new MqttDecoder(messageByteLimit))
            .addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE);

    // Receivers are shared singleton instances, each registered under its class name.
    ch.pipeline()
            .addLast(ConnectReceiver.class.getName(), ConnectReceiver.INSTANCE)
            .addLast(PubAckReceiver.class.getName(), PubAckReceiver.INSTANCE)
            .addLast(PublishReceiver.class.getName(), PublishReceiver.INSTANCE)
            .addLast(SubscribeReceiver.class.getName(), SubscribeReceiver.INSTANCE)
            .addLast(UnsubscribeReceiver.class.getName(), UnsubscribeReceiver.INSTANCE)
            .addLast(GenericReceiver.class.getName(), GenericReceiver.INSTANCE);
}