/**
 * Returns the connected {@link MqttClient} cached for the given broker and client id,
 * creating, connecting and caching it on first use.
 *
 * <p>Clients are stored in {@code clientMap} under the key {@code serverURI + clientId}.
 * Creation happens while holding {@code lock}, and the map is checked again after the
 * lock has been acquired.
 *
 * @param serverURI     broker URI passed to the {@link MqttClient} constructor
 * @param clientId      MQTT client identifier
 * @param redisTemplate template handed to the {@code IMMqttCallBack} installed on a new client
 * @return the client stored in {@code clientMap} for this key
 * @throws MqttException if the client cannot be created or connected
 */
public static MqttClient getMqttClient(String serverURI, String clientId, StringRedisTemplate redisTemplate) throws MqttException {
    String clientKey = serverURI.concat(clientId);
    if (clientMap.get(clientKey) == null) {
        lock.lock();
        try {
            // Check again under the lock: another caller may have created the client meanwhile.
            if (clientMap.get(clientKey) == null) {
                MqttClientPersistence persistence = new MemoryPersistence();
                MqttClient client = new MqttClient(serverURI, clientId, persistence);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                MqttCallback callback = new IMMqttCallBack(client, redisTemplate);
                client.setCallback(callback);
                connOpts.setCleanSession(true);
                client.connect(connOpts);
                // Only a successfully connected client is cached.
                clientMap.put(clientKey, client);
            }
        } finally {
            // Release the lock even when construction or connect() throws; otherwise
            // the lock would stay held and later callers could never acquire it.
            lock.unlock();
        }
    }
    return clientMap.get(clientKey);
}
/**
 * Installs the MQTT callback on {@code client}: a lost connection is reported through
 * {@code msg}, and every incoming message is shown in the {@code tvMessage} text view.
 */
protected void mqttCallback() {
    MqttCallback handler = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            msg("Connection lost...");
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // Look the view up on every message and replace its text with the message's string form.
            TextView target = (TextView) findViewById(R.id.tvMessage);
            String text = message.toString();
            target.setText(text);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // Nothing to do on delivery confirmation.
        }
    };
    client.setCallback(handler);
}
/**
 * Registers a callback for receiving MQTT messages.
 *
 * <p>The topic is prefixed with {@code topicPrefix}. The first listener for a topic triggers
 * a subscription on the client. If that subscription fails, the error is logged, the topic
 * entry is removed again and the callback is NOT registered, so that a later call can retry
 * the subscription instead of silently adding listeners to a topic that was never subscribed.
 *
 * @param topic
 *            topic to listen for messages (without the prefix).
 * @param callback
 *            listener to register.
 */
public void addListener(final String topic, final MqttCallback callback) {
    logger.info("Adding listener {} to topic '{}'", callback.getClass().getSimpleName(), topic);
    final String fullTopic = this.topicPrefix + topic;
    Collection<MqttCallback> callbacksForTopic = this.listeners.get(fullTopic);
    if (callbacksForTopic == null) {
        callbacksForTopic = new ArrayList<MqttCallback>();
        // Publish the collection before subscribing so that a message delivered right
        // after the subscription already finds the entry.
        this.listeners.put(fullTopic, callbacksForTopic);
        try {
            this.client.subscribe(fullTopic);
            logger.info("Subscribed to topic '{}'", fullTopic);
        } catch (MqttException e) {
            logger.error("Error registering listener", e);
            // Roll back: leaving the entry in place would make every later call skip
            // the subscription attempt for this topic.
            this.listeners.remove(fullTopic);
            return;
        }
    }
    callbacksForTopic.add(callback);
}
/**
 * Dispatches an incoming message to every listener registered for exactly this topic and,
 * when {@code deliverSignals} is set, forwards it to {@code signalReceiver} with the topic
 * prefix removed.
 *
 * @param topic   full topic the message arrived on
 * @param message the received message
 * @throws Exception if the signal receiver fails; listener failures are only logged
 */
@Override
public void messageArrived(final String topic, final MqttMessage message) throws Exception {
    logger.info("Message arrived at '{}' with payload {}", topic, message);

    final Collection<MqttCallback> registered = this.listeners.get(topic);
    if (registered != null) {
        for (final MqttCallback listener : registered) {
            try {
                listener.messageArrived(topic, message);
            } catch (Exception e) {
                // One failing listener must not prevent the others from being notified.
                logger.warn("Exception in message processing", e);
            }
        }
    }

    if (!deliverSignals) {
        return;
    }
    // Delegate to the signal receiver, cutting off the prefix.
    final String unprefixedTopic = topic.substring(topicPrefix.length());
    signalReceiver.messageArrived(unprefixedTopic, message);
}
/** Attaches the callback used when configuration changes occur. */ public static void attachCallback(MqttClient client, String deviceId) throws MqttException { mCallback = new MqttCallback() { @Override public void connectionLost(Throwable cause) { // Do nothing... } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String payload = new String(message.getPayload()); System.out.println("Payload : " + payload); // TODO: Insert your parsing / handling of the configuration message here. } @Override public void deliveryComplete(IMqttDeliveryToken token) { // Do nothing; } }; String configTopic = String.format("/devices/%s/config", deviceId); client.subscribe(configTopic, 1); client.setCallback(mCallback); }
/**
 * Connects a client to the test broker, subscribes to {@code TOPIC_NAME_TEST}, publishes a
 * random payload to the same topic and expects it to arrive within {@code MQTT_TIMEOUT_MS}.
 */
@Test
public void testMqtt() throws MqttException, InterruptedException {
    final CountDownLatch messageReceived = new CountDownLatch(1);

    String brokerUri = mqttServer.getConnectString();
    String randomClientId = UUID.randomUUID().toString();
    MqttClient client = new MqttClient(brokerUri, randomClientId, new MemoryPersistence());
    client.connect();

    MqttCallback arrivalSignal = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // Any message on the subscribed topic releases the waiting test thread.
            messageReceived.countDown();
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    };
    client.setCallback(arrivalSignal);
    client.subscribe(TOPIC_NAME_TEST);

    byte[] payload = UUID.randomUUID().toString().getBytes();
    client.publish(TOPIC_NAME_TEST, payload, 0, false);

    boolean arrivedInTime = messageReceived.await(MQTT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    assertTrue(arrivedInTime);
    client.disconnect();
}
/**
 * Builds a callback that records the id of every arriving message in {@code messageIds}
 * and sets the {@code messageArrived} flag; the other events are ignored.
 *
 * @return the new callback
 */
private MqttCallback createCallback() {
    MqttCallback recorder = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            messageIds.add(message.getId());
            messageArrived = true;
        }
    };
    return recorder;
}
/**
 * Builds the receiving client, installs a callback that feeds the first four payload bytes
 * (read as an {@code int}) into {@link AltitudeProfileView}, connects with a clean session
 * and subscribes to {@code BarometerApplication.TOPIC + "+"} at QoS 0.
 *
 * @throws MqttException if building, connecting or subscribing fails
 */
public GUIApplication() throws MqttException {
    empf = builder.uri("tcp://" + BarometerApplication.BROKER + ":1883")
            .clientUID("ch.quantasy.knr1.GUIApplication")
            .build();

    MqttCallback altitudeFeeder = new MqttCallback() {
        @Override
        public void connectionLost(Throwable throwable) {
        }

        @Override
        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            // The payload is interpreted as a single int in ByteBuffer's default byte order.
            int altitude = ByteBuffer.wrap(mqttMessage.getPayload()).getInt();
            AltitudeProfileView.addBarometricAltitudeData(altitude);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }
    };
    empf.setCallback(altitudeFeeder);

    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);
    empf.connect(options);
    empf.subscribe(BarometerApplication.TOPIC + "+", 0);
}
/**
 * Installs a callback that stores each received payload via {@code setResponse} and a lost
 * connection via {@code setError}, then subscribes the client to {@code TOPIC}.
 *
 * @param client connected client to subscribe with
 * @throws Exception if the subscription fails
 */
private void subscribe(MqttClient client) throws Exception {
    // Install the callback BEFORE subscribing so that a message delivered right after the
    // subscription is acknowledged already has a handler.
    client.setCallback(new MqttCallback() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            setResponse(new String(message.getPayload()));
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }

        @Override
        public void connectionLost(Throwable ex) {
            setError(ex);
        }
    });
    client.subscribe(TOPIC);
}
/**
 * Installs a callback that prints each received payload to standard output and prints the
 * stack trace of a lost connection, then subscribes the client to {@code TOPIC}.
 *
 * @param client connected client to subscribe with
 * @throws Exception if the subscription fails
 */
private void subscribe(MqttClient client) throws Exception {
    // Install the callback BEFORE subscribing so that a message delivered right after the
    // subscription is acknowledged already has a handler.
    client.setCallback(new MqttCallback() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println(new String(message.getPayload()));
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }

        @Override
        public void connectionLost(Throwable ex) {
            ex.printStackTrace();
        }
    });
    client.subscribe(TOPIC);
}
/**
 * Creates an {@link MqttAndroidClient} with in-memory persistence, wires its events to the
 * default {@code EventBus} and stores it in {@code mClients} under {@code id}.
 *
 * @param id        key under which the client is stored in {@code mClients}
 * @param serverURI broker URI
 * @param clientId  MQTT client identifier
 * @return the newly created (not yet connected by this method) client
 */
public MqttAndroidClient createClient(String id, String serverURI, String clientId) {
    MqttClientPersistence inMemoryStore = new MemoryPersistence();
    MqttAndroidClient androidClient =
            new MqttAndroidClient(MyApplication.getContext(), serverURI, clientId, inMemoryStore);

    MqttCallback eventBusBridge = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            LogUtil.e("connectionLost");
            MQTTActionEvent lostEvent =
                    new MQTTActionEvent(Constant.MQTTStatusConstant.CONNECTION_LOST, null, cause);
            EventBus.getDefault().post(lostEvent);
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            LogUtil.d("topic is " + topic + ",message is " + message.toString() + ", qos is " + message.getQos());
            // Posted as a sticky event.
            MessageEvent received = new MessageEvent(new EmqMessage(topic, message));
            EventBus.getDefault().postSticky(received);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            LogUtil.d("deliveryComplete");
        }
    };
    androidClient.setCallback(eventBusBridge);

    mClients.put(id, androidClient);
    return androidClient;
}
private void setupMqtt ( final Hive hive, final Session session ) throws MqttException { this.client.setCallback ( new MqttCallback () { @Override public void messageArrived ( final String topic, final MqttMessage message ) throws Exception { logger.trace ( "received message '{}' on topic '{}'", message, topic ); MqttExporter.this.executor.submit ( new Callable<Void> () { @Override public Void call () throws Exception { final String itemId = MqttExporter.this.itemsToWriteTopics.inverse ().get ( topic ); if ( itemId != null ) { writeMessageToItem ( hive, session, itemId, message ); } else { logger.warn ( "received message '{}' on topic '{}' but no corresponding item found", message, topic ); } return null; } } ); } @Override public void deliveryComplete ( final IMqttDeliveryToken token ) { } @Override public void connectionLost ( final Throwable th ) { // TODO: implement this logger.warn ( "connectionLost", th ); } } ); }
@Test public void itCanSubscribeMultipleMessages() throws Throwable { AsyncPahoUtils.connect(this.asyncClient); CountDownLatch latch = new CountDownLatch(4); AtomicInteger messageCount = new AtomicInteger(0); // Callback to monitor delivery completion this.asyncClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, org.eclipse.paho.client.mqttv3.MqttMessage m) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken t) { latch.countDown(); } @Override public void connectionLost(Throwable cause) { } }); // Subscribe this.observableClient.subscribe(TOPIC, 1).subscribe(r -> { messageCount.incrementAndGet(); latch.countDown(); }); // Publish a test message AsyncPahoUtils.publish(asyncClient, TOPIC, new byte[] { 'a', 'b', 'c' }); AsyncPahoUtils.publish(asyncClient, TOPIC, new byte[] { 'd', 'e', 'f' }); // Await for async completion latch.await(); Assert.assertEquals(2, messageCount.get()); }
/**
 * Set the callback to trigger on message arrival.
 *
 * <p>Passing {@code null} clears the callback on the underlying MQTT client. Otherwise an
 * adapter is installed that converts each MQTT message into a
 * {@code MessageEventListener.Message} (topic plus payload as a string) and passes it to
 * the listener. When the connection is lost and the client reports that it is not
 * connected, the cached client is dropped and {@code getMqttClient()} is called again.
 *
 * @param listener the listener implementation, or {@code null} to clear the callback
 */
public void setCallback(MessageEventListener listener) {
    if (listener == null) {
        logger.debug("Clear callback");
        getMqttClient().setCallback(null);
        return;
    }

    logger.debug("Set callback");
    getMqttClient().setCallback(new MqttCallback() {
        @Override
        public void connectionLost(Throwable thrwbl) {
            logger.warn("Connection to MQTT server lost, reconnecting", thrwbl);
            if (!getMqttClient().isConnected()) {
                mqttClient = null;
                getMqttClient();
            }
        }

        @Override
        public void messageArrived(String mqttTopic, MqttMessage mqttMessage) throws Exception {
            MessageEventListener.Message message = new MessageEventListener.Message();
            message.topic = mqttTopic;
            message.content = new String(mqttMessage.getPayload());
            // Two arguments need two placeholders; with a single one the content was dropped.
            logger.debug("New message received on {}: {}", message.topic, message.content);
            listener.onMessage(message);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken imdt) {
        }
    });
}
/**
 * Creates the Android MQTT client for {@code serverAndress} with a generated client id and
 * installs the given callback on it.
 *
 * @param contexto     Android context handed to the client
 * @param mqttcallback callback to be notified of client events
 */
public MQTTconect(Context contexto, MqttCallback mqttcallback) {
    // Generate a random code that serves as the client's identification.
    String generatedId = MqttClient.generateClientId();
    clientID = generatedId + "circularUFPAapp";

    // Create the Android MQTT client from the server address and the client id.
    mqttAndroidClient = new MqttAndroidClient(contexto, serverAndress, clientID);

    // Register the callback object (invoked when something changes).
    mqttAndroidClient.setCallback(mqttcallback);
}
/**
 * Default handler has only organization ID.
 *
 * <p>Also creates the client callback {@code mClientCb}, which fans every MQTT event out to
 * all entries of {@code mMessageCallbacksList}; a lost connection additionally clears the
 * {@code connected} flag first.
 *
 * @param context context stored in {@code mContext}
 * @param orgId org_id is your unique organization ID, assigned when you sign up with the service. It will be a 6 character alphanumeric string
 */
public IotHandlerAbstr(Context context, String orgId) {
    this.mContext = context;
    this.mOrgId = orgId;
    this.mClientCb = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            connected = false;
            // Index-based iteration; the size is re-read on every pass.
            for (int idx = 0; idx < mMessageCallbacksList.size(); ++idx) {
                mMessageCallbacksList.get(idx).connectionLost(cause);
            }
        }

        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            for (int idx = 0; idx < mMessageCallbacksList.size(); ++idx) {
                mMessageCallbacksList.get(idx).messageArrived(topic, mqttMessage);
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            for (int idx = 0; idx < mMessageCallbacksList.size(); ++idx) {
                mMessageCallbacksList.get(idx).deliveryComplete(token);
            }
        }
    };
}
/**
 * Creates a new MQTT client for {@code broker_address} and connects it with a clean session
 * and a keep-alive interval of 3600 seconds.
 *
 * <p>The installed callback takes the arrival time and the message's string form, passes
 * the string and the topic to every handler in {@code handleList}, and finally puts the
 * pair {@code {message string, arrival time}} onto {@code arriveQueue}.
 *
 * @param clientId MQTT client identifier
 * @return the connected client
 * @throws MqttException if the client cannot be created or connected
 */
private MqttClient getMqttClient(String clientId) throws MqttException {
    MqttClientPersistence inMemoryStore = new MemoryPersistence();
    MqttClient newClient = new MqttClient(broker_address, clientId, inMemoryStore);

    MqttCallback arrivalRecorder = new MqttCallback() {
        public void connectionLost(Throwable cause) {
        }

        public void deliveryComplete(IMqttDeliveryToken token) {
        }

        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // Take the timestamp first, before any handler runs.
            long arrivedAt = System.currentTimeMillis();
            String messageId = message.toString();
            for (MsgHandleInterface handler : handleList) {
                handler.handle(messageId, topic);
            }
            arriveQueue.put(new Object[] {messageId, arrivedAt});
        }
    };
    newClient.setCallback(arrivalRecorder);

    MqttConnectOptions options = new MqttConnectOptions();
    options.setKeepAliveInterval(3600);
    options.setCleanSession(true);
    newClient.connect(options);
    return newClient;
}
/**
 * Called once the publisher connection is open: installs a callback that counts each
 * completed delivery and triggers the next send, releases {@code connectLatch} and sends
 * the first message.
 */
@Override
protected void connectionOpened() {
    Logging.log.info("Publisher on '{}' connected, publishing messages", this.getTopic());

    MqttCallback deliveryTracker = new MqttCallback() {
        @Override
        public void connectionLost(Throwable throwable) {
        }

        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            // Each confirmed delivery bumps the counter and starts the next publish.
            Logging.log.info("Delivered message-id {}", iMqttDeliveryToken.getMessageId());
            numSent.incrementAndGet();
            sendNext();
        }
    };
    this.client.setCallback(deliveryTracker);

    this.connectLatch.countDown();
    this.sendNext();
}
private MqttCallback createCallback() { return new MqttCallback() { public void connectionLost(Throwable cause) { logger.warn("Connection to mqtt broker lost"); do { try { SECONDS.sleep(1); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); } try { logger.info("Trying to reconnect"); listenToMqtt(); } catch (Exception e) { logger.warn("Reconnect failed", e); } } while (!MqttClient.this.client.isConnected()); logger.info("Successfully reconnected"); } public void messageArrived(String topic, MqttMessage message) throws IOException { String payload = new String(message.getPayload()); logger.debug( "Received mqtt message, sending to arduino {} {}", topic, payload); MqttClient.this.toArduino(topic, payload); } public void deliveryComplete(IMqttDeliveryToken token) { // nothing to do } }; }
/**
 * Connects a consumer subscribed to {@code "test"} and a producer, publishes one QoS 1
 * message from the producer and waits for the consumer's callback to see it.
 */
@Test(timeout = 300000)
public void testSendAndReceiveMQTT() throws Exception {
    final CountDownLatch received = new CountDownLatch(1);

    MqttClient consumer = createPahoClient("consumerId");
    MqttClient producer = createPahoClient("producerId");

    consumer.connect();
    consumer.subscribe("test");
    MqttCallback arrivalSignal = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            received.countDown();
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    };
    consumer.setCallback(arrivalSignal);

    producer.connect();
    byte[] greeting = "hello".getBytes();
    producer.publish("test", greeting, 1, false);

    waitForLatch(received);

    producer.disconnect();
    producer.close();
}
/**
 * Builds the receiving client, installs a callback that records and prints an incoming
 * message only when more than 500 ms have passed since the last recorded one, connects
 * with a non-clean session and subscribes to {@code MqttTimeService.TOPIC} at QoS 1.
 *
 * @throws MqttException if building, connecting or subscribing fails
 */
public TimeAgent() throws MqttException {
    receiver = builder.uri("tcp://" + MqttTimeService.BROKER + ":1883")
            .clientUID("ch.quantasy.mqttfirst.TimeService")
            .build();

    MqttCallback throttledPrinter = new MqttCallback() {
        @Override
        public void connectionLost(Throwable throwable) {
        }

        @Override
        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            // Ignore messages arriving within 500 ms of the last accepted one.
            if (!(System.currentTimeMillis() > count + 500)) {
                return;
            }
            count = System.currentTimeMillis();
            lastMessage = new String(mqttMessage.getPayload());
            System.out.println("s = " + str + " msg " + lastMessage);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }
    };
    receiver.setCallback(throttledPrinter);

    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(false);
    receiver.connect(options);
    receiver.subscribe(MqttTimeService.TOPIC, 1);
}
/**
 * Prepares the state for the Paho MQTT client that will underpin the Device client: logs
 * the organization and client ids, resets {@code mqttAsyncClient} to {@code null}, creates
 * fresh connect options and remembers the callback.
 *
 * @param callback
 *            MqttCallback to store for later use
 * @see <a href="http://www.eclipse.org/paho/files/javadoc/index.html">Paho Client Library</a>
 */
protected void createClient(MqttCallback callback) {
    String identity = "Org ID = " + getOrgId() + "\n Client ID = " + clientId;
    LoggerUtility.info(CLASS_NAME, "createClient", identity);

    this.mqttAsyncClient = null;
    this.mqttClientOptions = new MqttConnectOptions();
    this.mqttCallback = callback;
}
public void start(MqttCallback callback) { try { mqttClient.setCallback(callback); mqttClient.connect(); //Subscribe to all subtopics of com.thingtrack/vaadin-desktop/* mqttClient.subscribe("com.thingtrack/vaadin/#"); } catch (MqttException e) { e.printStackTrace(); System.exit(1); } }
/**
 * Installs the given callback on the builder's client.
 *
 * @param mqttCallback callback to install; must not be {@code null}
 * @return this builder
 * @throws NullPointerException if {@code mqttCallback} is {@code null}
 */
public Builder setMqttCallback(final MqttCallback mqttCallback) {
    final MqttCallback checkedCallback = Objects.requireNonNull(mqttCallback);
    this.client.setCallback(checkedCallback);
    return this;
}
@Test public void itCanSubscribe() throws Throwable { AsyncPahoUtils.connect(this.asyncClient); CountDownLatch latch = new CountDownLatch(2); AtomicReference<IMqttDeliveryToken> token = new AtomicReference<>(); AtomicReference<MqttMessage> result = new AtomicReference<>(); // Callback to monitor delivery completion this.asyncClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, org.eclipse.paho.client.mqttv3.MqttMessage m) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken t) { token.set(t); latch.countDown(); } @Override public void connectionLost(Throwable cause) { } }); // Subscribe MqttMessage expected = MqttMessage.create(0, new byte[] { 'a', 'b', 'c' }, 1, false); this.observableClient.subscribe(TOPIC, 1).subscribe(r -> { result.set(r); latch.countDown(); }); // Publish a test message AsyncPahoUtils.publish(asyncClient, TOPIC, expected.getPayload()); // Await for async completion latch.await(); Assert.assertNotNull(result.get()); Assert.assertArrayEquals(expected.getPayload(), result.get().getPayload()); Assert.assertNotNull(token.get()); }
@Test public void itCanPublish() throws Throwable { AsyncPahoUtils.connect(this.asyncClient); CountDownLatch latch = new CountDownLatch(2); AtomicReference<IMqttDeliveryToken> token = new AtomicReference<>(); AtomicReference<PublishToken> pubToken = new AtomicReference<>(); // Callback to monitor delivery completion this.asyncClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, org.eclipse.paho.client.mqttv3.MqttMessage message) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken t) { token.set(t); latch.countDown(); } @Override public void connectionLost(Throwable cause) { } }); // Publish the message MqttMessage msg = MqttMessage.create(0, new byte[] { 'a', 'b', 'c' }, 1, false); Single<PublishToken> obs = this.observableClient.publish(TOPIC, msg); // Subscribe for result obs.subscribe(r -> { pubToken.set(r); latch.countDown(); }); // Await for async completion latch.await(); IMqttDeliveryToken iMqttDeliveryToken = token.get(); PublishToken publishToken = pubToken.get(); Assert.assertNotNull(iMqttDeliveryToken); Assert.assertNotNull(publishToken); Assert.assertNotNull(publishToken.getClientId()); Assert.assertEquals(iMqttDeliveryToken.getClient().getClientId(), publishToken.getClientId()); Assert.assertNotNull(publishToken.getMessageId()); Assert.assertEquals(iMqttDeliveryToken.getMessageId(), publishToken.getMessageId()); Assert.assertNotNull(publishToken.getTopics()); Assert.assertArrayEquals(iMqttDeliveryToken.getTopics(), publishToken.getTopics()); Assert.assertNotNull(publishToken.getMessageId()); Assert.assertEquals(iMqttDeliveryToken.getMessageId(), publishToken.getMessageId()); System.out.println(publishToken.getClientId()); System.out.println(publishToken.getMessageId()); System.out.println(publishToken.getSessionPresent()); for (String s: publishToken.getTopics()) { System.out.println(s); } }
@Test public void keepAliveTimeout(TestContext context) { Async async = context.async(); int keepAliveInterval = 5; // MQTT spec : server will wait half more then the keepAliveInterval int timeout = keepAliveInterval + keepAliveInterval / 2; try { MemoryPersistence persistence = new MemoryPersistence(); MqttClient client = new MqttClient(String.format("tcp://%s:%d", Proxy.SERVER_HOST, Proxy.SERVER_PORT), "12345", persistence); MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(false); options.setKeepAliveInterval(keepAliveInterval); this.started = System.currentTimeMillis(); client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { ended = System.currentTimeMillis(); log.info("Elapsed : " + (ended - started)); async.complete(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); client.connect(options); vertx.setTimer(1000, t -> { proxy.pause(); }); async.await(); long elapsed = ended - started; // consider a range of 500 ms context.assertTrue(elapsed > (timeout * 1000 - 500) && elapsed < (timeout * 1000 + 500)); } catch (MqttException e) { context.fail(e); } }
private void init() { try { //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(host, "test", new MemoryPersistence()); //MQTT的连接设置 options = new MqttConnectOptions(); //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); //设置连接的用户名 //options.setUserName(userName); //设置连接的密码 //options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); //设置回调 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { //连接丢失后,一般在这里面进行重连 System.out.println("connectionLost----------"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { //publish后会执行到这里 System.out.println("deliveryComplete---------" + token.isComplete()); } @Override public void messageArrived(String topicName, MqttMessage message) throws Exception { //subscribe后得到的消息会执行到这里面 System.out.println("messageArrived----------"); Message msg = new Message(); msg.what = 1; msg.obj = topicName + "---" + message.toString(); handler.sendMessage(msg); } }); } catch (Exception e) { e.printStackTrace(); } }
public void run(String... args) { System.out.println("TopicSubscriber initializing..."); String host = args[0]; String username = args[1]; String password = args[2]; if (!host.startsWith("tcp://")) { host = "tcp://" + host; } try { // Create an Mqtt client MqttClient mqttClient = new MqttClient(host, "HelloWorldSub"); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); // Connect the client System.out.println("Connecting to Solace messaging at "+host); mqttClient.connect(connOpts); System.out.println("Connected"); // Latch used for synchronizing b/w threads final CountDownLatch latch = new CountDownLatch(1); // Topic filter the client will subscribe to final String subTopic = "T/GettingStarted/pubsub"; // Callback - Anonymous inner-class for receiving messages mqttClient.setCallback(new MqttCallback() { public void messageArrived(String topic, MqttMessage message) throws Exception { // Called when a message arrives from the server that // matches any subscription made by the client String time = new Timestamp(System.currentTimeMillis()).toString(); System.out.println("\nReceived a Message!" + "\n\tTime: " + time + "\n\tTopic: " + topic + "\n\tMessage: " + new String(message.getPayload()) + "\n\tQoS: " + message.getQos() + "\n"); latch.countDown(); // unblock main thread } public void connectionLost(Throwable cause) { System.out.println("Connection to Solace messaging lost!" 
+ cause.getMessage()); latch.countDown(); } public void deliveryComplete(IMqttDeliveryToken token) { } }); // Subscribe client to the topic filter and a QoS level of 0 System.out.println("Subscribing client to topic: " + subTopic); mqttClient.subscribe(subTopic, 0); System.out.println("Subscribed"); // Wait for the message to be received try { latch.await(); // block here until message received, and latch will flip } catch (InterruptedException e) { System.out.println("I was awoken while waiting"); } // Disconnect the client mqttClient.disconnect(); System.out.println("Exiting"); System.exit(0); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } }
private void receiveData() { try { applyProperties(); setRunningState(RunningState.STARTED); String url = (ssl ? "ssl://" : "tcp://") + host + ":" + Integer.toString(port); mqttClient = new MqttClient(url, MqttClient.generateClientId(), new MemoryPersistence()); mqttClient.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { try { receive(message.getPayload()); } catch (RuntimeException e) { e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { // not used } @Override public void connectionLost(Throwable cause) { log.error("CONNECTION_LOST", cause.getLocalizedMessage()); } }); MqttConnectOptions options = new MqttConnectOptions(); // Connect with username and password if both are available. if (username != null && password != null && !username.isEmpty() && password.length > 0) { options.setUserName(username); options.setPassword(password); } if (ssl) { // Support TLS only (1.0-1.2) as even SSL 3.0 has well known exploits java.util.Properties sslProperties = new java.util.Properties(); sslProperties.setProperty("com.ibm.ssl.protocol", "TLS"); options.setSSLProperties(sslProperties); } options.setCleanSession(true); mqttClient.connect(options); mqttClient.subscribe(topic, qos); } catch (Throwable ex) { log.error("UNEXPECTED_ERROR", ex); setRunningState(RunningState.ERROR); } }
/**
 * Installs the callback on the MQTT client and then remembers it in this object.
 *
 * @param callback callback to install
 */
public void setCallBack(MqttCallback callback) {
    // Client first; the field is only updated once the client accepted the callback.
    mqttClient.setCallback(callback);
    this.callback = callback;
}
/**
 * Returns the callback currently stored in this object.
 *
 * @return the stored callback
 */
public MqttCallback getMqttCallback() {
    return this.callback;
}
/**
 * Starts a subscriber on {@code tcp://localhost:1883} that listens to every topic
 * ({@code #}) at QoS 2 and prints the fields decoded from each payload: byte 0 as an
 * unsigned sequence number, bytes 1-8 as a big-endian timestamp and bytes 9-16 as a
 * big-endian rate. A lost connection or a delivery confirmation fails the test.
 *
 * @throws MqttException if creating, connecting or subscribing the client fails
 */
private static void startSubsriber() throws MqttException {
    client = new MqttClient("tcp://localhost:1883", "TestClient", new MemoryPersistence());

    MqttCallback callback = new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            fail(cause.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            byte[] data = message.getPayload();
            int seqByte = 0xFF & data[0];
            int qos = message.getQos();

            // Bytes 1..8: most significant byte first.
            long now = 0;
            for (int pos = 1; pos <= 8; pos++) {
                now = (now << 8) | (0xFF & data[pos]);
            }
            System.out.println(seqByte+" "+new Date(now)+" "+(now%1000));

            // Bytes 9..16: most significant byte first.
            long rate = 0;
            for (int pos = 9; pos <= 16; pos++) {
                rate = (rate << 8) | (0xFF & data[pos]);
            }
            System.out.println("rate: "+rate+" QOS "+qos);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.err.println("should not be called for subscriber");
            System.err.println(token);
            fail();
        }
    };

    client.setCallback(callback);
    client.connect();
    client.subscribe("#", 2);
}
/**
 * Routes {@code direct:start} through a "Hello " prefix to the Paho endpoint and verifies
 * with a plain MQTT subscriber that exactly one message, {@code "Hello Kermit"}, arrives
 * within ten seconds.
 */
@Test
public void testMQTTProducer() throws Exception {
    String conUrl = TestUtils.getResourceValue(getClass(), "/tcp-connection");

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            String pahoEndpoint = "paho:" + BrokerSetup.TEST_TOPIC + "?brokerUrl=" + conUrl;
            from("direct:start")
                .transform(body().prepend("Hello "))
                .to(pahoEndpoint);
        }
    });

    camelctx.start();
    try {
        // Independent subscriber used to observe what the route publishes.
        MqttClient client = new MqttClient(conUrl, "MqttClient", new MemoryPersistence());
        MqttConnectOptions opts = new MqttConnectOptions();
        opts.setCleanSession(true);
        client.connect(opts);
        client.subscribe(BrokerSetup.TEST_TOPIC, 2);

        final List<String> received = new ArrayList<>();
        final CountDownLatch arrived = new CountDownLatch(1);

        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                received.add(new String (message.getPayload()));
                arrived.countDown();
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
            }
        });

        ProducerTemplate producer = camelctx.createProducerTemplate();
        producer.asyncSendBody("direct:start", "Kermit");

        Assert.assertTrue(arrived.await(10, TimeUnit.SECONDS));
        Assert.assertEquals("One message", 1, received.size());
        Assert.assertEquals("Hello Kermit", received.get(0));
    } finally {
        camelctx.stop();
    }
}
/**
 * Stores the callback in this object; nothing else is done with it here.
 *
 * @param mqttCallback callback to store
 */
public void setCallback(MqttCallback mqttCallback) {
    this.mqttCallback = mqttCallback;
}
/**
 * Forwards the callback to the object held in the {@code callback} field.
 *
 * @param mqttCallback callback to forward
 */
public void setCallback(MqttCallback mqttCallback) {
    this.callback.setCallback(mqttCallback);
}
/**
 * Sets a callback listener to use for events that happen asynchronously.
 *
 * <p>
 * The listener is notified about a number of events, including:
 * </p>
 * <ul>
 * <li>A new message has arrived and is ready to be processed</li>
 * <li>The connection to the server has been lost</li>
 * <li>Delivery of a message to the server has completed</li>
 * </ul>
 * <p>
 * Other events that track the progress of an individual operation, such as
 * connect and subscribe, can be tracked using the {@link MqttToken} returned
 * from each non-blocking method, or by setting an
 * {@link IMqttActionListener} on the non-blocking method.
 * </p>
 * <p>
 * This implementation only stores the reference in the {@code callback} field.
 * </p>
 *
 * @param callback
 *            which will be invoked for certain asynchronous events
 *
 * @see MqttCallback
 */
@Override
public void setCallback(MqttCallback callback) {
    this.callback = callback;
}
@Test public void subscribeRecoveringSession(TestContext context) { Async async = context.async(); try { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); MemoryPersistence subscriberPersistence = new MemoryPersistence(); MqttClient subscriber = new MqttClient(String.format("tcp://%s:%d", MQTT_BIND_ADDRESS, MQTT_LISTEN_PORT), SUBSCRIBER_ID, subscriberPersistence); subscriber.connect(options); subscriber.subscribe(MQTT_TOPIC, 0); subscriber.disconnect(); // re-connect without subscribing, should receive published message subscriber.connect(options); subscriber.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { LOG.info("topic: {} message: {}", topic, new String(mqttMessage.getPayload())); async.complete(); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); MemoryPersistence publisherPersistence = new MemoryPersistence(); MqttClient publisher = new MqttClient(String.format("tcp://%s:%d", MQTT_BIND_ADDRESS, MQTT_LISTEN_PORT), PUBLISHER_ID, publisherPersistence); publisher.connect(); publisher.publish(MQTT_TOPIC, MQTT_MESSAGE.getBytes(), 0, true); publisher.disconnect(); async.await(); context.assertTrue(true); } catch (MqttException e) { context.assertTrue(false); e.printStackTrace(); } }
/**
 * Sets a callback listener to use for events that happen asynchronously.
 *
 * <p>The listener is notified about a number of events, including:</p>
 * <ul>
 * <li>A new message has arrived and is ready to be processed</li>
 * <li>The connection to the server has been lost</li>
 * <li>Delivery of a message to the server has completed</li>
 * </ul>
 * <p>Other events that track the progress of an individual operation, such
 * as connect and subscribe, can be tracked using the {@link MqttToken} returned from
 * each non-blocking method, or by setting an {@link IMqttActionListener} on the
 * non-blocking method.</p>
 * <p>This implementation only stores the reference in the {@code callback} field.</p>
 *
 * @param callback which will be invoked for certain asynchronous events
 *
 * @see MqttCallback
 */
@Override
public void setCallback(MqttCallback callback) {
    this.callback = callback;
}