@Override public void updateSubscription(String clientId, List<String> topicLevels, MqttQoS qos) { if (Topics.isTopicFilter(topicLevels)) { boolean b1 = hash().hset(RedisKey.subscription(clientId), String.join("/", topicLevels), String.valueOf(qos.value())); boolean b2 = hash().hset(RedisKey.topicFilter(topicLevels), clientId, String.valueOf(qos.value())); if (b1 && b2) { List<String> keys = new ArrayList<>(); List<String> argv = new ArrayList<>(); // topic filter tree for (int i = 0; i < topicLevels.size(); i++) { keys.add(RedisKey.topicFilterChild(topicLevels.subList(0, i))); argv.add(topicLevels.get(i)); } script().eval("local length = table.getn(KEYS)\n" + "for i = 1, length do\n" + " redis.call('HINCRBY', KEYS[i], ARGV[i], 1)\n" + "end\n" + "return redis.status_reply('OK')", ScriptOutputType.STATUS, keys.toArray(new String[keys.size()]), argv.toArray(new String[argv.size()])); } } else { hash().hset(RedisKey.subscription(clientId), String.join("/", topicLevels), String.valueOf(qos.value())); hash().hset(RedisKey.topicName(topicLevels), clientId, String.valueOf(qos.value())); } }
/**
 * Connects an MQTT client to {@code HOST}:{@code PORT} and, once connected, publishes a JSON
 * document to the {@code /data} topic every second. Each document carries this instance's
 * {@code id} as {@code uuid} and a random integer below 100 as {@code data}.
 *
 * @throws Exception declared by the overridden method; nothing is thrown explicitly here
 */
@Override
public void start() throws Exception {
    MqttClient mqtt = MqttClient.create(vertx);

    mqtt.rxConnect(PORT, HOST)
        .flatMapPublisher(connAck ->
            Flowable.interval(1, TimeUnit.SECONDS).flatMapSingle(tick -> {
                JsonObject sample = new JsonObject();
                sample.put("uuid", id);
                sample.put("data", random.nextInt(100));
                Buffer body = Buffer.buffer(sample.encode());
                // QoS 0, neither duplicate nor retained
                return mqtt.rxPublish("/data", body, MqttQoS.AT_MOST_ONCE, false, false);
            }))
        .subscribe();
}
/**
 * Registers a client with the given QoS under a topic.
 * <p>
 * If the topic is not known yet, a new list holding just this entry is created. Otherwise the
 * entry is appended to the existing list unless the list already contains an equal entry
 * (as decided by {@code Message.equals}).
 *
 * @param topic    topic to register the client for
 * @param clientId identifier of the client
 * @param qos      QoS to store for the client
 */
public static void put(String topic, String clientId, MqttQoS qos) {
    Message entry = new Message();
    entry.setClientId(clientId);
    entry.setQoS(qos);

    if (container.containsKey(topic)) {
        LinkedList<Message> known = container.get(topic);
        if (!known.contains(entry)) {
            known.add(entry);
        }
        return;
    }

    LinkedList<Message> created = new LinkedList<>();
    created.add(entry);
    container.put(topic, created);
}
/** * Verifies that the adapter does not wait for a telemetry message being settled and accepted * by a downstream peer. * * @param ctx The vert.x test context. */ @Test public void testOnUnauthenticatedMessageDoesNotWaitForAcceptedOutcome(final TestContext ctx) { // GIVEN an adapter with a downstream telemetry consumer final Future<ProtonDelivery> outcome = Future.succeededFuture(mock(ProtonDelivery.class)); givenATelemetrySenderForOutcome(outcome); MqttServer server = getMqttServer(false); AbstractVertxBasedMqttProtocolAdapter<ProtocolAdapterProperties> adapter = getAdapter(server); // WHEN a device publishes a telemetry message final Buffer payload = Buffer.buffer("some payload"); final MqttEndpoint endpoint = mock(MqttEndpoint.class); final MqttPublishMessage messageFromDevice = mock(MqttPublishMessage.class); when(messageFromDevice.topicName()).thenReturn("telemetry/my-tenant/4712"); when(messageFromDevice.qosLevel()).thenReturn(MqttQoS.AT_MOST_ONCE); when(messageFromDevice.payload()).thenReturn(payload); adapter.onUnauthenticatedMessage(endpoint, messageFromDevice).setHandler(ctx.asyncAssertSuccess()); }
/** * Verifies that the adapter maps control messages with QoS 0 published from a Kura gateway to * the Telemetry endpoint. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageMapsKuraControlMessagesToTelemetryApi(final TestContext ctx) { // GIVEN an adapter configured to use the standard topic.control-prefix $EDC // and a custom control message content type config.setCtrlMsgContentType("control-msg"); // WHEN a message is published to a topic with the Kura $EDC prefix as endpoint final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, "$EDC/my-scope/4711"); final Async determineAddressSuccess = ctx.async(); Future<Message> msgTracker = adapter.getDownstreamMessage(message).map(msg -> { determineAddressSuccess.complete(); return msg; }); // THEN the message is forwarded to the telemetry API // and has the custom control message content type determineAddressSuccess.await(2000); assertMessageProperties(msgTracker.result(), config.getCtrlMsgContentType(), TelemetryConstants.TELEMETRY_ENDPOINT, "my-scope", "4711"); }
/** * Verifies that the adapter recognizes control messages published to a topic with a custom control prefix. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageRecognizesControlMessagesWithCustomControlPrefix(final TestContext ctx) { // GIVEN an adapter configured to use a custom topic.control-prefix config.setControlPrefix("bumlux"); // WHEN a message is published to a topic with the custom prefix as endpoint final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, "bumlux/my-scope/4711"); final Async determineAddressSuccess = ctx.async(); Future<Message> msgTracker = adapter.getDownstreamMessage(message).map(msg -> { determineAddressSuccess.complete(); return msg; }); // THEN the message is recognized as a control message and forwarded to the event API determineAddressSuccess.await(2000); assertMessageProperties(msgTracker.result(), config.getCtrlMsgContentType(), TelemetryConstants.TELEMETRY_ENDPOINT, "my-scope", "4711"); }
/** * Verifies that the adapter forwards data messages with QoS 0 published from a Kura gateway to * the Telemetry endpoint. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageMapsKuraDataMessagesToTelemetryApi(final TestContext ctx) { // GIVEN an adapter configured with a custom data message content type config.setDataMsgContentType("data-msg"); // WHEN a message is published to an application topic with QoS 0 final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, "my-scope/4711"); final Async determineAddressSuccess = ctx.async(); Future<Message> msgTracker = adapter.getDownstreamMessage(message).map(msg -> { determineAddressSuccess.complete(); return msg; }); // THEN the message is forwarded to the telemetry API // and has the configured data message content type determineAddressSuccess.await(2000); assertMessageProperties(msgTracker.result(), config.getDataMsgContentType(), TelemetryConstants.TELEMETRY_ENDPOINT, "my-scope", "4711"); }
/** * Verifies that the adapter forwards application messages with QoS 1 published from a Kura gateway to * the Event endpoint. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageMapsKuraDataMessagesToEventApi(final TestContext ctx) { // GIVEN an adapter // WHEN a message is published to an application topic with QoS 1 final MqttPublishMessage message = newMessage(MqttQoS.AT_LEAST_ONCE, "my-scope/4711"); final Async determineAddressSuccess = ctx.async(); Future<Message> msgTracker = adapter.getDownstreamMessage(message).map(msg -> { determineAddressSuccess.complete(); return msg; }); // THEN the message is forwarded to the event API determineAddressSuccess.await(2000); assertMessageProperties(msgTracker.result(), config.getDataMsgContentType(), EventConstants.EVENT_ENDPOINT, "my-scope", "4711"); }
/** * Verifies that the adapter rejects messages published to topics containing an endpoint * other than <em>telemetry</em> or <em>event</em>. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageFailsForUnknownEndpoint(final TestContext ctx) { // GIVEN an adapter // WHEN a device publishes a message to a topic with an unknown endpoint final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, "unknown"); final Async determineAddressFailure = ctx.async(); adapter.getDownstreamMessage(message).recover(t -> { determineAddressFailure.complete(); return Future.failedFuture(t); }); // THEN no downstream sender can be created for the message determineAddressFailure.await(2000); }
/** * Verifies that the adapter rejects QoS 1 messages published to the <em>telemetry</em> endpoint. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageFailsForQoS1TelemetryMessage(final TestContext ctx) { // GIVEN an adapter // WHEN a device publishes a message with QoS 1 to a "telemetry" topic final MqttPublishMessage message = newMessage(MqttQoS.AT_LEAST_ONCE, TelemetryConstants.TELEMETRY_ENDPOINT); final Async determineAddressFailure = ctx.async(); adapter.getDownstreamMessage(message).recover(t -> { determineAddressFailure.complete(); return Future.failedFuture(t); }); // THEN no downstream sender can be created for the message determineAddressFailure.await(2000); }
/** * Verifies that the adapter rejects QoS 1 messages published to the <em>telemetry</em> endpoint. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageFailsForQoS0EventMessage(final TestContext ctx) { // GIVEN an adapter // WHEN a device publishes a message with QoS 0 to an "event" topic final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, EventConstants.EVENT_ENDPOINT); final Async messageFailure = ctx.async(); adapter.getDownstreamMessage(message).recover(t -> { messageFailure.complete(); return Future.failedFuture(t); }); // THEN no downstream sender can be created for the message messageFailure.await(2000); }
/** * Verifies that the adapter fails to map a topic without a tenant ID received from an anonymous device. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageFailsForMissingTenant(final TestContext ctx) { // GIVEN an adapter // WHEN an anonymous device publishes a message to a topic that does not contain a tenant ID final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, TelemetryConstants.TELEMETRY_ENDPOINT); final Async determineAddressFailure = ctx.async(); adapter.getDownstreamMessage(message).recover(t -> { determineAddressFailure.complete(); return Future.failedFuture(t); }); // THEN the message cannot be mapped to an address determineAddressFailure.await(2000); }
/** * Verifies that the adapter fails to map a topic without a device ID received from an anonymous device. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageFailsForMissingDeviceId(final TestContext ctx) { // GIVEN an adapter // WHEN an anonymous device publishes a message to a topic that does not contain a device ID final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, TelemetryConstants.TELEMETRY_ENDPOINT + "/my-tenant"); final Async determineAddressFailure = ctx.async(); adapter.getDownstreamMessage(message).recover(t -> { determineAddressFailure.complete(); return Future.failedFuture(t); }); // THEN the message cannot be mapped to an address determineAddressFailure.await(2000); }
/** * Verifies that the adapter uses an authenticated device's identity when mapping a topic without tenant ID. * * @param ctx The helper to use for running tests on vert.x. */ @Test public void testGetDownstreamMessageUsesDeviceIdentityForTopicWithoutTenant(final TestContext ctx) { // GIVEN an adapter // WHEN an authenticated device publishes a message to a topic that does not contain a tenant ID final MqttPublishMessage message = newMessage(MqttQoS.AT_MOST_ONCE, TelemetryConstants.TELEMETRY_ENDPOINT); final Async determineAddressSuccess = ctx.async(); Future<Message> downstreamMessage = adapter.getDownstreamMessage(message, new Device("my-tenant", "4711")).map(msg -> { determineAddressSuccess.complete(); return msg; }); // THEN the mapped address contains the authenticated device's tenant and device ID determineAddressSuccess.await(2000); final ResourceIdentifier downstreamAddress = ResourceIdentifier.fromString(downstreamMessage.result().getAddress()); assertThat(downstreamAddress.getEndpoint(), is(TelemetryConstants.TELEMETRY_ENDPOINT)); assertThat(downstreamAddress.getTenantId(), is("my-tenant")); assertThat(MessageHelper.getDeviceId(downstreamMessage.result()), is("4711")); }
/**
 * Builds an MQTT CONNECT message from the given options.
 * <p>
 * User name and password flags are set when the respective option is non-null. When no will
 * is configured, the will flag and will retain flag are {@code false}, the will QoS is
 * {@code 0} and the will topic and will message are empty strings. Null client id, user name
 * and password are sent as empty strings.
 *
 * @param options connect options to take the header and payload values from
 * @return the CONNECT message
 */
public static MqttConnectMessage connect(ConnectOptions options) {
    final boolean hasWill = options.will() != null;
    final boolean willRetain = hasWill && options.will().isRetain();
    final int willQos = hasWill ? options.will().qos().value() : 0;
    final String willTopic = hasWill ? options.will().topicName() : "";
    final String willMessage = hasWill ? new String(options.will().message(), CharsetUtil.UTF_8) : "";

    MqttFixedHeader fixed = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 10);

    MqttConnectVariableHeader variable = new MqttConnectVariableHeader(
            options.version().protocolName(),
            options.version().protocolLevel(),
            options.userName() != null,
            options.password() != null,
            willRetain,
            willQos,
            hasWill,
            options.cleanSession(),
            options.keepAliveTimeSeconds());

    MqttConnectPayload body = new MqttConnectPayload(
            Strings.nullToEmpty(options.clientId()),
            willTopic,
            willMessage,
            Strings.nullToEmpty(options.userName()),
            Strings.nullToEmpty(options.password()));

    return new MqttConnectMessage(fixed, variable, body);
}
/**
 * Registers a multi-level wildcard, a single-level wildcard and an exact topic filter for one
 * session and checks that all three are registered and that {@code Session.matches} returns
 * the subscription whose filter equals the topic.
 */
@Test
public void testMatches() throws Exception {
    final String topic = "testTopic/test";
    final Session session = new Session("1", "1", 1, 50, true, null);

    final TopicSubscription[] subscriptions = {
        new TopicSubscription(session.clientId(), "testTopic/#", MqttQoS.AT_MOST_ONCE),
        new TopicSubscription(session.clientId(), "testTopic/+", MqttQoS.AT_LEAST_ONCE),
        new TopicSubscription(session.clientId(), topic, MqttQoS.EXACTLY_ONCE)
    };
    for (TopicSubscription subscription : subscriptions) {
        TopicSubscription.NEXUS.put(subscription);
    }

    Assert.assertEquals(3, TopicSubscription.NEXUS.topicFiltersOf(session.clientId()).size());

    final TopicSubscription matched = session.matches(topic);
    Assert.assertTrue(matched.topicFilter().equals(topic));
}
/**
 * Connects a client with a will message, checks that the session holds the will, then
 * disconnects and checks that the session's will has been reset to {@code null}.
 */
@Test
public void testWillToNullOnNormalDisconnect() throws Exception {
    final String clientId = TestUtil.newClientId();
    final String willTopic = "will";
    final byte[] willPayload = "ASTALAVISTA".getBytes(CharsetUtil.UTF_8);

    final ConnectOptions connectOptions = new ConnectOptions();
    connectOptions.clientId(clientId);
    connectOptions.will(new Message(-1, willTopic, null, willPayload, MqttQoS.AT_LEAST_ONCE, false));
    connectOptions.cleanSession(false);

    final MqttClient client = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort());
    final MqttConnectReturnCode returnCode = client.connectOptions(connectOptions).connect();

    Assert.assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, returnCode);
    Assert.assertTrue(client.isConnected());

    final boolean willRegistered = Session.NEXUS.get(clientId).will() != null
            && Session.NEXUS.get(clientId).will().topicName().equals(willTopic);
    Assert.assertTrue(willRegistered);

    client.disconnect(true);

    // give the broker some time to process the disconnect
    Thread.sleep(100);

    Assert.assertNull(Session.NEXUS.get(clientId).will());
}
/** * Example for handling client subscription request * @param endpoint */ public void example4(MqttEndpoint endpoint) { // handling requests for subscriptions endpoint.subscribeHandler(subscribe -> { List<MqttQoS> grantedQosLevels = new ArrayList<>(); for (MqttTopicSubscription s: subscribe.topicSubscriptions()) { System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService()); grantedQosLevels.add(s.qualityOfService()); } // ack the subscriptions request endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels); }); }
/** * Example for handling publish message to the client * @param endpoint */ public void example7(MqttEndpoint endpoint) { // just as example, publish a message with QoS level 2 endpoint.publish("my_topic", Buffer.buffer("Hello from the Vert.x MQTT server"), MqttQoS.EXACTLY_ONCE, false, false); // specifing handlers for handling QoS 1 and 2 endpoint.publishAcknowledgeHandler(messageId -> { System.out.println("Received ack for message = " + messageId); }).publishReceivedHandler(messageId -> { endpoint.publishRelease(messageId); }).publishCompletionHandler(messageId -> { System.out.println("Received ack for message = " + messageId); }); }
/** * Return an AMQP_SUBSCRIPTIONS message from the raw AMQP one * * @param message raw AMQP message * @return AMQP_SUBSCRIPTIONS message */ @SuppressWarnings("unchecked") public static AmqpSubscriptionsMessage from(Message message) { if (!message.getSubject().equals(AMQP_SUBJECT)) { throw new IllegalArgumentException(String.format("AMQP message subject is no s%", AMQP_SUBJECT)); } Section section = message.getBody(); if ((section != null) && (section instanceof AmqpValue)) { Map<String, String> map = (Map<String, String>) ((AmqpValue) section).getValue(); // build the unique topic subscriptions list List<AmqpTopicSubscription> topicSubscriptions = new ArrayList<>(); for (Map.Entry<String, String> entry: map.entrySet()) { topicSubscriptions.add(new AmqpTopicSubscription(entry.getKey(), MqttQoS.valueOf(Integer.valueOf(entry.getValue())))); } return new AmqpSubscriptionsMessage(topicSubscriptions); } else { throw new IllegalArgumentException("AMQP message wrong body type"); } }
/**
 * Accepts the connection and answers every SUBSCRIBE request based on its first topic
 * subscription only: {@code MqttQoS.FAILURE} is granted if that topic equals
 * {@code MQTT_TOPIC_FAILURE}, otherwise the requested QoS is granted. The test's async is
 * completed after the acknowledgement has been sent.
 *
 * @param endpoint the endpoint of the connected client
 */
@Override
protected void endpointHandler(MqttEndpoint endpoint) {

  endpoint.subscribeHandler(subscribe -> {

    MqttQoS granted;
    if (subscribe.topicSubscriptions().get(0).topicName().equals(MQTT_TOPIC_FAILURE)) {
      granted = MqttQoS.FAILURE;
    } else {
      granted = subscribe.topicSubscriptions().get(0).qualityOfService();
    }

    List<MqttQoS> grantedQosLevels = new ArrayList<>();
    grantedQosLevels.add(granted);
    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

    this.async.complete();
  });

  endpoint.accept(false);
}
/**
 * Creates the CONNECT data from the given values.
 * <p>
 * All arguments are stored as given; the will message array is not copied.
 *
 * @param cleanSession clean session flag
 * @param willRetain   will retain flag
 * @param willQos      QoS of the will message
 * @param willTopic    topic of the will message
 * @param willMessage  payload of the will message
 */
public Connect(boolean cleanSession,
               boolean willRetain,
               MqttQoS willQos,
               String willTopic,
               byte[] willMessage) {
    // session
    this.cleanSession = cleanSession;
    // will
    this.willTopic = willTopic;
    this.willMessage = willMessage;
    this.willQos = willQos;
    this.willRetain = willRetain;
}
/**
 * Convert Map to InternalMessage
 * <p>
 * Only PUBLISH and PUBREL messages are supported. Missing {@code dup}, {@code qos},
 * {@code retain} and {@code packetId} entries default to {@code "0"}, a missing
 * {@code version} defaults to MQTT 3.1.1. A PUBLISH payload is converted to bytes using
 * ISO-8859-1.
 *
 * @param map Map
 * @return InternalMessage, or {@code null} if the map is {@code null} or empty
 * @throws IllegalArgumentException if the message type is neither PUBLISH nor PUBREL
 */
public static InternalMessage mapToInternal(Map<String, String> map) {
    if (map == null || map.isEmpty()) {
        return null;
    }

    int type = Integer.parseInt(map.get("type"));

    if (type == MqttMessageType.PUBLISH.value()) {
        // The charset constant is always available, so no checked
        // UnsupportedEncodingException has to be caught (and silently ignored) anymore.
        String rawPayload = map.get("payload");
        byte[] payload = rawPayload == null
                ? null
                : rawPayload.getBytes(java.nio.charset.StandardCharsets.ISO_8859_1);

        return new InternalMessage<>(
                MqttMessageType.PUBLISH,
                BooleanUtils.toBoolean(map.getOrDefault("dup", "0"), "1", "0"),
                MqttQoS.valueOf(Integer.parseInt(map.getOrDefault("qos", "0"))),
                BooleanUtils.toBoolean(map.getOrDefault("retain", "0"), "1", "0"),
                MqttVersion.valueOf(map.getOrDefault("version", MqttVersion.MQTT_3_1_1.toString())),
                map.get("clientId"),
                map.get("userName"),
                null,
                new Publish(
                        map.get("topicName"),
                        Integer.parseInt(map.getOrDefault("packetId", "0")),
                        payload
                ));
    } else if (type == MqttMessageType.PUBREL.value()) {
        return new InternalMessage<>(
                MqttMessageType.PUBREL,
                false,
                MqttQoS.AT_LEAST_ONCE,
                false,
                MqttVersion.valueOf(map.getOrDefault("version", MqttVersion.MQTT_3_1_1.toString())),
                map.get("clientId"),
                map.get("userName"),
                null,
                new PacketId(Integer.parseInt(map.getOrDefault("packetId", "0"))));
    } else {
        throw new IllegalArgumentException("Invalid in-flight MQTT message type: " + MqttMessageType.valueOf(type));
    }
}
/**
 * Loads the subscriptions stored for a topic.
 * <p>
 * Reads the whole Redis hash of the topic filter or topic name (depending on what the
 * topic levels represent) and converts every stored value to a QoS.
 *
 * @param topicLevels levels of the topic name or topic filter
 * @return hash field to QoS; empty if nothing is stored. The fields are presumably
 *         client ids - confirm against the code writing these hashes.
 */
@Override
public Map<String, MqttQoS> getTopicSubscriptions(List<String> topicLevels) {
    Map<String, MqttQoS> result = new HashMap<>();

    boolean isFilter = Topics.isTopicFilter(topicLevels);
    Map<String, String> stored = hash().hgetall(
            isFilter ? RedisKey.topicFilter(topicLevels) : RedisKey.topicName(topicLevels));

    if (stored == null) {
        return result;
    }
    for (Map.Entry<String, String> entry : stored.entrySet()) {
        result.put(entry.getKey(), MqttQoS.valueOf(Integer.parseInt(entry.getValue())));
    }
    return result;
}
/**
 * Loads all subscriptions stored for a client.
 * <p>
 * Reads the client's subscription hash from Redis and converts every stored value to a QoS.
 *
 * @param clientId identifier of the client
 * @return topic to QoS; empty if nothing is stored for the client
 */
@Override
public Map<String, MqttQoS> getClientSubscriptions(String clientId) {
    Map<String, MqttQoS> result = new HashMap<>();

    Map<String, String> stored = hash().hgetall(RedisKey.subscription(clientId));
    if (stored == null) {
        return result;
    }
    for (Map.Entry<String, String> entry : stored.entrySet()) {
        result.put(entry.getKey(), MqttQoS.valueOf(Integer.parseInt(entry.getValue())));
    }
    return result;
}
@Override public void getMatchSubscriptions(List<String> topicLevels, Map<String, MqttQoS> map) { if (Topics.isTopicFilter(topicLevels)) { throw new IllegalArgumentException("it must be topic name not topic filter"); } // topic name Map<String, MqttQoS> subscriptions = getTopicSubscriptions(topicLevels); if (subscriptions != null) { map.putAll(subscriptions); } // topic filter getMatchSubscriptions(topicLevels, 0, map); }
public static void put(String topic, String payload, MqttQoS qoS) { if (qoS == MqttQoS.AT_MOST_ONCE) { // TODO: 2017/9/19 查找是否存在此主题,并从消息列表删除 } Message message = new Message(); message.setTopic(topic); message.setPayload(payload); message.setQoS(qoS); container.put(topic, message); }
public MqttMessage doMessage(Channel channel, MqttMessage msg) { String channelId = channel.id().asLongText(); logger.debug("MQTT PINGREQ " + channelId); // 更新最新连接时间 ApplicationContext.updateChannelConTime(channelId); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage message = new MqttMessage(fixedHeader); return message; }
/**
 * Publishes the data to the given address with QoS 0 on the Vert.x context.
 * <p>
 * The completion handler, if any, is invoked with the string form of the publish result
 * only when publishing succeeded; a failure is not reported.
 *
 * @param address               topic to publish to
 * @param data                  payload to publish
 * @param sendCompletionHandler handler notified on success, may be {@code null}
 */
@Override
public void send(String address, byte[] data, Handler<String> sendCompletionHandler) {

    this.vertx.runOnContext(ignored -> {

        Buffer payload = Buffer.buffer(data);
        this.client.publish(address, payload, MqttQoS.AT_MOST_ONCE, false, false, published -> {

            if (published.succeeded() && sendCompletionHandler != null) {
                sendCompletionHandler.handle(published.result().toString());
            }
        });
    });
}
/**
 * Subscribes to the given address with QoS 0 on the Vert.x context and logs the outcome
 * of the subscription request.
 *
 * @param address topic to subscribe to
 */
@Override
public void receive(String address) {

    this.vertx.runOnContext(ignored -> {

        final int requestedQos = MqttQoS.AT_MOST_ONCE.value();
        this.client.subscribe(address, requestedQos, subscribed -> {

            if (subscribed.failed()) {
                log.error("Error subscribing to {}", address, subscribed.cause());
            } else {
                log.info("Subscription request [{}] for {}", subscribed.result(), address);
            }
        });
    });
}
/**
 * Handles a session control message. On a {@code SessionCloseMsg} a DISCONNECT is pushed to
 * the network and the channel is closed; any other message is ignored.
 *
 * @param msg the session control message
 * @throws SessionException declared by the overridden method
 */
@Override
public void onMsg(SessionCtrlMsg msg) throws SessionException {
    if (!(msg instanceof SessionCloseMsg)) {
        return;
    }
    MqttFixedHeader disconnectHeader =
            new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
    pushToNetwork(new MqttMessage(disconnectHeader));
    channel.close();
}
/**
 * Builds a QoS 1 PUBLISH message for the given topic whose payload is the JSON element
 * serialized with {@code GSON}. The message id is taken from the session context.
 *
 * @param ctx   session context supplying the next message id
 * @param topic topic to publish to
 * @param json  content of the message
 * @return the PUBLISH message
 */
private MqttPublishMessage createMqttPublishMsg(DeviceSessionCtx ctx, String topic, JsonElement json) {
    MqttFixedHeader fixed =
            new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttPublishVariableHeader variable = new MqttPublishVariableHeader(topic, ctx.nextMsgId());

    ByteBuf body = ALLOCATOR.buffer();
    String serialized = GSON.toJson(json);
    body.writeBytes(serialized.getBytes(UTF8));

    return new MqttPublishMessage(fixed, variable, body);
}
private Future<Void> doUploadMessage(final Message message, final MqttEndpoint endpoint, final MqttPublishMessage messageFromDevice, final MessageSender sender) { return sender.send(message).map(delivery -> { if (EventConstants.EVENT_ENDPOINT.equals(sender.getEndpoint())) { // check that the remote MQTT client is still connected before sending PUBACK if (endpoint.isConnected() && messageFromDevice.qosLevel() == MqttQoS.AT_LEAST_ONCE) { endpoint.publishAcknowledge(messageFromDevice.messageId()); } } return (Void) null; }); }
/** * Verifies that the adapter waits for an event being settled and accepted * by a downstream peer before sending a PUBACK package to the device. * * @param ctx The vert.x test context. */ @Test public void testOnUnauthenticatedMessageSendsPubAckOnSuccess(final TestContext ctx) { // GIVEN an adapter with a downstream event consumer final Future<ProtonDelivery> outcome = Future.future(); givenAnEventSenderForOutcome(outcome); MqttServer server = getMqttServer(false); AbstractVertxBasedMqttProtocolAdapter<ProtocolAdapterProperties> adapter = getAdapter(server); // WHEN a device publishes an event final Buffer payload = Buffer.buffer("some payload"); final MqttEndpoint endpoint = mock(MqttEndpoint.class); when(endpoint.isConnected()).thenReturn(Boolean.TRUE); final MqttPublishMessage messageFromDevice = mock(MqttPublishMessage.class); when(messageFromDevice.topicName()).thenReturn("event/my-tenant/4712"); when(messageFromDevice.qosLevel()).thenReturn(MqttQoS.AT_LEAST_ONCE); when(messageFromDevice.payload()).thenReturn(payload); when(messageFromDevice.messageId()).thenReturn(5555555); adapter.onUnauthenticatedMessage(endpoint, messageFromDevice).setHandler(ctx.asyncAssertSuccess()); // THEN the device does not receive a PUBACK verify(endpoint, never()).publishAcknowledge(anyInt()); // until the event has been settled and accepted outcome.complete(mock(ProtonDelivery.class)); verify(endpoint).publishAcknowledge(5555555); }
/** * Verifies that the adapter does not send a PUBACK package to the device if * the message has not been accepted by the peer. * * @param ctx The vert.x test context. */ @Test public void testOnUnauthenticatedMessageDoesNotSendPubAckOnFailure(final TestContext ctx) { // GIVEN an adapter with a downstream event consumer final Future<ProtonDelivery> outcome = Future.future(); givenAnEventSenderForOutcome(outcome); MqttServer server = getMqttServer(false); AbstractVertxBasedMqttProtocolAdapter<ProtocolAdapterProperties> adapter = getAdapter(server); // WHEN a device publishes an event that is not accepted // by the peer final Buffer payload = Buffer.buffer("some payload"); final MqttEndpoint endpoint = mock(MqttEndpoint.class); when(endpoint.isConnected()).thenReturn(Boolean.TRUE); final MqttPublishMessage messageFromDevice = mock(MqttPublishMessage.class); when(messageFromDevice.topicName()).thenReturn("event/my-tenant/4712"); when(messageFromDevice.qosLevel()).thenReturn(MqttQoS.AT_LEAST_ONCE); when(messageFromDevice.payload()).thenReturn(payload); when(messageFromDevice.messageId()).thenReturn(5555555); adapter.onUnauthenticatedMessage(endpoint, messageFromDevice).setHandler(ctx.asyncAssertFailure()); outcome.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST)); // THEN the device has not received a PUBACK verify(endpoint, never()).publishAcknowledge(5555555); }
/**
 * Determines the downstream endpoint for a QoS level.
 *
 * @param level the QoS level of the published message
 * @return the telemetry endpoint for QoS 0, the event endpoint for every other level
 */
private static String getEndpoint(final MqttQoS level) {
    final String endpoint;
    switch (level) {
    case AT_MOST_ONCE:
        endpoint = TelemetryConstants.TELEMETRY_ENDPOINT;
        break;
    default:
        endpoint = EventConstants.EVENT_ENDPOINT;
        break;
    }
    return endpoint;
}
private Future<ResourceIdentifier> mapTopic(final MqttPublishMessage message) { try { final ResourceIdentifier topic = ResourceIdentifier.fromString(message.topicName()); if (TelemetryConstants.TELEMETRY_ENDPOINT.equals(topic.getEndpoint())) { if (!MqttQoS.AT_MOST_ONCE.equals(message.qosLevel())) { // client tries to send telemetry message using QoS 1 or 2 return Future.failedFuture("Only QoS 0 supported for telemetry messages"); } else { return Future.succeededFuture(topic); } } else if (EventConstants.EVENT_ENDPOINT.equals(topic.getEndpoint())) { if (!MqttQoS.AT_LEAST_ONCE.equals(message.qosLevel())) { // client tries to send event message using QoS 0 or 2 return Future.failedFuture("Only QoS 1 supported for event messages"); } else { return Future.succeededFuture(topic); } } else { // MQTT client is trying to publish on a not supported endpoint LOG.debug("no such endpoint [{}]", topic.getEndpoint()); return Future.failedFuture("no such endpoint"); } } catch (IllegalArgumentException e) { return Future.failedFuture(e); } }
/**
 * Builds a CONNACK message.
 *
 * @param returnCode     connect return code to report
 * @param sessionPresent session present flag
 * @return the CONNACK message, with a remaining length of 2 in its fixed header
 */
public static MqttConnAckMessage connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {
    return new MqttConnAckMessage(
            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 2),
            new MqttConnAckVariableHeader(returnCode, sessionPresent));
}
/**
 * Builds a PUBACK message.
 *
 * @param messageId id of the message being acknowledged
 * @return the PUBACK message, with a remaining length of 2 in its fixed header
 */
public static MqttPubAckMessage puback(int messageId) {
    return new MqttPubAckMessage(
            new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2),
            MqttMessageIdVariableHeader.from(messageId));
}
/**
 * Builds a PUBREC message.
 *
 * @param messageId id of the message being acknowledged
 * @return the PUBREC message, with a remaining length of 2 in its fixed header
 */
public static MqttMessage pubrec(int messageId) {
    return new MqttMessage(
            new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 2),
            MqttMessageIdVariableHeader.from(messageId));
}