@Override
public FdfsClient append(ReadStream<Buffer> stream, long size, FdfsFileId fileId,
    Handler<AsyncResult<Void>> handler) {
  getTracker().setHandler(tracker -> {
    if (tracker.succeeded()) {
      tracker.result().getStoreStorage(fileId.group(), storage -> {
        if (storage.succeeded()) {
          storage.result().append(stream, size, fileId, append -> {
            handler.handle(append);
          });
        } else {
          handler.handle(Future.failedFuture(storage.cause()));
        }
      });
    } else {
      handler.handle(Future.failedFuture(tracker.cause()));
    }
  });
  return this;
}
@Override
public FdfsClient modify(ReadStream<Buffer> stream, long size, FdfsFileId fileId, long offset,
    Handler<AsyncResult<Void>> handler) {
  getTracker().setHandler(tracker -> {
    if (tracker.succeeded()) {
      tracker.result().getStoreStorage(fileId.group(), storage -> {
        if (storage.succeeded()) {
          storage.result().modify(stream, size, fileId, offset, modify -> {
            handler.handle(modify);
          });
        } else {
          handler.handle(Future.failedFuture(storage.cause()));
        }
      });
    } else {
      handler.handle(Future.failedFuture(tracker.cause()));
    }
  });
  return this;
}
/**
 * Simple test to fake a _tenantPermission interface.
 * Captures the body, and reports it in a header.
 *
 * @param ctx the routing context of the incoming request
 */
private void myPermissionHandle(RoutingContext ctx) {
  ReadStream<Buffer> content = ctx.request();
  final Buffer incoming = Buffer.buffer();
  content.handler(incoming::appendBuffer);
  ctx.request().endHandler(x -> {
    String body = incoming.toString();
    body = body.replaceAll("\\s+", " "); // remove newlines etc
    ctx.response().putHeader("X-Tenant-Perms-Result", body);
    if (body.length() > 80) {
      body = body.substring(0, 80) + "...";
    }
    logger.info("tenantPermissions: " + body);
    ctx.response().end();
  });
}
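/*
 * Hedged wiring sketch for the fake handler above; it assumes a vertx-web Router
 * named "router" is in scope. The path is illustrative and not taken from the
 * original code.
 */
router.post("/_/tenantPermissions").handler(this::myPermissionHandle);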
@Test
public void testClosedKeyStream() {
  Map<JsonObject, Buffer> map = genJsonToBuffer(100);
  loadData(map, (vertx, asyncMap) -> {
    List<JsonObject> keys = new ArrayList<>();
    ReadStream<JsonObject> stream =
        InfinispanAsyncMap.<JsonObject, Buffer>unwrap(asyncMap).keyStream();
    stream.exceptionHandler(t -> {
      fail(t);
    }).handler(jsonObject -> {
      keys.add(jsonObject);
      if (jsonObject.getInteger("key") == 38) {
        stream.handler(null);
        int emitted = keys.size();
        vertx.setTimer(500, tid -> {
          assertTrue("Items emitted after close", emitted == keys.size());
          testComplete();
        });
      }
    });
  });
  await();
}
private void proxyRequestResponse10(Iterator<ModuleInstance> it, ProxyContext pc,
    ReadStream<Buffer> stream, Buffer bcontent, ModuleInstance mi) {
  if (bcontent != null) {
    proxyRequestResponse(it, pc, null, bcontent, mi);
  } else {
    final Buffer incoming = Buffer.buffer();
    stream.handler(data -> {
      incoming.appendBuffer(data);
      pc.trace("ProxyRequestBlock request chunk '" + data.toString() + "'");
    });
    stream.endHandler(v -> {
      pc.trace("ProxyRequestBlock request end");
      proxyRequestResponse(it, pc, null, incoming, mi);
    });
    stream.resume();
  }
}
private void proxyInternal(Iterator<ModuleInstance> it, ProxyContext pc,
    ReadStream<Buffer> stream, Buffer bcontent, ModuleInstance mi) {
  pc.debug("proxyInternal " + mi.getModuleDescriptor().getId());
  if (bcontent != null) {
    proxyInternalBuffer(it, pc, bcontent, mi);
  } else {
    // read the whole request into a buffer
    final Buffer incoming = Buffer.buffer();
    stream.handler(data -> {
      incoming.appendBuffer(data);
      pc.trace("proxyInternal request chunk '" + data.toString() + "'");
    });
    stream.endHandler(v -> {
      pc.trace("proxyInternal request end");
      proxyInternalBuffer(it, pc, incoming, mi);
    });
    stream.resume();
  }
}
public void testSmall(TestContext context, boolean encrypt) throws IOException {
  SfsVertx sfsVertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);
  Path tmpDir = createTempDirectory(valueOf(currentTimeMillis()));
  Buffer testBuffer = buffer("test");
  Async async = context.async();
  aVoid()
      .flatMap(aVoid -> {
        FileBackedBuffer fileBackedBuffer = new FileBackedBuffer(sfsVertx, 2, encrypt, tmpDir);
        ReadStream<Buffer> readStream = new BufferReadStream(testBuffer);
        return pump(readStream, fileBackedBuffer)
            .flatMap(aVoid1 -> {
              ReadStream<Buffer> fileBackedReadStream = fileBackedBuffer.readStream();
              BufferWriteEndableWriteStream bufferedWriteStreamConsumer = new BufferWriteEndableWriteStream();
              return pump(fileBackedReadStream, bufferedWriteStreamConsumer)
                  .doOnNext(aVoid2 -> {
                    Buffer actualBuffer = bufferedWriteStreamConsumer.toBuffer();
                    assertArrayEquals(context, testBuffer.getBytes(), actualBuffer.getBytes());
                    assertEquals(context, true, fileBackedBuffer.isFileOpen());
                  });
            })
            .flatMap(aVoid1 -> fileBackedBuffer.close());
      })
      .subscribe(new TestSubscriber(context, async));
}
public void testOnlyBuffer(TestContext context, boolean encrypt) throws IOException {
  SfsVertx sfsVertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);
  Path tmpDir = createTempDirectory(valueOf(currentTimeMillis()));
  Buffer testBuffer = buffer("test");
  Async async = context.async();
  aVoid()
      .flatMap(aVoid -> {
        FileBackedBuffer fileBackedBuffer = new FileBackedBuffer(sfsVertx, 1024, encrypt, tmpDir);
        ReadStream<Buffer> readStream = new BufferReadStream(testBuffer);
        return pump(readStream, fileBackedBuffer)
            .flatMap(aVoid1 -> {
              ReadStream<Buffer> fileBackedReadStream = fileBackedBuffer.readStream();
              BufferWriteEndableWriteStream bufferedWriteStreamConsumer = new BufferWriteEndableWriteStream();
              return pump(fileBackedReadStream, bufferedWriteStreamConsumer)
                  .doOnNext(aVoid2 -> {
                    Buffer actualBuffer = bufferedWriteStreamConsumer.toBuffer();
                    assertArrayEquals(context, testBuffer.getBytes(), actualBuffer.getBytes());
                    assertEquals(context, false, fileBackedBuffer.isFileOpen());
                  });
            })
            .flatMap(aVoid1 -> fileBackedBuffer.close());
      })
      .subscribe(new TestSubscriber(context, async));
}
/**
 * <p>Search the GeoRocket data store and return a {@link ReadStream} of
 * merged chunks matching the given criteria.</p>
 * <p>If <code>query</code> is <code>null</code> or empty all chunks from
 * the given <code>layer</code> (and all sub-layers) will be returned. If
 * <code>layer</code> is also <code>null</code> or empty the contents of the
 * whole data store will be returned.</p>
 * <p>The caller is responsible for handling exceptions through
 * {@link ReadStream#exceptionHandler(Handler)}.</p>
 * @param query a search query specifying which chunks to return (may be
 * <code>null</code>)
 * @param layer the name of the layer where to search for chunks recursively
 * (may be <code>null</code>)
 * @param handler a handler that will receive the {@link ReadStream} from
 * which the merged chunks matching the given criteria can be read
 */
public void search(String query, String layer,
    Handler<AsyncResult<ReadStream<Buffer>>> handler) {
  if ((query == null || query.isEmpty()) && (layer == null || layer.isEmpty())) {
    handler.handle(Future.failedFuture("No search query and no layer given. "
        + "Do you really wish to export/query the whole data store? If so, "
        + "set the layer to '/'."));
    return;
  }
  String queryPath = prepareQuery(query, layer);
  HttpClientRequest request = client.get(getEndpoint() + queryPath);
  request.exceptionHandler(t -> handler.handle(Future.failedFuture(t)));
  request.handler(response -> {
    if (response.statusCode() == 404) {
      fail(response, handler, message ->
          new NoSuchElementException(ClientAPIException.parse(message).getMessage()));
    } else if (response.statusCode() != 200) {
      fail(response, handler);
    } else {
      handler.handle(Future.succeededFuture(response));
    }
  });
  configureRequest(request).end();
}
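/*
 * Hedged usage sketch (not part of the client above): pump the merged chunks
 * returned by search(...) into a local file. It assumes a reference "store" to
 * the object exposing search(...) and a Vert.x instance; the query, layer and
 * output path are illustrative only. Uses io.vertx.core classes (OpenOptions,
 * AsyncFile, Pump).
 */
void exportToFile(Vertx vertx, String query, String layer) {
  OpenOptions opts = new OpenOptions().setWrite(true).setCreate(true);
  vertx.fileSystem().open("export.xml", opts, open -> {
    if (open.failed()) {
      open.cause().printStackTrace();
      return;
    }
    AsyncFile out = open.result();
    store.search(query, layer, ar -> {
      if (ar.failed()) {
        ar.cause().printStackTrace();
        out.close();
        return;
      }
      ReadStream<Buffer> chunks = ar.result();
      chunks.exceptionHandler(Throwable::printStackTrace); // caller must handle stream errors
      chunks.endHandler(v -> out.close());
      Pump.pump(chunks, out).start();
    });
  });
}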
public MultipartHelper putBinaryBody(String name, ReadStream<Buffer> stream, String contentType,
    String fileName, Handler<AsyncResult> handler) {
  request
      .write("--")
      .write(boundary)
      .write(System.lineSeparator())
      .write(String.format("Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"", name, fileName))
      .write(System.lineSeparator())
      .write(String.format("Content-Type: %s", contentType))
      .write(System.lineSeparator())
      .write("Content-Transfer-Encoding: binary")
      .write(System.lineSeparator())
      .write(System.lineSeparator());
  Pump.pump(stream
      .endHandler(event -> {
        request.write(System.lineSeparator());
        handler.handle(createResult(true, null));
      })
      .exceptionHandler(e -> handler.handle(createResult(false, e))), request)
      .start();
  return this;
}
@Override
public ReadStream<JsonObject> resume() {
  JsonObject conf;
  Handler<JsonObject> succ;
  synchronized (this) {
    if (!paused) {
      // Cannot resume a non paused stream
      return this;
    }
    paused = false;
    conf = last;
    if (last != null) {
      last = null;
    }
    succ = this.handler;
  }
  if (conf != null && succ != null) {
    vertx.runOnContext(v -> succ.handle(conf));
  }
  return this;
}
/**
 * Asynchronously store content from source to filePath,
 * and call onEnd when finished.
 *
 * @param source the stream to read from
 * @param filePath the destination file path
 * @param onEnd handler called once the source stream has ended
 */
public static void asyncStore(Vertx vertx, ReadStream<Buffer> source, String filePath,
    Handler<Void> onEnd) {
  checkDir(filePath);
  source.pause();
  vertx.fileSystem().open(filePath, new OpenOptions().setWrite(true).setCreate(true), fres -> {
    // Note: the open is assumed to succeed; fres.result() is null if it failed
    AsyncFile afile = fres.result();
    Pump pump = Pump.pump(source, afile);
    source.endHandler(onEnd);
    pump.start();
    source.resume();
  });
}
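/*
 * Minimal usage sketch for asyncStore(...) above; it assumes the helper lives in
 * a utility class referred to here as FileUtils. An incoming HTTP request body
 * is streamed straight to disk; asyncStore pauses and resumes the source itself,
 * so no extra pause is needed here. Port and target path are illustrative.
 */
vertx.createHttpServer().requestHandler(req ->
    FileUtils.asyncStore(vertx, req, "/tmp/upload.bin",
        done -> req.response().setStatusCode(201).end())
).listen(8080);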
@Override
public FdfsClient upload(ReadStream<Buffer> stream, long size, String ext,
    Handler<AsyncResult<FdfsFileId>> handler) {
  if (Buffer.buffer(ext, options.getCharset()).length() > FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN) {
    handler.handle(Future
        .failedFuture("ext is too long ( greater than " + FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN + ")"));
    return this;
  }
  getTracker().setHandler(tracker -> {
    if (tracker.succeeded()) {
      tracker.result().getStoreStorage(storage -> {
        if (storage.succeeded()) {
          storage.result().upload(stream, size, ext, upload -> {
            handler.handle(upload);
          });
        } else {
          handler.handle(Future.failedFuture(storage.cause()));
        }
      });
    } else {
      handler.handle(Future.failedFuture(tracker.cause()));
    }
  });
  return this;
}
@Override
public FdfsClient uploadAppender(ReadStream<Buffer> stream, long size, String ext,
    Handler<AsyncResult<FdfsFileId>> handler) {
  if (Buffer.buffer(ext, options.getCharset()).length() > FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN) {
    handler.handle(Future
        .failedFuture("ext is too long ( greater than " + FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN + ")"));
    return this;
  }
  getTracker().setHandler(tracker -> {
    if (tracker.succeeded()) {
      tracker.result().getStoreStorage(storage -> {
        if (storage.succeeded()) {
          storage.result().uploadAppender(stream, size, ext, uploadAppender -> {
            handler.handle(uploadAppender);
          });
        } else {
          handler.handle(Future.failedFuture(storage.cause()));
        }
      });
    } else {
      handler.handle(Future.failedFuture(tracker.cause()));
    }
  });
  return this;
}
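/*
 * Hedged usage sketch for the upload(...) entry point above: stream a local file
 * to FastDFS. It assumes an FdfsClient instance named "client" and a running
 * Vert.x instance; the file path and extension are illustrative.
 */
vertx.fileSystem().props("photo.jpg", props -> {
  if (props.failed()) {
    props.cause().printStackTrace();
    return;
  }
  long size = props.result().size();
  vertx.fileSystem().open("photo.jpg", new OpenOptions().setRead(true), open -> {
    if (open.failed()) {
      open.cause().printStackTrace();
      return;
    }
    AsyncFile file = open.result();
    client.upload(file, size, "jpg", ar -> {
      file.close();
      if (ar.succeeded()) {
        System.out.println("stored as " + ar.result()); // FdfsFileId of the new file
      } else {
        ar.cause().printStackTrace();
      }
    });
  });
});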
private void proxyRedirect(Iterator<ModuleInstance> it, ProxyContext pc,
    ReadStream<Buffer> stream, Buffer bcontent, ModuleInstance mi) {
  pc.trace("ProxyNull " + mi.getModuleDescriptor().getId());
  pc.closeTimer();
  // if no more entries in it, proxyR will return 404
  proxyR(it, pc, stream, bcontent);
}
public Observable<Void> consume(SfsVertx vertx, long position, long length, ReadStream<Buffer> src,
    boolean assertAlignment) {
  Context context = vertx.getOrCreateContext();
  return defer(() -> {
    checkOpen();
    checkCanWrite();
    ObservableFuture<Void> drainHandler = RxHelper.observableFuture();
    if (writeQueueSupport.writeQueueFull()) {
      writeQueueSupport.drainHandler(context, drainHandler::complete);
    } else {
      drainHandler.complete(null);
    }
    return drainHandler.flatMap(aVoid -> using(
        () -> {
          AsyncFileWriter dst = createWriteStream(context, position, assertAlignment);
          activeWriters.add(dst);
          return dst;
        },
        sfsWriteStream -> {
          BufferedEndableWriteStream bufferedWriteStream = new BufferedEndableWriteStream(sfsWriteStream);
          LimitedWriteEndableWriteStream limitedWriteStream =
              new LimitedWriteEndableWriteStream(bufferedWriteStream, length);
          return pump(src, limitedWriteStream)
              .doOnNext(aVoid1 -> activeWriters.remove(sfsWriteStream));
        },
        activeWriters::remove
    ));
  });
}
@Override
public Observable<NodeWriteStreamBlob> createWriteStream(String volumeId, long length,
    final MessageDigestFactory... messageDigestFactories) {
  LocalNode _this = this;
  return defer(() -> {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug(format("createWriteStream {volumeId=%s,length=%d,messageDigests=%s}",
          volumeId, length, on(',').join(messageDigestFactories)));
    }
    final Volume volume = volumeManager.get(volumeId).get();
    return volume.putDataStream(vertxContext.vertx(), length)
        .map((Func1<WriteStreamBlob, NodeWriteStreamBlob>) writeStreamBlob ->
            new NodeWriteStreamBlob(_this) {
              @Override
              public Observable<DigestBlob> consume(ReadStream<Buffer> src) {
                DigestReadStream digestWriteStream = new DigestReadStream(src, messageDigestFactories);
                return writeStreamBlob.consume(digestWriteStream)
                    .map(aVoid -> {
                      DigestBlob digestBlob = new DigestBlob(writeStreamBlob.getVolume(),
                          writeStreamBlob.getPosition(), writeStreamBlob.getLength());
                      for (MessageDigestFactory messageDigestFactory : messageDigestFactories) {
                        digestBlob.withDigest(messageDigestFactory,
                            digestWriteStream.getDigest(messageDigestFactory).get());
                      }
                      return digestBlob;
                    });
              }
            });
  }).onErrorResumeNext(new HandleServerToBusy<>());
}
public Observable<List<DigestBlob>> consume(final long length,
    final Iterable<MessageDigestFactory> messageDigestFactories, ReadStream<Buffer> src) {
  return calculateNodeWriteStreamBlobs(length, toArray(messageDigestFactories, MessageDigestFactory.class))
      .flatMap(nodeWriteStreamBlobs -> {
        int size = nodeWriteStreamBlobs.size();
        List<Observable<DigestBlob>> oDigests = new ArrayList<>(size);
        List<BufferEndableWriteStream> writeStreams = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
          PipedReadStream readStream = new PipedReadStream();
          PipedEndableWriteStream writeStream = new PipedEndableWriteStream(readStream);
          Observable<DigestBlob> oDigest = nodeWriteStreamBlobs.get(i).consume(readStream);
          oDigests.add(oDigest);
          writeStreams.add(writeStream);
        }
        MultiEndableWriteStream multiWriteStream = new MultiEndableWriteStream(writeStreams);
        Observable<Void> producer = pump(src, multiWriteStream).single();
        Observable<List<DigestBlob>> consumer = Observable.mergeDelayError(oDigests)
            .toList()
            .single();
        // the zip operator will not work here
        // since the subscriptions need to run
        // in parallel due to the pipe connections
        return combineSinglesDelayError(
            producer,
            consumer,
            (aVoid, response) -> response);
      });
}
public DigestReadStream(ReadStream<Buffer> delegate, MessageDigestFactory messageDigest,
    MessageDigestFactory... mdfs) {
  this.delegate = delegate;
  this.messageDigests = new MessageDigest[1 + mdfs.length];
  MessageDigest md = messageDigest.instance();
  this.messageDigests[0] = md;
  digests.put(messageDigest, md);
  for (int i = 0; i < mdfs.length; i++) {
    MessageDigestFactory mdf = mdfs[i];
    md = mdf.instance();
    this.messageDigests[i + 1] = md;
    digests.put(mdf, md);
  }
}
public ReadStream<Buffer> readStream() {
  checkReadStreamNotOpen();
  openedReadStream = true;
  if (fileOpen) {
    AsyncFileReader asyncFileReader =
        new AsyncFileReaderImpl(context, 0, 8192, countingEndableWriteStream.count(), channel, LOGGER);
    if (encryptTempFile) {
      return algorithm.decrypt(asyncFileReader);
    } else {
      return asyncFileReader;
    }
  } else {
    return new BufferReadStream(memory);
  }
}
public void testLarge(TestContext context, boolean encrypt) throws IOException {
  SfsVertx sfsVertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);
  Path tmpDir = createTempDirectory(valueOf(currentTimeMillis()));
  byte[] data = new byte[1024 * 1024 * 10];
  getCurrentInstance().nextBytesBlocking(data);
  Buffer testBuffer = buffer(data);
  Async async = context.async();
  aVoid()
      .flatMap(aVoid -> {
        FileBackedBuffer fileBackedBuffer = new FileBackedBuffer(sfsVertx, 1024, encrypt, tmpDir);
        ReadStream<Buffer> readStream = new BufferReadStream(testBuffer);
        return pump(readStream, fileBackedBuffer)
            .flatMap(aVoid1 -> {
              ReadStream<Buffer> fileBackedReadStream = fileBackedBuffer.readStream();
              BufferWriteEndableWriteStream bufferedWriteStreamConsumer = new BufferWriteEndableWriteStream();
              return pump(fileBackedReadStream, bufferedWriteStreamConsumer)
                  .doOnNext(aVoid2 -> {
                    Buffer actualBuffer = bufferedWriteStreamConsumer.toBuffer();
                    assertArrayEquals(context, testBuffer.getBytes(), actualBuffer.getBytes());
                    assertEquals(context, true, fileBackedBuffer.isFileOpen());
                  });
            })
            .flatMap(aVoid1 -> fileBackedBuffer.close());
      })
      .subscribe(new TestSubscriber(context, async));
}
@Override
public ReadStream<Buffer> resume() {
  if (paused) {
    paused = false;
    if (handler != null) {
      Handler<Buffer> h = handler;
      handler = null;
      doWrite(h);
    }
  }
  return this;
}
@Override
public ReadStream<Buffer> resume() {
  check();
  if (paused && !closed) {
    paused = false;
    if (handler != null) {
      doRead();
    }
  }
  return this;
}
/**
 * Execute a GET request with the specified path, query and an additional query parameter
 * @param endpoint the endpoint
 * @param parameterName the name of the query parameter
 * @param parameterValue the value of the query parameter
 * @param query a search query specifying which chunks to return (may be
 * <code>null</code>)
 * @param layer the name of the layer where to search for chunks recursively
 * (may be <code>null</code>)
 * @param handler a handler that will receive the {@link ReadStream} from
 * which the results matching the given criteria can be read
 */
protected void getWithParameter(String endpoint, String parameterName, String parameterValue,
    String query, String layer, Handler<AsyncResult<ReadStream<Buffer>>> handler) {
  if ((query == null || query.isEmpty()) && (layer == null || layer.isEmpty())) {
    handler.handle(Future.failedFuture("No search query and no layer given. "
        + "Do you really wish to export/query the whole data store? If so, "
        + "set the layer to '/'."));
    return;
  }
  String queryPath = prepareQuery(query, layer);
  if (query == null || query.isEmpty()) {
    queryPath += "?";
  } else {
    queryPath += "&";
  }
  String path = endpoint + queryPath + parameterName + "=" + parameterValue;
  HttpClientRequest request = client.get(path);
  request.exceptionHandler(t -> handler.handle(Future.failedFuture(t)));
  request.handler(response -> {
    if (response.statusCode() != 200) {
      fail(response, handler);
    } else {
      handler.handle(Future.succeededFuture(response));
    }
  });
  configureRequest(request).end();
}
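/*
 * Hedged sketch of how a subclass might build a public call on top of the
 * protected helper above. The method name and parameter name are invented for
 * illustration and are not part of the real client.
 */
public void getWithProperty(String property, String query, String layer,
    Handler<AsyncResult<ReadStream<Buffer>>> handler) {
  getWithParameter(getEndpoint(), "property", property, query, layer, handler);
}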
private Handler<ReadStream<Buffer>> assertExport(String url, String XML, TestContext context, Async async) {
  return r -> {
    Buffer response = Buffer.buffer();
    r.handler(response::appendBuffer);
    r.endHandler(v -> {
      context.assertEquals(XML, response.toString());
      verifyRequested(url, context);
      async.complete();
    });
  };
}
@Override
public ReadStream<Entry<K, V>> entryStream() {
  return new CloseableIteratorCollectionStream<>(vertx.getOrCreateContext(), cache::entrySet, cacheEntry -> {
    K key = DataConverter.fromCachedObject(cacheEntry.getKey());
    V value = DataConverter.fromCachedObject(cacheEntry.getValue());
    return new SimpleImmutableEntry<>(key, value);
  });
}
/**
 * Create a new back pressure handler.
 *
 * @param stream stream to handle.
 * @param endpoint endpoint name.
 * @param endHandler end handler to call.
 */
public PressureHandler(@NonNull ReadStream stream, String endpoint, Handler<Void> endHandler) {
  this.stream = stream;
  stream.endHandler(h -> {
    if (endHandler != null) {
      endHandler.handle(null);
    }
    nextPressure.clear();
    ended = true;
    log.info("Finish to read stream: " + endpoint);
  });
}
/**
 * Handle back-pressure on a component.
 *
 * @param stream the stream being read.
 * @param endHandler end handler to call.
 */
public void handlePressure(ReadStream stream, Handler<Void> endHandler) {
  MessageConsumer<String> consumer = eventBus.consumer(parentEndpoint + ".pressure");
  consumer.handler(new PressureHandler(stream, parentEndpoint, h -> {
    if (endHandler != null) {
      endHandler.handle(null);
    }
    consumer.unregister();
  }));
}
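/*
 * Hedged sketch of the signalling side implied by the tests below: a downstream
 * component asks the producer to pause by sending its own endpoint name on the
 * producer's ".pressure" address, and sends the same name again to resume once
 * it has drained. The endpoint names here are invented for illustration.
 */
EventBus eb = vertx.eventBus();
eb.send("producer-endpoint.pressure", "consumer-endpoint"); // first message: producer stream pauses
// ... later, after the consumer catches up ...
eb.send("producer-endpoint.pressure", "consumer-endpoint"); // same name again: producer stream resumes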
@Test
public void testConstructor2() {
  // Prepare read stream
  ReadStream mockedStream = mock(ReadStream.class);

  // Test
  new PressureHandler(mockedStream, "test-endpoint");
  verify(mockedStream).endHandler(any());
}
@Test
public void testHandle1() {
  // Prepare mocks
  ReadStream mockedStream = mock(ReadStream.class);
  Message<String> mockedMessage = mock(Message.class);
  when(mockedMessage.body()).thenReturn("next-endpoint");

  // Test
  PressureHandler handler = new PressureHandler(mockedStream, "test-endpoint");
  handler.handle(mockedMessage);
  verify(mockedStream).pause();
}
@Test
public void testHandle2() {
  // Prepare mocks
  ReadStream mockedStream = mock(ReadStream.class);
  Message<String> mockedMessage = mock(Message.class);
  when(mockedMessage.body()).thenReturn("next-endpoint");

  // Test
  PressureHandler handler = new PressureHandler(mockedStream, "test-endpoint");
  handler.handle(mockedMessage);
  handler.handle(mockedMessage);
  verify(mockedStream).pause();
  verify(mockedStream).resume();
}
@Test
public void testHandle5() {
  // Prepare mocks
  ReadStream mockedStream = mock(ReadStream.class);
  Message<String> mockedMessage1 = mock(Message.class);
  when(mockedMessage1.body()).thenReturn("next-endpoint-1");
  Message<String> mockedMessage2 = mock(Message.class);
  when(mockedMessage2.body()).thenReturn("next-endpoint-2");

  // Test
  PressureHandler handler = new PressureHandler(mockedStream, "test-endpoint");
  handler.handle(mockedMessage1);
  handler.handle(mockedMessage2);
  verify(mockedStream).pause();
}
private <T> void testReadStream(
    Function<InfinispanAsyncMap<JsonObject, Buffer>, ReadStream<T>> streamFactory,
    BiConsumer<Map<JsonObject, Buffer>, List<T>> assertions
) {
  Map<JsonObject, Buffer> map = genJsonToBuffer(100);
  loadData(map, (vertx, asyncMap) -> {
    List<T> items = new ArrayList<>();
    ReadStream<T> stream = streamFactory.apply(InfinispanAsyncMap.unwrap(asyncMap));
    AtomicInteger idx = new AtomicInteger();
    long pause = 500;
    long start = System.nanoTime();
    stream.endHandler(end -> {
      assertions.accept(map, items);
      long duration = NANOSECONDS.toMillis(System.nanoTime() - start);
      assertTrue(duration >= 3 * pause);
      testComplete();
    }).exceptionHandler(t -> {
      fail(t);
    }).handler(item -> {
      items.add(item);
      int j = idx.getAndIncrement();
      if (j == 3 || j == 16 || j == 38) {
        stream.pause();
        int emitted = items.size();
        vertx.setTimer(pause, tid -> {
          assertTrue("Items emitted during pause", emitted == items.size());
          stream.resume();
        });
      }
    });
  });
  await();
}
@Override
public ReadStream<Buffer> endHandler(Handler<Void> handler) {
  check();
  this.endHandler = handler;
  return this;
}
@Override
public ReadStream<HttpServerRequest> requestStream() {
  return null;
}
@Override
public ReadStream<ServerWebSocket> websocketStream() {
  return null;
}
@Override
public FdfsStorage upload(ReadStream<Buffer> stream, long size, String ext,
    Handler<AsyncResult<FdfsFileId>> handler) {
  uploadFile(FdfsProtocol.STORAGE_PROTO_CMD_UPLOAD_FILE, stream, size, ext).setHandler(handler);
  return this;
}
@Override
public FdfsStorage uploadAppender(ReadStream<Buffer> stream, long size, String ext,
    Handler<AsyncResult<FdfsFileId>> handler) {
  uploadFile(FdfsProtocol.STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, stream, size, ext).setHandler(handler);
  return this;
}
@Override
public FdfsStorage append(ReadStream<Buffer> stream, long size, FdfsFileId fileId,
    Handler<AsyncResult<Void>> handler) {
  stream.pause();
  Future<FdfsConnection> futureConn = getConnection();
  futureConn.compose(connection -> {
    Future<FdfsPacket> futureResponse = FdfsProtocol.recvPacket(vertx, options.getNetworkTimeout(),
        connection, FdfsProtocol.STORAGE_PROTO_CMD_RESP, 0, null);
    Buffer nameBuffer = Buffer.buffer(fileId.name(), options.getCharset());
    long bodyLength = 2 * FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE + nameBuffer.length() + size;
    Buffer headerBuffer = FdfsProtocol.packHeader(FdfsProtocol.STORAGE_PROTO_CMD_APPEND_FILE, (byte) 0,
        bodyLength);
    connection.write(headerBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> {
        connection.resume();
      });
    }
    Buffer bodyBuffer = FdfsUtils.newZero(bodyLength - size);
    int offset = 0;
    bodyBuffer.setLong(offset, nameBuffer.length());
    offset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setLong(offset, size);
    offset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setBuffer(offset, nameBuffer);
    connection.write(bodyBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> {
        connection.resume();
      });
    }
    Pump.pump(stream, connection).start();
    stream.resume();
    return futureResponse;
  }).setHandler(ar -> {
    if (futureConn.succeeded()) {
      futureConn.result().release();
    }
    if (ar.succeeded()) {
      handler.handle(Future.succeededFuture());
    } else {
      handler.handle(Future.failedFuture(ar.cause()));
    }
  });
  return this;
}
@Override
public FdfsStorage modify(ReadStream<Buffer> stream, long size, FdfsFileId fileId, long offset,
    Handler<AsyncResult<Void>> handler) {
  stream.pause();
  Future<FdfsConnection> futureConn = getConnection();
  futureConn.compose(connection -> {
    Future<FdfsPacket> futureResponse = FdfsProtocol.recvPacket(vertx, options.getNetworkTimeout(),
        connection, FdfsProtocol.STORAGE_PROTO_CMD_RESP, 0, null);
    Buffer nameBuffer = Buffer.buffer(fileId.name(), options.getCharset());
    long bodyLength = 3 * FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE + nameBuffer.length() + size;
    Buffer headerBuffer = FdfsProtocol.packHeader(FdfsProtocol.STORAGE_PROTO_CMD_MODIFY_FILE, (byte) 0,
        bodyLength);
    connection.write(headerBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> {
        connection.resume();
      });
    }
    Buffer bodyBuffer = FdfsUtils.newZero(bodyLength - size);
    int bufferOffset = 0;
    bodyBuffer.setLong(bufferOffset, nameBuffer.length());
    bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setLong(bufferOffset, offset);
    bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setLong(bufferOffset, size);
    bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
    bodyBuffer.setBuffer(bufferOffset, nameBuffer);
    connection.write(bodyBuffer);
    if (connection.writeQueueFull()) {
      connection.pause();
      connection.drainHandler(v -> {
        connection.resume();
      });
    }
    Pump.pump(stream, connection).start();
    stream.resume();
    return futureResponse;
  }).setHandler(ar -> {
    if (futureConn.succeeded()) {
      futureConn.result().release();
    }
    if (ar.succeeded()) {
      handler.handle(Future.succeededFuture());
    } else {
      handler.handle(Future.failedFuture(ar.cause()));
    }
  });
  return this;
}