public static void subscribe(MqttSubscribeMessage message, ClientIdentifier identifier){ for(MqttTopicSubscription topicSubscription : message.payload().topicSubscriptions()){ Topic topic = new Topic(topicSubscription.topicName()); Topic.Element element = topic.getHeadElement(); if (element == null) return; SubscriptionTree cachedTree = subscriptions.get(element.getValue()); if(cachedTree == null){ SubscriptionTree tree = new SubscriptionTree(); tree.build(topic); subscriptions.putIfAbsent(element.getValue(), tree); tree.subscribe(topic,identifier); } else { cachedTree.subscribe(topic,identifier); } } }
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { // deviceSessionCtx.setChannel(ctx); // assetSessionCtx.setChannel(ctx); switch (msg.fixedHeader().messageType()) { case CONNECT: processConnect(ctx, (MqttConnectMessage) msg); break; case PUBLISH: processPublish(ctx, (MqttPublishMessage) msg); // System.out.println("write..."); // ctx.write("just for test"); break; case SUBSCRIBE: processSubscribe(ctx, (MqttSubscribeMessage) msg); break; case UNSUBSCRIBE: processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; case PINGREQ: if (checkConnected(ctx)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); } break; case DISCONNECT: if (checkConnected(ctx)) { processDisconnect(ctx); } break; } }
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) { int topicNameSize = 0; int topicCount = topicSubscriptions.length; for (MqttTopicSubscription item : topicSubscriptions) { topicNameSize += item.topicName().getBytes(CharsetUtil.UTF_8).length; } MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 2 + topicNameSize + topicCount); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions)); return new MqttSubscribeMessage(fixedHeader, variableHeader, payload); }
void handleSubscribe(MqttSubscribeMessage message) throws Exception { MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager(); int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos)); sendToClient(ack); }
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { if (!checkConnected(ctx)) { return; } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); List<Integer> grantedQoSList = new ArrayList<>(); for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topicName = subscription.topicName(); // TODO: handle this qos level. MqttQoS reqQoS = subscription.qualityOfService(); try { if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { // AdaptorToSessionActorMsg msg = // adaptor.convertToActorMsg(deviceSessionCtx, // SUBSCRIBE_ATTRIBUTES_REQUEST, // mqttMsg); // BasicToDeviceActorSessionMsg basicToDeviceActorSessionMsg = new // BasicToDeviceActorSessionMsg( // deviceSessionCtx.getDevice(), msg); // processor.process(basicToDeviceActorSessionMsg); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { // AdaptorToSessionActorMsg msg = // adaptor.convertToActorMsg(deviceSessionCtx, // SUBSCRIBE_RPC_COMMANDS_REQUEST, // mqttMsg); // processor.process(new // BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) { grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { deviceSessionCtx.setAllowAttributeResponses(); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { grantedQoSList.add(getMinSupportedQos(reqQoS)); } else { log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); grantedQoSList.add(FAILURE.value()); } ChannelEntity channelEntity = new TcpChannelEntity(ctx.channel()); MemoryMetaPool.registerTopic(channelEntity, topicName); } catch (Exception e) { e.printStackTrace(); log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); grantedQoSList.add(FAILURE.value()); } } ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { if (stopped) { disconnect(true); return; } MqttMessage message = (MqttMessage) msg; // Disconnect if Netty codec failed to decode the stream. if (message.decoderResult().isFailure()) { log.debug("Bad Message Disconnecting Client."); disconnect(true); return; } connection.dataReceived(); MQTTUtil.logMessage(session.getState(), message, true); this.protocolManager.invokeIncoming(message, this.connection); switch (message.fixedHeader().messageType()) { case CONNECT: handleConnect((MqttConnectMessage) message, ctx); break; case PUBLISH: handlePublish((MqttPublishMessage) message); break; case PUBACK: handlePuback((MqttPubAckMessage) message); break; case PUBREC: handlePubrec(message); break; case PUBREL: handlePubrel(message); break; case PUBCOMP: handlePubcomp(message); break; case SUBSCRIBE: handleSubscribe((MqttSubscribeMessage) message); break; case UNSUBSCRIBE: handleUnsubscribe((MqttUnsubscribeMessage) message); break; case PINGREQ: handlePingreq(); break; case DISCONNECT: disconnect(false); break; case UNSUBACK: case SUBACK: case PINGRESP: case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message. default: disconnect(true); } } catch (Exception e) { log.debug("Error processing Control Packet, Disconnecting Client", e); disconnect(true); } finally { ReferenceCountUtil.release(msg); } }