protected void handle(ChannelHandlerContext ctx, Session session, int messageId) { // TODO what if the PUBREL is resented one? Topic topic = Topic.NEXUS.get(session.clientId(), messageId, ClientType.PUBLISHER); if (topic == null) { logger.error("PUBREL target does not exist [clientId={}, messageId={}]", session.clientId(), messageId); session.dispose(true); // [MQTT-3.3.5-2] return; } MqttMessage toSend = MqttMessageFactory.pubcomp(messageId); final String log = toSend.toString(); session.send(toSend, f -> { if (!f.isSuccess()) { logger.error("packet outgoing failed [{}] {}", log, f.cause()); return; } InboundMessageStatus.NEXUS.removeByKey(messageId, session.clientId()); logger.debug("Inbound message status REMOVED [clientId={}, messageId={}]", session.clientId(), messageId); }); }
protected void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) { if (!session.isConnected(true)) { logger.error("Message is not sent - Channel is inactive or out of the node. [{}]", message); return; } ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId()); String log = message.toString(); ChannelFuture cf = ctx.writeAndFlush(message).addListener(f -> { if (f.isSuccess()) { logger.debug("packet outgoing [{}]", log); } else { logger.error("packet outgoing failed [{}] {}", log, f.cause()); } }); if (completeListener != null) { cf.addListener(completeListener); } }
/** * Provides the Observer with a new item to observe. * <p> * The {@link com.caricah.iotracah.core.modules.Worker} may call this method 0 or more times. * <p> * The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or * {@link #onError}. * * @param ioTMessage the item emitted by the Observable */ @Override public void onNext(IOTMessage ioTMessage) { if(null == ioTMessage || !Protocol.MQTT.equals(ioTMessage.getProtocol())){ return; } log.debug(" MqttServer onNext : message outbound {}", ioTMessage); MqttMessage mqttMessage = toServerMessage(ioTMessage); if(null == mqttMessage){ log.debug(" MqttServer onNext : ignoring outbound message {}", ioTMessage); }else { serverImpl.pushToClient(ioTMessage.getConnectionId(), mqttMessage); } serverImpl.postProcess(ioTMessage); }
@Override public void channelRead(ChannelHandlerContext ctx, Object message) { MqttMessage msg = (MqttMessage) message; MqttMessageType messageType = msg.fixedHeader().messageType(); try { switch (messageType) { case PUBLISH: LOG.info("Received a message of type {}", messageType); handlePublish((MqttPublishMessage) msg); return; default: LOG.info("Received a message of type {}", messageType); } } catch (Exception ex) { LOG.error("Bad error in processing the message", ex); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object message) { MqttMessage msg = (MqttMessage) message; MqttMessageType type = msg.fixedHeader().messageType(); try { switch (type) { case PUBLISH: LOG.info("Received a message of type {}", type); handlePublish((MqttPublishMessage) msg); return; default: LOG.info("Received a message of type {}", type); } } catch (Exception ex) { LOG.error("Bad error in processing the message", ex); } }
@Override public void channelRead(ChannelHandlerContext ctx, Object message) { MqttMessage msg = (MqttMessage) message; MqttMessageType messageType = msg.fixedHeader().messageType(); switch (messageType) { case PUBLISH: this.publishesMetrics.mark(); break; case SUBSCRIBE: this.subscribeMetrics.mark(); break; case CONNECT: this.connectedClientsMetrics.inc(); break; case DISCONNECT: this.connectedClientsMetrics.dec(); break; default: break; } ctx.fireChannelRead(message); }
@Test public void testWillMessageIsWiredOnClientKeepAliveExpiry() throws Exception { LOG.info("*** testWillMessageIsWiredOnClientKeepAliveExpiry ***"); String willTestamentTopic = "/will/test"; String willTestamentMsg = "Bye bye"; m_willSubscriber.connect(); m_willSubscriber.subscribe(willTestamentTopic, 0); m_client.clientId("FAKECLNT").connect(willTestamentTopic, willTestamentMsg); long connectTime = System.currentTimeMillis(); Awaitility.await() .atMost(7, TimeUnit.SECONDS) .untilAsserted(() -> { // but after the 2 KEEP ALIVE timeout expires it gets fired, // NB it's 1,5 * KEEP_ALIVE so 3 secs and some millis to propagate the message org.eclipse.paho.client.mqttv3.MqttMessage msg = m_messageCollector.getMessageImmediate(); assertNotNull("the will message should be fired after keep alive!", msg); // the will message hasn't to be received before the elapsing of Keep Alive timeout assertTrue(System.currentTimeMillis() - connectTime > 3000); assertEquals(willTestamentMsg, new String(msg.getPayload(), StandardCharsets.UTF_8)); }); m_willSubscriber.disconnect(); }
@Test public void testWillMessageIsPublishedOnClientBadDisconnection() throws InterruptedException, MqttException { LOG.info("*** testWillMessageIsPublishedOnClientBadDisconnection ***"); String willTestamentTopic = "/will/test"; String willTestamentMsg = "Bye bye"; m_willSubscriber.connect(); m_willSubscriber.subscribe(willTestamentTopic, 0); m_client.clientId("FAKECLNT").connect(willTestamentTopic, willTestamentMsg); // kill will publisher m_client.close(); // Verify will testament is published org.eclipse.paho.client.mqttv3.MqttMessage receivedTestament = m_messageCollector.waitMessage(1); assertEquals(willTestamentMsg, new String(receivedTestament.getPayload())); m_willSubscriber.disconnect(); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("[{}] Processing msg: {}", sessionId, msg); if (msg instanceof MqttMessage) { MqttMessage mqttMessage = (MqttMessage) msg; MqttFixedHeader fixedHeader = mqttMessage.fixedHeader(); if (fixedHeader != null) { processMqttMsg(ctx, (MqttMessage) msg); } else { //xtx } } }
private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { // deviceSessionCtx.setChannel(ctx); // assetSessionCtx.setChannel(ctx); switch (msg.fixedHeader().messageType()) { case CONNECT: processConnect(ctx, (MqttConnectMessage) msg); break; case PUBLISH: processPublish(ctx, (MqttPublishMessage) msg); // System.out.println("write..."); // ctx.write("just for test"); break; case SUBSCRIBE: processSubscribe(ctx, (MqttSubscribeMessage) msg); break; case UNSUBSCRIBE: processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; case PINGREQ: if (checkConnected(ctx)) { ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); } break; case DISCONNECT: if (checkConnected(ctx)) { processDisconnect(ctx); } break; } }
/** * Send MQTT message to specific client * * @param msg MQTT Message to be sent * @param clientId Client Id * @param packetId Packet Id * @param flush Flush? */ public void sendMessage(MqttMessage msg, String clientId, Integer packetId, boolean flush) { ChannelHandlerContext ctx = getSession(clientId); if (ctx == null) { String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId); logger.debug("Message failed: Message {} {} failed to send to {}: Client not connected to this node", msg.fixedHeader().messageType(), pid, clientId); return; } sendMessage(ctx, msg, clientId, packetId, flush); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof MqttMessage) { MqttMessage mqtt = (MqttMessage) msg; if (StringUtils.isBlank(this.clientId) && mqtt.fixedHeader().messageType() == MqttMessageType.CONNECT) { this.clientId = ((MqttConnectPayload) mqtt.payload()).clientId(); } if (StringUtils.isNotBlank(this.clientId)) { this.metrics.measurement(this.clientId, this.brokerId, MessageDirection.IN, mqtt.fixedHeader().messageType()); } this.metrics.measurement(this.brokerId, MessageDirection.IN, mqtt.fixedHeader().messageType()); } ctx.fireChannelRead(msg); }
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof MqttMessage) { MqttMessage mqtt = (MqttMessage) msg; if (StringUtils.isNotBlank(this.clientId)) { this.metrics.measurement(this.clientId, this.brokerId, MessageDirection.OUT, mqtt.fixedHeader().messageType()); } this.metrics.measurement(this.brokerId, MessageDirection.OUT, mqtt.fixedHeader().messageType()); } ctx.write(msg, promise); }
public MqttMessage doMessage(Channel channel, MqttMessage msg) { String channelId = channel.id().asLongText(); logger.debug("MQTT PINGREQ " + channelId); // 更新最新连接时间 ApplicationContext.updateChannelConTime(channelId); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage message = new MqttMessage(fixedHeader); return message; }
public MqttMessage doMessage(MqttMessage msg) { logger.debug("MQTT PUBACK"); MqttPublishVariableHeader publishVariableHeader = (MqttPublishVariableHeader) msg.variableHeader(); int packetId = publishVariableHeader.packetId(); ManagerHandler.removeSendedMessage(packetId); return null; }
private void pushToNetwork(MqttMessage msg) { if (channel == null) { log.warn("channel is null:" + DeviceSessionCtx.class); } else { channel.writeAndFlush(msg); } }
@Override public void onMsg(SessionCtrlMsg msg) throws SessionException { if (msg instanceof SessionCloseMsg) { pushToNetwork( new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0))); channel.close(); } }
@Override public AdaptorToSessionActorMsg convertToActorMsg(DeviceSessionCtx ctx, MsgType type, MqttMessage inbound) throws AdaptorException { FromDeviceMsg msg; switch (type) { case POST_TELEMETRY_REQUEST: msg = convertToTelemetryUploadRequest(ctx, (MqttPublishMessage) inbound); break; case POST_ATTRIBUTES_REQUEST: msg = convertToUpdateAttributesRequest(ctx, (MqttPublishMessage) inbound); break; case SUBSCRIBE_ATTRIBUTES_REQUEST: msg = new AttributesSubscribeMsg(); break; case UNSUBSCRIBE_ATTRIBUTES_REQUEST: msg = new AttributesUnsubscribeMsg(); break; case SUBSCRIBE_RPC_COMMANDS_REQUEST: msg = new RpcSubscribeMsg(); break; case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: msg = new RpcUnsubscribeMsg(); break; case GET_ATTRIBUTES_REQUEST: msg = convertToGetAttributesRequest(ctx, (MqttPublishMessage) inbound); break; case TO_DEVICE_RPC_RESPONSE: msg = convertToRpcCommandResponse(ctx, (MqttPublishMessage) inbound); break; case TO_SERVER_RPC_REQUEST: msg = convertToServerRpcRequest(ctx, (MqttPublishMessage) inbound); break; default: log.warn("[{}] Unsupported msg type: {}!", ctx.getSessionId(), type); throw new AdaptorException(new IllegalArgumentException("Unsupported msg type: " + type + "!")); } return new BasicAdaptorToSessionActorMsg(ctx, msg); }
public static MqttMessage pubrec(int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 2); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); return new MqttMessage(fixedHeader, variableHeader); }
public static MqttMessage pubrel(int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 2); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); return new MqttMessage(fixedHeader, variableHeader); }
public static MqttMessage pubcomp(int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 2); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); return new MqttMessage(fixedHeader, variableHeader); }
protected ChannelFuture send(MqttMessage message) { if (!isConnected()) { logger.error("Channel is not active"); return null; } return channel.writeAndFlush(message); }
@Override protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { switch (msg.fixedHeader().messageType()) { case PUBLISH: if (receiver != null) { receiver.messageReceived(Message.newMessage(client.clientId(), (MqttPublishMessage) msg)); } int messageId = ((MqttPublishMessage) msg).variableHeader().messageId(); if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) { client.send(MqttMessageFactory.puback(messageId)); } else if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) { client.send(MqttMessageFactory.pubrec(messageId)); } break; case CONNACK: sharedObject.receivedMessage(msg); synchronized (sharedObject.locker()) { sharedObject.locker().notify(); } break; case PUBREC: client.send(MqttMessageFactory.pubrel(((MqttMessageIdVariableHeader) msg.variableHeader()).messageId())); break; case SUBACK: case PUBACK: case PUBCOMP: default: break; } }
private MqttMessage createConnectPacket(MqttClientOptions options) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( PROTOCOL_NAME, PROTOCOL_VERSION, options.hasUsername(), options.hasPassword(), options.isWillRetain(), options.getWillQoS(), options.isWillFlag(), options.isCleanSession(), options.getKeepAliveTimeSeconds() ); MqttConnectPayload payload = new MqttConnectPayload( options.getClientId() == null ? "" : options.getClientId(), options.getWillTopic(), options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null, options.hasUsername() ? options.getUsername() : null, options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null ); return MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); }
/** * Send MQTT message to specific session * * @param ctx ChannelHandlerContext as Session * @param msg MQTT Message to be sent * @param clientId Client Id * @param packetId Packet Id * @param flush Flush? */ public void sendMessage(ChannelHandlerContext ctx, MqttMessage msg, String clientId, Integer packetId, boolean flush) { String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId); ChannelFuture future = flush ? ctx.writeAndFlush(msg) : ctx.write(msg); future.addListener(f -> { if (f.isSuccess()) { logger.debug("Message succeed: Message {} {} has been sent to client {} successfully", msg.fixedHeader().messageType(), pid, clientId); } else { logger.debug("Message failed: Message {} {} failed to send to client {}: ", msg.fixedHeader().messageType(), pid, clientId, f.cause()); } }); }
@Override public void messageReceived(ChannelHandlerContext ctx, MqttMessage msg) { this.stats.incrementMqttStat(); MqttMessageType messageType = msg.fixedHeader().messageType(); switch (messageType) { case PUBLISH : MqttPublishMessage publishMessage = (MqttPublishMessage) msg; String topic = publishMessage.variableHeader().topicName(); switch (topic.toLowerCase()) { case "hardware" : hardware.messageReceived(ctx, state, publishMessage); break; } break; case PINGREQ : ctx.writeAndFlush( MqttMessageFactory.newMessage(msg.fixedHeader(), msg.variableHeader(), null), ctx.voidPromise()); break; case DISCONNECT : log.trace("Got disconnect. Closing..."); ctx.close(); break; } }
@Override public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) { System.out.println("MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType()); // If you need to handle an specific packet type: if (mqttMessage instanceof MqttPublishMessage) { MqttPublishMessage message = (MqttPublishMessage) mqttMessage; String originalMessage = message.payload().toString(Charset.forName("UTF-8")); System.out.println("Original message: " + originalMessage); // The new message content must not be bigger that the original content. String modifiedMessage = "Modified message "; message.payload().setBytes(0, modifiedMessage.getBytes()); } else { if (mqttMessage instanceof MqttConnectMessage) { MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage; System.out.println("MQTT CONNECT control packet was intercepted " + connectMessage); } } // We return true which means "call next interceptor" (if there is one) or target. // If we returned false, it means "abort call" - no more interceptors would be called and neither would // the target return true; }
private boolean checkMessageProperties(MqttMessage message, Map<String, Object> expectedProperties) { System.out.println("Checking properties in interceptor"); try { assertNotNull(message); assertNotNull(server.getNodeID()); MqttFixedHeader header = message.fixedHeader(); assertNotNull(header.messageType()); assertEquals(header.qosLevel().value(), AT_MOST_ONCE); assertEquals(header.isRetain(), expectedProperties.get(RETAINED)); } catch (Throwable t) { collector.addError(t); } return true; }
@Override public boolean intercept(MqttMessage packet, RemotingConnection connection) throws ActiveMQException { if (packet.getClass() == MqttPublishMessage.class) { messageCount++; } return true; }
protected void send(int messageId, String topicName, int qosLevel, boolean isRetain, ByteBuf payload, int deliveryCount) { boolean redelivery = qosLevel == 0 ? false : (deliveryCount > 0); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); sendToClient(publish); }
public boolean sendMessage(MqttMessage message, Integer messageID, String clientID) { final MqttMessageType messageType = message.fixedHeader().messageType(); try { if (messageID != null) { LOG.info("Sending {} message CId=<{}>, messageId={}", messageType, clientID, messageID); } else { LOG.debug("Sending {} message CId=<{}>", messageType, clientID); } ConnectionDescriptor descriptor = connectionDescriptors.get(clientID); if (descriptor == null) { if (messageID != null) { LOG.error("Client has just disconnected. {} message could not be sent. CId=<{}>, messageId={}", messageType, clientID, messageID); } else { LOG.error("Client has just disconnected. {} could not be sent. CId=<{}>", messageType, clientID); } /* * If the client has just disconnected, its connection descriptor will be null. We * don't have to make the broker crash: we'll just discard the PUBACK message. */ return false; } descriptor.writeAndFlush(message); return true; } catch (Throwable e) { String errorMsg = "Unable to send " + messageType + " message. CId=<" + clientID + ">"; if (messageID != null) { errorMsg += ", messageId=" + messageID; } LOG.error(errorMsg, e); return false; } }
public void write(Object value) { try { this.m_receivedMessage = (MqttMessage) value; } catch (Exception ex) { throw new AssertionError("Wrong return code"); } }
static byte encodeFlags(MqttMessage message) { byte flags = 0; if (message.fixedHeader().isDup()) { flags |= 0x08; } if (message.fixedHeader().isRetain()) { flags |= 0x01; } flags |= (message.fixedHeader().qosLevel().value() & 0x03) << 1; return flags; }
private MqttMessage createUnSubAckMessage(int msgId) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(UNSUBACK, false, AT_LEAST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(msgId); return new MqttMessage(mqttFixedHeader, mqttMessageIdVariableHeader); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MqttMessage mqttMessage = (MqttMessage) msg; mProcessor.process(mqttMessage, ctx.channel()); }
public void sendMessage(MqttMessage message){ QoSMessagePack messagePack = new QoSMessagePack( connection, message); MessageExecutor.put(messagePack); }