public MqttPersistable get(String key) throws MqttPersistenceException { checkIsOpen(); MqttPersistable result; try { File file = new File(clientDir, key + MESSAGE_FILE_EXTENSION); FileInputStream fis = new FileInputStream(file); int size = fis.available(); byte[] data = new byte[size]; int read = 0; while (read < size) { read += fis.read(data, read, size - read); } fis.close(); result = new MqttPersistentData(key, data, 0, data.length, null, 0, 0); } catch (IOException ex) { throw new MqttPersistenceException(ex); } return result; }
public static MqttWireMessage createWireMessage(MqttPersistable data) throws MqttException { byte[] payload = data.getPayloadBytes(); // The persistable interface allows a message to be restored entirely in the header array // Need to treat these two arrays as a single array of bytes and use the decoding // logic to identify the true header/payload split if (payload == null) { payload = new byte[0]; } MultiByteArrayInputStream mbais = new MultiByteArrayInputStream( data.getHeaderBytes(), data.getHeaderOffset(), data.getHeaderLength(), payload, data.getPayloadOffset(), data.getPayloadLength()); return createWireMessage(mbais); }
public MqttPersistable get(String key) throws MqttPersistenceException { checkIsOpen(); MqttPersistable result; try { File file = new File(clientDir, key+MESSAGE_FILE_EXTENSION); FileInputStream fis = new FileInputStream(file); int size = fis.available(); byte[] data = new byte[size]; int read = 0; while (read<size) { read += fis.read(data,read,size-read); } fis.close(); result = new MqttPersistentData(key, data, 0, data.length, null, 0, 0); } catch(IOException ex) { throw new MqttPersistenceException(ex); } return result; }
private MqttWireMessage restoreMessage(String key, MqttPersistable persistable) throws MqttException { final String methodName = "restoreMessage"; MqttWireMessage message = null; try { message = MqttWireMessage.createWireMessage(persistable); } catch (MqttException ex) { //@TRACE 602=key={0} exception log.fine(CLASS_NAME, methodName, "602", new Object[] {key}, ex); if (ex.getCause() instanceof EOFException) { // Premature end-of-file means that the message is corrupted if (key != null) { persistence.remove(key); } } else { throw ex; } } //@TRACE 601=key={0} message={1} log.fine(CLASS_NAME, methodName, "601", new Object[]{key,message}); return message; }
public MqttPersistable get(String key) throws MqttPersistenceException { checkIsOpen(); MqttPersistable result; try { FileConnection file = (FileConnection) Connector.open(clientDir.getURL() + key + MESSAGE_FILE_EXTENSION); DataInputStream fis = file.openDataInputStream(); int size = fis.available(); byte[] data = new byte[size]; int read = 0; while (read<size) { read += fis.read(data,read,size-read); } fis.close(); result = new MqttPersistentData(key, data, 0, data.length, null, 0, 0); } catch(IOException ex) { throw new MqttPersistenceException(ex); } return result; }
@Override public void open(String clientId, String serverURI) throws MqttPersistenceException { if (clientId == null || serverURI == null) throw new MqttPersistenceException(); if (dataStore == null) { this.serverURI = serverURI; this.clientId = clientId; dataStore = new Hashtable<String, MqttPersistable> (); } }
@Override public void put(String key, MqttPersistable persistentData) throws MqttPersistenceException { if (key == null || persistentData == null) throw new MqttPersistenceException(); dataStore.put(key, persistentData); }
@Override public MqttPersistable get(String key) throws MqttPersistenceException { MqttPersistable message = dataStore.get(key); return message; }
public MqttPersistable get(String key) throws MqttPersistenceException { return (MqttPersistable) data.get(key); }
public void put(String key, MqttPersistable persistable) throws MqttPersistenceException { data.put(key, persistable); }
public MqttPersistable get(String key) throws MqttPersistenceException { return (MqttPersistable)data.get(key); }