private FromDeviceMsg convertToGetAttributesRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { String topicName = inbound.variableHeader().topicName(); try { Integer requestId = Integer .valueOf(topicName.substring(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX.length())); String payload = inbound.payload().toString(UTF8); JsonElement requestBody = new JsonParser().parse(payload); Set<String> clientKeys = toStringSet(requestBody, "clientKeys"); Set<String> sharedKeys = toStringSet(requestBody, "sharedKeys"); if (clientKeys == null && sharedKeys == null) { return new BasicGetAttributesRequest(requestId); } else { return new BasicGetAttributesRequest(requestId, clientKeys, sharedKeys); } } catch (RuntimeException e) { log.warn("Failed to decode get attributes request", e); throw new AdaptorException(e); } }
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); int requestId = mqttMsg.variableHeader().messageId(); if (json.isJsonObject()) { JsonObject jsonObj = json.getAsJsonObject(); for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) { String deviceName = checkDeviceConnected(deviceEntry.getKey()); if (!deviceEntry.getValue().isJsonArray()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId); JsonArray deviceData = deviceEntry.getValue().getAsJsonArray(); for (JsonElement element : deviceData) { JsonConverter.parseWithTs(request, element.getAsJsonObject()); } GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); } } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } }
public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); int requestId = mqttMsg.variableHeader().messageId(); if (json.isJsonObject()) { JsonObject jsonObj = json.getAsJsonObject(); for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) { String deviceName = checkDeviceConnected(deviceEntry.getKey()); if (!deviceEntry.getValue().isJsonObject()) { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } long ts = System.currentTimeMillis(); BasicUpdateAttributesRequest request = new BasicUpdateAttributesRequest(requestId); JsonObject deviceData = deviceEntry.getValue().getAsJsonObject(); request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList())); GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); } } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object message) { MqttMessage msg = (MqttMessage) message; MqttMessageType messageType = msg.fixedHeader().messageType(); try { switch (messageType) { case PUBLISH: LOG.info("Received a message of type {}", messageType); handlePublish((MqttPublishMessage) msg); return; default: LOG.info("Received a message of type {}", messageType); } } catch (Exception ex) { LOG.error("Bad error in processing the message", ex); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object message) { MqttMessage msg = (MqttMessage) message; MqttMessageType type = msg.fixedHeader().messageType(); try { switch (type) { case PUBLISH: LOG.info("Received a message of type {}", type); handlePublish((MqttPublishMessage) msg); return; default: LOG.info("Received a message of type {}", type); } } catch (Exception ex) { LOG.error("Bad error in processing the message", ex); } }
void sendPublish(ClientSession clientsession, MqttPublishMessage pubMessage) { String clientId = clientsession.clientID; final int messageId = pubMessage.variableHeader().messageId(); final String topicName = pubMessage.variableHeader().topicName(); MqttQoS qos = pubMessage.fixedHeader().qosLevel(); if (LOG.isDebugEnabled()) { LOG.debug("Sending PUBLISH message. MessageId={}, CId={}, topic={}, qos={}, payload={}", messageId, clientId, topicName, qos, DebugUtils.payload2Str(pubMessage.payload())); } else { LOG.info("Sending PUBLISH message. MessageId={}, CId={}, topic={}", messageId, clientId, topicName); } boolean messageDelivered = connectionDescriptorStore.sendMessage(pubMessage, messageId, clientId); if (!messageDelivered) { if (qos != AT_MOST_ONCE && !clientsession.isCleanSession()) { LOG.warn("PUBLISH message could not be delivered. It will be stored. MessageId={}, CId={}, topic={}, " + "qos={}, removeTemporaryQoS2={}", messageId, clientId, topicName, qos, false); clientsession.enqueue(asStoredMessage(pubMessage)); } else { LOG.warn("PUBLISH message could not be delivered. It will be discarded. MessageId={}, CId={}, topic={}, " + "qos={}, removeTemporaryQoS2={}", messageId, clientId, topicName, qos, true); } } }
@Override public void notifyTopicPublished(final MqttPublishMessage msg, final String clientID, final String username) { msg.retain(); executor.execute(() -> { try { int messageId = msg.variableHeader().messageId(); String topic = msg.variableHeader().topicName(); for (InterceptHandler handler : handlers.get(InterceptPublishMessage.class)) { LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, " + "interceptorId={}", clientID, messageId, topic, handler.getID()); handler.onPublish(new InterceptPublishMessage(msg, clientID, username)); } } finally { ReferenceCountUtil.release(msg); } }); }
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) { // verify if topic can be write final Topic topic = new Topic(msg.variableHeader().topicName()); String clientID = NettyUtils.clientID(channel); String username = NettyUtils.userName(channel); if (!m_authorizator.canWrite(topic, username, clientID)) { LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic); return; } // route message to subscribers IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg); toStoreMsg.setClientID(clientID); this.publisher.publish2Subscribers(toStoreMsg, topic); if (msg.fixedHeader().isRetain()) { // QoS == 0 && retain => clean old retained m_messagesStore.cleanRetained(topic); } m_interceptor.notifyTopicPublished(msg, clientID, username); }
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { // deviceSessionCtx.setChannel(ctx); // assetSessionCtx.setChannel(ctx); switch (msg.fixedHeader().messageType()) { case CONNECT: processConnect(ctx, (MqttConnectMessage) msg); break; case PUBLISH: processPublish(ctx, (MqttPublishMessage) msg); // System.out.println("write..."); // ctx.write("just for test"); break; case SUBSCRIBE: processSubscribe(ctx, (MqttSubscribeMessage) msg); break; case UNSUBSCRIBE: processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; case PINGREQ: if (checkConnected(ctx)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); } break; case DISCONNECT: if (checkConnected(ctx)) { processDisconnect(ctx); } break; } }
private void processPublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg) { if (!checkConnected(ctx)) { return; } String topicName = mqttMsg.variableHeader().topicName(); int msgId = mqttMsg.variableHeader().messageId(); log.info("[{}] Processing publish msg [{}][{}]!", sessionId, topicName, msgId); if (topicName.startsWith(BASE_GATEWAY_API_TOPIC)) { // if (gatewaySessionCtx != null) { // gatewaySessionCtx.setChannel(ctx); // try { // if (topicName.equals(GATEWAY_TELEMETRY_TOPIC)) { // gatewaySessionCtx.onDeviceTelemetry(mqttMsg); // } else if (topicName.equals(GATEWAY_ATTRIBUTES_TOPIC)) { // gatewaySessionCtx.onDeviceAttributes(mqttMsg); // } else if (topicName.equals(GATEWAY_ATTRIBUTES_REQUEST_TOPIC)) { // gatewaySessionCtx.onDeviceAttributesRequest(mqttMsg); // } else if (topicName.equals(GATEWAY_RPC_TOPIC)) { // gatewaySessionCtx.onDeviceRpcResponse(mqttMsg); // } else if (topicName.equals(GATEWAY_CONNECT_TOPIC)) { // gatewaySessionCtx.onDeviceConnect(mqttMsg); // } else if (topicName.equals(GATEWAY_DISCONNECT_TOPIC)) { // gatewaySessionCtx.onDeviceDisconnect(mqttMsg); // } // } catch (RuntimeException | AdaptorException e) { // log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, // topicName, msgId, e); // } // } } else { processDevicePublish(ctx, mqttMsg, topicName, msgId); } }
public void add(Topic topic, MqttPublishMessage message){ synchronized (lock){ if(node == null || topic == null) return; topic.reset(); if(message != null) node.publish(message,topic.isTail()); buildNodes(node,topic.moveToNext(),message); } }
private void buildNodes(SubscriptionNode node, Topic topic, MqttPublishMessage message){ Topic.Element element = topic.next(); if (element == null) return; SubscriptionNode var = node.addNode(element); if(message != null) var.publish(message, !topic.hasNext()); buildNodes(var,topic,message); }
public static void publish(MqttPublishMessage message){ Topic topic = new Topic(message.variableHeader().topicName()); Topic.Element element = topic.getHeadElement(); if (element == null) return; SubscriptionTree cachedTree = subscriptions.get(element.getValue()); if(cachedTree == null){ SubscriptionTree tree = new SubscriptionTree(); tree.build(topic); subscriptions.putIfAbsent(element.getValue(), tree); } else { cachedTree.add(topic,message); } }
@Override public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException { FromDeviceMsg msg; switch (type) { case POST_TELEMETRY_REQUEST: msg = convertToTelemetryUploadRequest(ctx, (MqttPublishMessage) inbound); break; case POST_ATTRIBUTES_REQUEST: msg = convertToUpdateAttributesRequest(ctx, (MqttPublishMessage) inbound); break; case SUBSCRIBE_ATTRIBUTES_REQUEST: msg = new AttributesSubscribeMsg(); break; case UNSUBSCRIBE_ATTRIBUTES_REQUEST: msg = new AttributesUnsubscribeMsg(); break; case SUBSCRIBE_RPC_COMMANDS_REQUEST: msg = new RpcSubscribeMsg(); break; case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: msg = new RpcUnsubscribeMsg(); break; case GET_ATTRIBUTES_REQUEST: msg = convertToGetAttributesRequest(ctx, (MqttPublishMessage) inbound); break; case TO_DEVICE_RPC_RESPONSE: msg = convertToRpcCommandResponse(ctx, (MqttPublishMessage) inbound); break; case TO_SERVER_RPC_REQUEST: msg = convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound); break; default: log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type); throw new AdaptorException(new IllegalArgumentException("Unsupported msg type: " + type + "!")); } return new BasicAdaptorToSessionActorMsg(ctx, msg); }
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0); MqttPublishVariableHeader header = new MqttPublishVariableHeader(topic, ctx.nextMsgId()); ByteBuf payload = ALLOCATOR.buffer(); payload.writeBytes(GSON.toJson(json).getBytes(UTF8)); return new MqttPublishMessage(mqttFixedHeader, header, payload); }
private FromDeviceMsg convertToRpcCommandResponse(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { String topicName = inbound.variableHeader().topicName(); try { Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_RESPONSE_TOPIC.length())); String payload = inbound.payload().toString(UTF8); return new ToDeviceRpcResponseMsg(requestId, payload); } catch (RuntimeException e) { log.warn("Failed to decode get attributes request", e); throw new AdaptorException(e); } }
private UpdateAttributesRequest convertToUpdateAttributesRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { String payload = validatePayload(ctx.getSessionId(), inbound.payload()); try { return JsonConverter.convertToAttributes(new JsonParser().parse(payload), inbound.variableHeader().messageId()); } catch (IllegalStateException | JsonSyntaxException ex) { throw new AdaptorException(ex); } }
private TelemetryUploadRequest convertToTelemetryUploadRequest(SessionContext ctx, MqttPublishMessage inbound) throws AdaptorException { String payload = validatePayload(ctx.getSessionId(), inbound.payload()); try { return JsonConverter.convertToTelemetry(new JsonParser().parse(payload), inbound.variableHeader().messageId()); } catch (IllegalStateException | JsonSyntaxException ex) { throw new AdaptorException(ex); } }
private FromDeviceMsg convertToServerRpcRequest(DeviceSessionCtx ctx, MqttPublishMessage inbound) throws AdaptorException { String topicName = inbound.variableHeader().topicName(); String payload = validatePayload(ctx.getSessionId(), inbound.payload()); try { Integer requestId = Integer.valueOf(topicName.substring(MqttTopics.DEVICE_RPC_REQUESTS_TOPIC.length())); return JsonConverter.convertToServerRpcRequest(new JsonParser().parse(payload), requestId); } catch (IllegalStateException | JsonSyntaxException ex) { throw new AdaptorException(ex); } }
@Override protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { switch (msg.fixedHeader().messageType()) { case PUBLISH: if (receiver != null) { receiver.messageReceived(Message.newMessage(client.clientId(), (MqttPublishMessage) msg)); } int messageId = ((MqttPublishMessage) msg).variableHeader().messageId(); if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) { client.send(MqttMessageFactory.puback(messageId)); } else if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) { client.send(MqttMessageFactory.pubrec(messageId)); } break; case CONNACK: sharedObject.receivedMessage(msg); synchronized (sharedObject.locker()) { sharedObject.locker().notify(); } break; case PUBREC: client.send(MqttMessageFactory.pubrel(((MqttMessageIdVariableHeader) msg.variableHeader()).messageId())); break; case SUBACK: case PUBACK: case PUBCOMP: default: break; } }
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException { JsonElement json = getJson(msg); String deviceName = checkDeviceName(getDeviceName(json)); String deviceType = getDeviceType(json); onDeviceConnect(deviceName, deviceType); ack(msg); }
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException { String deviceName = checkDeviceName(getDeviceName(getJson(msg))); GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName); if (deviceSessionCtx != null) { processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId())); deviceSessionCtx.setClosed(true); log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName); } else { log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName); } ack(msg); }
public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload()); if (json.isJsonObject()) { JsonObject jsonObj = json.getAsJsonObject(); String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString()); Integer requestId = jsonObj.get("id").getAsInt(); String data = jsonObj.get("data").toString(); GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data)))); } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } }
public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException { JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload()); if (json.isJsonObject()) { JsonObject jsonObj = json.getAsJsonObject(); int requestId = jsonObj.get("id").getAsInt(); String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString(); boolean clientScope = jsonObj.get("client").getAsBoolean(); Set<String> keys; if (jsonObj.has("key")) { keys = Collections.singleton(jsonObj.get("key").getAsString()); } else { JsonArray keysArray = jsonObj.get("keys").getAsJsonArray(); keys = new HashSet<>(); for (JsonElement keyObj : keysArray) { keys.add(keyObj.getAsString()); } } BasicGetAttributesRequest request; if (clientScope) { request = new BasicGetAttributesRequest(requestId, keys, null); } else { request = new BasicGetAttributesRequest(requestId, null, keys); } GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName); processor.process(new BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request))); ack(msg); } else { throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json); } }
@Override public void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) { this.stats.incrementMqttStat(); MqttMessageType messageType = msg.fixedHeader().messageType(); switch (messageType) { case PUBLISH : MqttPublishMessage publishMessage = (MqttPublishMessage) msg; String topic = publishMessage.variableHeader().topicName(); switch (topic.toLowerCase()) { case "hardware" : hardware.messageReceived(ctx, state, publishMessage); break; } break; case PINGREQ : ctx.writeAndFlush( MqttMessageFactory.newMessage(msg.fixedHeader(), msg.variableHeader(), null), ctx.voidPromise()); break; case DISCONNECT : log.trace("Got disconnect. Closing..."); ctx.close(); break; } }
@Override public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) { System.out.println("MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType()); // If you need to handle an specific packet type: if (mqttMessage instanceof MqttPublishMessage) { MqttPublishMessage message = (MqttPublishMessage) mqttMessage; String originalMessage = message.payload().toString(Charset.forName("UTF-8")); System.out.println("Original message: " + originalMessage); // The new message content must not be bigger that the original content. String modifiedMessage = "Modified message "; message.payload().setBytes(0, modifiedMessage.getBytes()); } else { if (mqttMessage instanceof MqttConnectMessage) { MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage; System.out.println("MQTT CONNECT control packet was intercepted " + connectMessage); } } // We return true which means "call next interceptor" (if there is one) or target. // If we returned false, it means "abort call" - no more interceptors would be called and neither would // the target return true; }
@Override public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { if (packet.getClass() == MqttPublishMessage.class) { messageCount++; } return true; }
protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) { boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); sendToClient(publish); }
private void handlePublish(MqttPublishMessage msg) { long start = System.nanoTime(); LOG.debug("push forward message the topic {}", msg.variableHeader().topicName()); LOG.debug("content <{}>", payload2Str(msg.payload())); String decodedPayload = payload2Str(msg.payload()); long sentTime = Long.parseLong(decodedPayload.split("-")[1]); forthNetworkTime.recordValue(start - sentTime); long stop = System.nanoTime(); LOG.info("Request processed in {} ns, matching {}", stop - start, payload2Str(msg.payload())); }
private void handlePublish(MqttPublishMessage msg) { long start = System.nanoTime(); LOG.debug("push forward message the topic {}", msg.variableHeader().topicName()); LOG.debug("content <{}>", payload2Str(msg.content())); String decodedPayload = payload2Str(msg.content()); long sentTime = Long.parseLong(decodedPayload.split("-")[1]); forthNetworkTime.recordValue(start - sentTime); long stop = System.nanoTime(); LOG.info("Request processed in {} ns, matching {}", stop - start, decodedPayload); }
public static void main(String[] args) throws InterruptedException, IOException { IResourceLoader classpathLoader = new ClasspathResourceLoader(); final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader); final Server mqttBroker = new Server(); List<? extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener()); mqttBroker.startServer(classPathConfig, userHandlers); System.out.println("Broker started press [CTRL+C] to stop"); //Bind a shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("Stopping broker"); mqttBroker.stopServer(); System.out.println("Broker stopped"); })); Thread.sleep(20000); System.out.println("Before self publish"); MqttPublishMessage message = MqttMessageBuilders.publish() .topicName("/exit") .retained(true) // qos(MqttQoS.AT_MOST_ONCE); // qQos(MqttQoS.AT_LEAST_ONCE); .qos(MqttQoS.EXACTLY_ONCE) .payload(Unpooled.copiedBuffer("Hello World!!".getBytes())) .build(); mqttBroker.internalPublish(message, "INTRLPUB"); System.out.println("After self publish"); }
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) { // verify if topic can be write final Topic topic = new Topic(msg.variableHeader().topicName()); String clientID = NettyUtils.clientID(channel); String username = NettyUtils.userName(channel); if (!m_authorizator.canWrite(topic, username, clientID)) { LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic); return; } final int messageID = msg.variableHeader().messageId(); // route message to subscribers IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg); toStoreMsg.setClientID(clientID); this.publisher.publish2Subscribers(toStoreMsg, topic, messageID); sendPubAck(clientID, messageID); if (msg.fixedHeader().isRetain()) { if (!msg.payload().isReadable()) { m_messagesStore.cleanRetained(topic); } else { // before wasn't stored m_messagesStore.storeRetained(topic, toStoreMsg); } } m_interceptor.notifyTopicPublished(msg, clientID, username); }
private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) { // AdaptorToSessionActorMsg msg = null; int refCnt = mqttMsg.refCnt(); int messageId = mqttMsg.variableHeader().messageId(); log.info("[{}] refCnt: [{}], messageId: [{}]", sessionId, refCnt, messageId); MqttPublishMessage retainedDuplicate = mqttMsg.retainedDuplicate(); String kafkaOutboundTopic = null; try { if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { // msg = adaptor.convertToActorMsg(deviceSessionCtx, // POST_TELEMETRY_REQUEST, mqttMsg); kafkaOutboundTopic = KafkaTopics.DEVICE_TELEMETRY_TOPIC; } else if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { kafkaOutboundTopic = KafkaTopics.DEVICE_ATTRIBUTES_TOPIC; // msg = adaptor.convertToActorMsg(deviceSessionCtx, // POST_ATTRIBUTES_REQUEST, mqttMsg); // MqttMessage createSubscribeResponseMessage = // createSubscribeResponseMessage(msgId); //// System.out.println(createSubscribeResponseMessage.payload()); // ctx.writeAndFlush(createSubscribeResponseMessage); } else if (topicName.startsWith(DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) { // msg = adaptor.convertToActorMsg(deviceSessionCtx, // GET_ATTRIBUTES_REQUEST, mqttMsg); kafkaOutboundTopic = KafkaTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX; if (msgId >= 0) { ctx.writeAndFlush(createMqttPubAckMsg(msgId)); } } else if (topicName.startsWith(DEVICE_RPC_RESPONSE_TOPIC)) { // msg = adaptor.convertToActorMsg(deviceSessionCtx, // TO_DEVICE_RPC_RESPONSE, mqttMsg); kafkaOutboundTopic = KafkaTopics.DEVICE_ATTRIBUTES_RESPONSE_TOPIC_PREFIX; if (msgId >= 0) { ctx.writeAndFlush(createMqttPubAckMsg(msgId)); } } else if (topicName.startsWith(DEVICE_RPC_REQUESTS_TOPIC)) { // msg = adaptor.convertToActorMsg(deviceSessionCtx, // TO_SERVER_RPC_REQUEST, mqttMsg); kafkaOutboundTopic = KafkaTopics.DEVICE_RPC_REQUESTS_TOPIC; if (msgId >= 0) { ctx.writeAndFlush(createMqttPubAckMsg(msgId)); } } } catch (Exception e) { log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); } if (kafkaOutboundTopic != null) { String payload = new String(ByteBufUtil.getBytes(retainedDuplicate.payload())); Set<ChannelEntity> channelEntitys = MemoryMetaPool.getChannelByTopics(topicName); if (channelEntitys != null) { for (ChannelEntity channelEntity : channelEntitys) { log.info("PUBLISH to ChannelEntity topic = " + topicName + " payload = " + payload); channelEntity.write(retainedDuplicate); } } Device device = deviceSessionCtx.getDevice(); if (device != null && device.getId() != null) { // BasicToDeviceActorSessionMsg basicToDeviceActorSessionMsg = new // BasicToDeviceActorSessionMsg( // device, msg); JsonObject root = new JsonObject(); JsonElement jsonElement = new JsonParser().parse(payload); root.add("d", jsonElement); root.addProperty("messageId", messageId); log.info("[{}] msg: {}", sessionId, root.toString()); this.msgProducer.send(kafkaOutboundTopic, device.getId().toString(), root.toString()); } // processor.process(basicToDeviceActorSessionMsg); } else { log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); ctx.close(); } }
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, AttributesKVMsg msg, boolean asMap) { return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, asMap)); }
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToDeviceRpcRequestMsg msg) { return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg, false)); }
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, ToServerRpcResponseMsg msg) { return createMqttPublishMsg(ctx, topic, JsonConverter.toJson(msg)); }