public static void subscribe(MqttSubscribeMessage message, ClientIdentifier identifier){ for(MqttTopicSubscription topicSubscription : message.payload().topicSubscriptions()){ Topic topic = new Topic(topicSubscription.topicName()); Topic.Element element = topic.getHeadElement(); if (element == null) return; SubscriptionTree cachedTree = subscriptions.get(element.getValue()); if(cachedTree == null){ SubscriptionTree tree = new SubscriptionTree(); tree.build(topic); subscriptions.putIfAbsent(element.getValue(), tree); tree.subscribe(topic,identifier); } else { cachedTree.subscribe(topic,identifier); } } }
boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration) { synchronized (subscriptions) { addressMessageMap.putIfAbsent(MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<Long, Integer>()); MqttTopicSubscription existingSubscription = subscriptions.get(subscription.topicName()); if (existingSubscription != null) { if (subscription.qualityOfService().value() > existingSubscription.qualityOfService().value()) { subscriptions.put(subscription.topicName(), subscription); return true; } } else { subscriptions.put(subscription.topicName(), subscription); return true; } } return false; }
private void addSubscription(MqttTopicSubscription subscription) throws Exception { String topicName = CompositeAddress.extractAddressName(subscription.topicName()); MqttTopicSubscription s = session.getSessionState().getSubscription(topicName); int qos = subscription.qualityOfService().value(); String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topicName, session.getWildcardConfiguration()); session.getSessionState().addSubscription(subscription, session.getWildcardConfiguration()); Queue q = createQueueForSubscription(coreAddress, qos); if (s == null) { createConsumerForSubscriptionQueue(q, topicName, qos); } else { consumerQoSLevels.put(consumers.get(topicName).getID(), qos); } session.getRetainMessageManager().addRetainedMessagesToQueue(q, topicName); }
@Override public List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions) { List<MqttGrantedQoS> r = new ArrayList<>(); requestSubscriptions.forEach(subscription -> { if (!this.allowDollar && subscription.topic().startsWith("$")) r.add(MqttGrantedQoS.FAILURE); if (subscription.topic().equals(this.deniedTopic)) r.add(MqttGrantedQoS.FAILURE); r.add(MqttGrantedQoS.valueOf(subscription.requestedQos().value())); }); return r; }
@Override public List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions) { List<MqttGrantedQoS> r = new ArrayList<>(); requestSubscriptions.forEach(subscription -> { if (!this.allowDollar && subscription.topic().startsWith("$")) r.add(MqttGrantedQoS.FAILURE); if (subscription.topic().equals(this.deniedTopic)) r.add(MqttGrantedQoS.FAILURE); if (!subscription.topic().endsWith("downstream")) r.add(MqttGrantedQoS.FAILURE); if (subscription.topic().indexOf(clientId) == -1) r.add(MqttGrantedQoS.FAILURE); r.add(MqttGrantedQoS.valueOf(subscription.requestedQos().value())); }); return r; }
public static MqttSubscribeMessage subscribe(int messageId, MqttTopicSubscription... topicSubscriptions) { int topicNameSize = 0; int topicCount = topicSubscriptions.length; for (MqttTopicSubscription item : topicSubscriptions) { topicNameSize += item.topicName().getBytes(CharsetUtil.UTF_8).length; } MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 2 + topicNameSize + topicCount); MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); MqttSubscribePayload payload = new MqttSubscribePayload(Lists.newArrayList(topicSubscriptions)); return new MqttSubscribeMessage(fixedHeader, variableHeader, payload); }
@Override public List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions) { List<MqttGrantedQoS> r = new ArrayList<>(); requestSubscriptions.forEach(subscription -> { if (!this.allowDollar && subscription.topic().startsWith("$")) r.add(MqttGrantedQoS.NOT_GRANTED); else if (subscription.topic().equals(this.deniedTopic)) r.add(MqttGrantedQoS.NOT_GRANTED); else r.add(MqttGrantedQoS.valueOf(subscription.requestedQos().value())); }); return r; }
synchronized void start() throws Exception { for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) { String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName(), session.getWildcardConfiguration()); Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value()); createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value()); } }
/** * As per MQTT Spec. Subscribes this client to a number of MQTT topics. * * @param subscriptions * @return An array of integers representing the list of accepted QoS for each topic. * @throws Exception */ int[] addSubscriptions(List<MqttTopicSubscription> subscriptions) throws Exception { int[] qos = new int[subscriptions.size()]; for (int i = 0; i < subscriptions.size(); i++) { addSubscription(subscriptions.get(i)); qos[i] = subscriptions.get(i).qualityOfService().value(); } return qos; }
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { if (!checkConnected(ctx)) { return; } log.trace("[{}] Processing subscription [{}]!", sessionId, mqttMsg.variableHeader().messageId()); List<Integer> grantedQoSList = new ArrayList<>(); for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topicName = subscription.topicName(); // TODO: handle this qos level. MqttQoS reqQoS = subscription.qualityOfService(); try { if (topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) { // AdaptorToSessionActorMsg msg = // adaptor.convertToActorMsg(deviceSessionCtx, // SUBSCRIBE_ATTRIBUTES_REQUEST, // mqttMsg); // BasicToDeviceActorSessionMsg basicToDeviceActorSessionMsg = new // BasicToDeviceActorSessionMsg( // deviceSessionCtx.getDevice(), msg); // processor.process(basicToDeviceActorSessionMsg); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_RPC_REQUESTS_SUB_TOPIC)) { // AdaptorToSessionActorMsg msg = // adaptor.convertToActorMsg(deviceSessionCtx, // SUBSCRIBE_RPC_COMMANDS_REQUEST, // mqttMsg); // processor.process(new // BasicToDeviceActorSessionMsg(deviceSessionCtx.getDevice(), msg)); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_RPC_RESPONSE_SUB_TOPIC)) { grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_ATTRIBUTES_RESPONSES_TOPIC)) { deviceSessionCtx.setAllowAttributeResponses(); grantedQoSList.add(getMinSupportedQos(reqQoS)); } else if (topicName.equals(DEVICE_TELEMETRY_TOPIC)) { grantedQoSList.add(getMinSupportedQos(reqQoS)); } else { log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); grantedQoSList.add(FAILURE.value()); } ChannelEntity channelEntity = new TcpChannelEntity(ctx.channel()); MemoryMetaPool.registerTopic(channelEntity, topicName); } catch (Exception e) { e.printStackTrace(); log.warn("[{}] Failed to subscribe to [{}][{}]", sessionId, topicName, reqQoS); grantedQoSList.add(FAILURE.value()); } } ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), grantedQoSList)); }
public void subscribe(MqttTopicSubscription... topicSubscriptions) throws InterruptedException { send(MqttMessageFactory.subscribe(nextMessageId(), topicSubscriptions)); // TODO error handling,store subscription }
Collection<MqttTopicSubscription> getSubscriptions() { return subscriptions.values(); }
MqttTopicSubscription getSubscription(String address) { return subscriptions.get(address); }
void clean() throws Exception { for (MqttTopicSubscription mqttTopicSubscription : session.getSessionState().getSubscriptions()) { removeSubscription(mqttTopicSubscription.topicName()); } }
/** * Authorize client SUBSCRIBE * * @param clientId Client Id * @param userName User Name * @param requestSubscriptions List of request Topic Subscription * @return List of granted QoS */ List<MqttGrantedQoS> authSubscribe(String clientId, String userName, List<MqttTopicSubscription> requestSubscriptions);
@Test public void persistMessagesForDisconnectedPersistantSession() throws Exception { String clientId = TestUtil.newClientId(); ConnectOptions options = new ConnectOptions(); options.clientId(clientId); options.cleanSession(false); String topicName = "testTopic"; String message = "test message"; MqttClient client = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort()); client.connectOptions(options).connect(); client.subscribe(new MqttTopicSubscription(topicName, MqttQoS.EXACTLY_ONCE)); client.disconnect(true); Thread.sleep(100); Assert.assertNotNull(Session.NEXUS.get(clientId)); String publisherId = TestUtil.newClientId(); MqttClient publisher = new MqttClient("mqtt://localhost:" + Settings.INSTANCE.mqttPort()); ConnectOptions pubOptions = new ConnectOptions(); pubOptions.clientId(publisherId); pubOptions.cleanSession(true); int messageId = 1; publisher.connectOptions(pubOptions).connect(); publisher.publish(new Message(messageId, topicName, publisherId, message.getBytes(CharsetUtil.UTF_8), MqttQoS.EXACTLY_ONCE, false)); Thread.sleep(100); publisher.disconnect(true); Thread.sleep(1000); Assert.assertNull(Session.NEXUS.get(publisherId)); Topic topic = Topic.NEXUS.get(topicName); Assert.assertNotNull(topic); OutboundMessageStatus status = OutboundMessageStatus.NEXUS.getBy(messageId, clientId); Assert.assertNotNull(status); Assert.assertTrue(Messages.key(publisherId, messageId).equals(status.messageKey())); }