private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { // deviceSessionCtx.setChannel(ctx); // assetSessionCtx.setChannel(ctx); switch (msg.fixedHeader().messageType()) { case CONNECT: processConnect(ctx, (MqttConnectMessage) msg); break; case PUBLISH: processPublish(ctx, (MqttPublishMessage) msg); // System.out.println("write..."); // ctx.write("just for test"); break; case SUBSCRIBE: processSubscribe(ctx, (MqttSubscribeMessage) msg); break; case UNSUBSCRIBE: processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; case PINGREQ: if (checkConnected(ctx)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); } break; case DISCONNECT: if (checkConnected(ctx)) { processDisconnect(ctx); } break; } }
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { if (!checkConnected(ctx)) { return; } if (MemoryMetaPool.getClientId(ctx.channel()) == null) { ctx.channel().close(); } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); for (String topicName : mqttMsg.payload().topics()) { try { if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { // AdaptorToSessionActorMsg msg = // adaptor.convertToActorMsg(deviceSessionCtx, // UNSUBSCRIBE_ATTRIBUTES_REQUEST, // mqttMsg); // processor.process(new // BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { // AdaptorToSessionActorMsg msg = // adaptor.convertToActorMsg(deviceSessionCtx, // UNSUBSCRIBE_RPC_COMMANDS_REQUEST, // mqttMsg); // processor.process(new // BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { deviceSessionCtx.setDisallowAttributeResponses(); } MemoryMetaPool.unregisterTopic(ctx.channel(), topicName); } catch (Exception e) { log.warn("[{}] Failed to process unsubscription [{}] to [{}]", sessionId, mqttMsg.variableHeader().messageId(), topicName); } } ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId())); }
@Override protected void channelRead0(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception { logger.debug("packet incoming [message={}]", msg.toString()); Session session = Session.NEXUS.get(ctx.channel().id()); if (session == null) { logger.error("None exist session message [message={}]", msg.toString()); ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs -> // [MQTT-4.8.0-1] Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs())); return; } session.setLastIncomingTime(new Date()); List<String> topicFilters = msg.payload().topics(); if (topicFilters == null || topicFilters.isEmpty()) { session.dispose(true); // [MQTT-4.8.0-1] return; } topicFilters.stream().forEach(tf -> { TopicSubscription.NEXUS.removeByKey(tf, session.clientId()); TopicSubscriber.NEXUS.removeByTopicFilter(session.clientId(), tf); }); Plugins.INSTANCE.get(UnsubscribeEventListener.class).unsubscribed(new UnsubscribeEventArgs() { @Override public String clientId() { return session.clientId(); } @Override public List<String> topicFilters() { return topicFilters; } }); session.send(MqttMessageFactory.unsuback(msg.variableHeader().messageId()), null); // [MQTT-2.3.1-7],[MQTT-3.10.4-4],[MQTT-3.10.4-5] }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { if (stopped) { disconnect(true); return; } MqttMessage message = (MqttMessage) msg; // Disconnect if Netty codec failed to decode the stream. if (message.decoderResult().isFailure()) { log.debug("Bad Message Disconnecting Client."); disconnect(true); return; } connection.dataReceived(); MQTTUtil.logMessage(session.getState(), message, true); this.protocolManager.invokeIncoming(message, this.connection); switch (message.fixedHeader().messageType()) { case CONNECT: handleConnect((MqttConnectMessage) message, ctx); break; case PUBLISH: handlePublish((MqttPublishMessage) message); break; case PUBACK: handlePuback((MqttPubAckMessage) message); break; case PUBREC: handlePubrec(message); break; case PUBREL: handlePubrel(message); break; case PUBCOMP: handlePubcomp(message); break; case SUBSCRIBE: handleSubscribe((MqttSubscribeMessage) message); break; case UNSUBSCRIBE: handleUnsubscribe((MqttUnsubscribeMessage) message); break; case PINGREQ: handlePingreq(); break; case DISCONNECT: disconnect(false); break; case UNSUBACK: case SUBACK: case PINGRESP: case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message. default: disconnect(true); } } catch (Exception e) { log.debug("Error processing Control Packet, Disconnecting Client", e); disconnect(true); } finally { ReferenceCountUtil.release(msg); } }
void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader()); sendToClient(m); }