private void sendPubAck(String clientId, int messageID) { LOG.trace("sendPubAck invoked"); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0); MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, from(messageID)); try { if (connectionDescriptors == null) { throw new RuntimeException("Internal bad error, found connectionDescriptors to null while it should " + "be initialized, somewhere it's overwritten!!"); } LOG.trace("connected clientIDs are {}", connectionDescriptors.getConnectedClientIds()); if (!connectionDescriptors.isConnected(clientId)) { throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client %s in cache %s", clientId, connectionDescriptors)); } connectionDescriptors.sendMessage(pubAckMessage, messageID, clientId); } catch (Throwable t) { LOG.error(null, t); } }
public static MqttPubAckMessage puback(int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); return new MqttPubAckMessage(fixedHeader, variableHeader); }
@Override protected void channelRead0(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception { logger.debug("packet incoming [message={}]", msg.toString()); Session session = Session.NEXUS.get(ctx.channel().id()); if (session == null) { logger.error("None exist session message [message={}]", msg.toString()); ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs -> // [MQTT-4.8.0-1] Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs())); return; } session.setLastIncomingTime(new Date()); String clientId = session.clientId(); int messageId = msg.variableHeader().messageId(); OutboundMessageStatus status = OutboundMessageStatus.NEXUS.removeByKey(messageId, clientId); if (status == null) { logger.error("PUBACK target does not exist [clientId={}, messageId={}]", clientId, messageId); session.dispose(true); // [MQTT-3.3.5-2] return; } ctx.channel().eventLoop() .execute(() -> Plugins.INSTANCE.get(DeliveredEventListener.class).delivered(new DeliveredEventArgs() { @Override public String clientId() { return clientId; } @Override public int messageId() { return messageId; } })); logger.debug("Outbound message status REMOVED [clientId={}, messageId={}]", clientId, messageId); }
void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageType) { MqttQoS qos = (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE; MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel false, 0); MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId)); sendToClient(rel); }
public static MqttPubAckMessage createMqttPubAckMsg(int requestId) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(PUBACK, false, AT_LEAST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMsgIdVariableHeader = MqttMessageIdVariableHeader.from(requestId); return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader); }
private static MqttPubAckMessage createMqttPubAckMsg(int requestId) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(PUBACK, false, AT_LEAST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMsgIdVariableHeader = MqttMessageIdVariableHeader.from(requestId); return new MqttPubAckMessage(mqttFixedHeader, mqttMsgIdVariableHeader); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { if (stopped) { disconnect(true); return; } MqttMessage message = (MqttMessage) msg; // Disconnect if Netty codec failed to decode the stream. if (message.decoderResult().isFailure()) { log.debug("Bad Message Disconnecting Client."); disconnect(true); return; } connection.dataReceived(); MQTTUtil.logMessage(session.getState(), message, true); this.protocolManager.invokeIncoming(message, this.connection); switch (message.fixedHeader().messageType()) { case CONNECT: handleConnect((MqttConnectMessage) message, ctx); break; case PUBLISH: handlePublish((MqttPublishMessage) message); break; case PUBACK: handlePuback((MqttPubAckMessage) message); break; case PUBREC: handlePubrec(message); break; case PUBREL: handlePubrel(message); break; case PUBCOMP: handlePubcomp(message); break; case SUBSCRIBE: handleSubscribe((MqttSubscribeMessage) message); break; case UNSUBSCRIBE: handleUnsubscribe((MqttUnsubscribeMessage) message); break; case PINGREQ: handlePingreq(); break; case DISCONNECT: disconnect(false); break; case UNSUBACK: case SUBACK: case PINGRESP: case CONNACK: // The server does not instantiate connections therefore any CONNACK received over a connection is an invalid control message. default: disconnect(true); } } catch (Exception e) { log.debug("Error processing Control Packet, Disconnecting Client", e); disconnect(true); } finally { ReferenceCountUtil.release(msg); } }
void handlePuback(MqttPubAckMessage message) throws Exception { session.getMqttPublishManager().handlePubAck(getMessageId(message)); }