Java 类io.vertx.core.streams.ReadStream 实例源码

项目:vertx-fastdfs-client    文件:FdfsClientImpl.java   
/**
 * Appends the content of {@code stream} to the file identified by {@code fileId}.
 * <p>
 * A tracker is obtained first, then the tracker is asked for the store storage
 * of the file's group, and finally the append is delegated to that storage.
 * A failure of either lookup is reported to {@code handler} as a failed future.
 *
 * @param stream  source of the bytes to append
 * @param size    number of bytes to append
 * @param fileId  target file; its group selects the storage
 * @param handler receives the outcome of the append
 * @return this client, for chaining
 */
@Override
public FdfsClient append(ReadStream<Buffer> stream, long size, FdfsFileId fileId,
        Handler<AsyncResult<Void>> handler) {
    getTracker().setHandler(trackerResult -> {
        if (!trackerResult.succeeded()) {
            handler.handle(Future.failedFuture(trackerResult.cause()));
            return;
        }

        trackerResult.result().getStoreStorage(fileId.group(), storageResult -> {
            if (!storageResult.succeeded()) {
                handler.handle(Future.failedFuture(storageResult.cause()));
                return;
            }

            // The storage performs the actual transfer; relay its outcome unchanged.
            storageResult.result().append(stream, size, fileId, outcome -> handler.handle(outcome));
        });
    });

    return this;
}
项目:vertx-fastdfs-client    文件:FdfsClientImpl.java   
/**
 * Overwrites part of the file identified by {@code fileId} with the content of
 * {@code stream}, starting at {@code offset}.
 * <p>
 * A tracker is obtained first, then the tracker is asked for the store storage
 * of the file's group, and finally the modification is delegated to that storage.
 * A failure of either lookup is reported to {@code handler} as a failed future.
 *
 * @param stream  source of the replacement bytes
 * @param size    number of bytes to write
 * @param fileId  target file; its group selects the storage
 * @param offset  position passed on to the storage's modify operation
 * @param handler receives the outcome of the modification
 * @return this client, for chaining
 */
@Override
public FdfsClient modify(ReadStream<Buffer> stream, long size, FdfsFileId fileId, long offset,
        Handler<AsyncResult<Void>> handler) {
    getTracker().setHandler(trackerResult -> {
        if (!trackerResult.succeeded()) {
            handler.handle(Future.failedFuture(trackerResult.cause()));
            return;
        }

        trackerResult.result().getStoreStorage(fileId.group(), storageResult -> {
            if (!storageResult.succeeded()) {
                handler.handle(Future.failedFuture(storageResult.cause()));
                return;
            }

            // The storage performs the actual transfer; relay its outcome unchanged.
            storageResult.result().modify(stream, size, fileId, offset, outcome -> handler.handle(outcome));
        });
    });

    return this;
}
项目:okapi    文件:MainVerticle.java   
/**
 * Simple test to fake a _tenantPermission interface.
 * Collects the whole request body, collapses every whitespace run into a
 * single space and reports the result in the {@code X-Tenant-Perms-Result}
 * response header. The logged copy is cut to 80 characters.
 *
 * @param ctx routing context whose request body is captured and whose
 *            response is ended once the request has been read completely
 */
private void myPermissionHandle(RoutingContext ctx) {
  ReadStream<Buffer> requestBody = ctx.request();
  final Buffer collected = Buffer.buffer();
  requestBody.handler(collected::appendBuffer);
  ctx.request().endHandler(done -> {
    // remove newlines etc
    final String flattened = collected.toString().replaceAll("\\s+", " ");
    ctx.response().putHeader("X-Tenant-Perms-Result", flattened);

    // The header carries the full text; only the log line is shortened.
    String logged = flattened;
    if (logged.length() > 80) {
      logged = logged.substring(0, 80) + "...";
    }
    logger.info("tenantPermissions: " + logged);
    ctx.response().end();
  });
}
项目:vertx-infinispan    文件:InfinispanClusteredAsyncMapTest.java   
/**
 * Checks that a key stream stops emitting once its handler is removed:
 * after the item with key 38 the handler is set to {@code null}, and 500 ms
 * later the number of collected keys must be unchanged.
 */
@Test
public void testClosedKeyStream() {
  Map<JsonObject, Buffer> map = genJsonToBuffer(100);
  loadData(map, (vertx, asyncMap) -> {
    List<JsonObject> keys = new ArrayList<>();
    ReadStream<JsonObject> stream = InfinispanAsyncMap.<JsonObject, Buffer>unwrap(asyncMap).keyStream();
    stream.exceptionHandler(err -> fail(err));
    stream.handler(key -> {
      keys.add(key);
      if (key.getInteger("key") != 38) {
        return;
      }
      // Detach the handler, then make sure nothing else arrives afterwards.
      stream.handler(null);
      int seen = keys.size();
      vertx.setTimer(500, timerId -> {
        assertTrue("Items emitted after close", seen == keys.size());
        testComplete();
      });
    });
  });
  await();
}
项目:okapi    文件:ProxyService.java   
/**
 * Forwards a request to {@code proxyRequestResponse} with a fully buffered body.
 * When {@code bcontent} is already available it is used as is; otherwise the
 * whole {@code stream} is read into a buffer first and forwarded at stream end.
 */
private void proxyRequestResponse10(Iterator<ModuleInstance> it,
  ProxyContext pc, ReadStream<Buffer> stream, Buffer bcontent,
  ModuleInstance mi) {

  if (bcontent != null) {
    proxyRequestResponse(it, pc, null, bcontent, mi);
    return;
  }

  // No buffered body yet: accumulate every chunk, forward once the stream ends.
  final Buffer collected = Buffer.buffer();
  stream.handler(chunk -> {
    collected.appendBuffer(chunk);
    pc.trace("ProxyRequestBlock request chunk '"
      + chunk.toString() + "'");
  });
  stream.endHandler(end -> {
    pc.trace("ProxyRequestBlock request end");
    proxyRequestResponse(it, pc, null, collected, mi);
  });
  stream.resume();
}
项目:okapi    文件:ProxyService.java   
/**
 * Hands a request to {@code proxyInternalBuffer} with a fully buffered body.
 * When {@code bcontent} is already available it is used as is; otherwise the
 * whole {@code stream} is read into a buffer first and handed over at stream end.
 */
private void proxyInternal(Iterator<ModuleInstance> it,
  ProxyContext pc, ReadStream<Buffer> stream, Buffer bcontent,
  ModuleInstance mi) {

  pc.debug("proxyInternal " + mi.getModuleDescriptor().getId());
  if (bcontent != null) {
    proxyInternalBuffer(it, pc, bcontent, mi);
    return;
  }

  // read the whole request into a buffer
  final Buffer collected = Buffer.buffer();
  stream.handler(chunk -> {
    collected.appendBuffer(chunk);
    pc.trace("proxyInternal request chunk '"
      + chunk.toString() + "'");
  });
  stream.endHandler(end -> {
    pc.trace("proxyInternal request end");
    proxyInternalBuffer(it, pc, collected, mi);
  });
  stream.resume();
}
项目:sfs    文件:FileBackedBufferTest.java   
/**
 * Round-trips the 4-byte payload "test" through a {@link FileBackedBuffer}
 * created with a size argument of 2 and expects the content to come back
 * unchanged with the backing file open.
 * NOTE(review): the size argument is presumably the in-memory threshold - confirm.
 *
 * @param context test context used for assertions and completion
 * @param encrypt passed to the {@link FileBackedBuffer} constructor
 */
public void testSmall(TestContext context, boolean encrypt) throws IOException {

    SfsVertx sfsVertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);
    Path tmpDir = createTempDirectory(valueOf(currentTimeMillis()));

    Buffer expected = buffer("test");
    Async async = context.async();
    aVoid()
            .flatMap(begin -> {
                FileBackedBuffer target = new FileBackedBuffer(sfsVertx, 2, encrypt, tmpDir);
                ReadStream<Buffer> source = new BufferReadStream(expected);
                // Write everything, read it back, compare, then close.
                return pump(source, target)
                        .flatMap(written -> {
                            ReadStream<Buffer> replay = target.readStream();
                            BufferWriteEndableWriteStream sink = new BufferWriteEndableWriteStream();
                            return pump(replay, sink)
                                    .doOnNext(pumped -> {
                                        Buffer actual = sink.toBuffer();
                                        assertArrayEquals(context, expected.getBytes(), actual.getBytes());
                                        assertEquals(context, true, target.isFileOpen());
                                    });
                        })
                        .flatMap(checked -> target.close());
            })
            .subscribe(new TestSubscriber(context, async));
}
项目:sfs    文件:FileBackedBufferTest.java   
/**
 * Round-trips the 4-byte payload "test" through a {@link FileBackedBuffer}
 * created with a size argument of 1024 and expects the content to come back
 * unchanged without a backing file having been opened.
 * NOTE(review): the size argument is presumably the in-memory threshold - confirm.
 *
 * @param context test context used for assertions and completion
 * @param encrypt passed to the {@link FileBackedBuffer} constructor
 */
public void testOnlyBuffer(TestContext context, boolean encrypt) throws IOException {

    SfsVertx sfsVertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);
    Path tmpDir = createTempDirectory(valueOf(currentTimeMillis()));

    Buffer expected = buffer("test");
    Async async = context.async();
    aVoid()
            .flatMap(begin -> {
                FileBackedBuffer target = new FileBackedBuffer(sfsVertx, 1024, encrypt, tmpDir);
                ReadStream<Buffer> source = new BufferReadStream(expected);
                // Write everything, read it back, compare, then close.
                return pump(source, target)
                        .flatMap(written -> {
                            ReadStream<Buffer> replay = target.readStream();
                            BufferWriteEndableWriteStream sink = new BufferWriteEndableWriteStream();
                            return pump(replay, sink)
                                    .doOnNext(pumped -> {
                                        Buffer actual = sink.toBuffer();
                                        assertArrayEquals(context, expected.getBytes(), actual.getBytes());
                                        assertEquals(context, false, target.isFileOpen());
                                    });
                        })
                        .flatMap(checked -> target.close());
            })
            .subscribe(new TestSubscriber(context, async));
}
项目:georocket    文件:StoreClient.java   
/**
 * <p>Search the GeoRocket data store and return a {@link ReadStream} of
 * merged chunks matching the given criteria.</p>
 * <p>At least one of <code>query</code> and <code>layer</code> must be
 * non-empty; if both are <code>null</code> or empty the handler receives a
 * failed result and no request is sent. To export the whole data store set
 * the layer to <code>'/'</code>.</p>
 * <p>A 404 response is reported as a {@link NoSuchElementException}, any
 * other non-200 response as a failure, and a 200 response is passed to the
 * handler as the stream to read from.</p>
 * <p>The caller is responsible for handling exceptions through
 * {@link ReadStream#exceptionHandler(Handler)}.</p>
 * @param query a search query specifying which chunks to return (may be
 * <code>null</code>)
 * @param layer the name of the layer where to search for chunks recursively
 * (may be <code>null</code>)
 * @param handler a handler that will receive the {@link ReadStream} from
 * which the merged chunks matching the given criteria can be read
 */
public void search(String query, String layer,
    Handler<AsyncResult<ReadStream<Buffer>>> handler) {
  boolean noQuery = query == null || query.isEmpty();
  boolean noLayer = layer == null || layer.isEmpty();
  if (noQuery && noLayer) {
    handler.handle(Future.failedFuture("No search query and no layer given. "
        + "Do you really wish to export/query the whole data store? If so, "
        + "set the layer to '/'."));
    return;
  }

  HttpClientRequest request = client.get(getEndpoint() + prepareQuery(query, layer));
  request.exceptionHandler(t -> handler.handle(Future.failedFuture(t)));
  request.handler(response -> {
    int status = response.statusCode();
    if (status == 200) {
      handler.handle(Future.succeededFuture(response));
    } else if (status == 404) {
      fail(response, handler, message -> new NoSuchElementException(ClientAPIException.parse(message).getMessage()));
    } else {
      fail(response, handler);
    }
  });
  configureRequest(request).end();
}
项目:vertx-telegram-bot-api    文件:MultipartHelper.java   
/**
 * Writes one binary part of a multipart body to the request and then pumps
 * the content of {@code stream} into it.
 * <p>
 * The part consists of the boundary delimiter line, the
 * {@code Content-Disposition}, {@code Content-Type} and
 * {@code Content-Transfer-Encoding} headers, an empty line, the streamed
 * content and a terminating line break.
 *
 * @param name        form field name placed in {@code Content-Disposition}
 * @param stream      source of the part's content
 * @param contentType value of the part's {@code Content-Type} header
 * @param fileName    file name placed in {@code Content-Disposition}
 * @param handler     notified with a successful result when the stream ends,
 *                    or with a failed result when the stream reports an error
 * @return this helper, for chaining
 */
public MultipartHelper putBinaryBody(String name, ReadStream<Buffer> stream, String contentType, String fileName, Handler<AsyncResult> handler) {
    // Multipart delimiters and part headers must be terminated by CRLF (RFC 2046, section 5.1.1).
    // System.lineSeparator() yields a bare LF on Unix-like systems, which is not a valid line break here.
    final String crlf = "\r\n";
    request
            .write("--")
            .write(boundary)
            .write(crlf)
            .write(String.format("Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"", name, fileName))
            .write(crlf)
            .write(String.format("Content-Type: %s", contentType))
            .write(crlf)
            .write("Content-Transfer-Encoding: binary")
            .write(crlf)
            // Empty line separating the part headers from the part content.
            .write(crlf);
    Pump.pump(stream
            .endHandler(event -> {
                // Line break that precedes the next boundary delimiter.
                request.write(crlf);
                handler.handle(createResult(true, null));
            })
            .exceptionHandler(e -> handler.handle(createResult(false, e))), request)
            .start();
    return this;
}
项目:vertx-config    文件:ConfigRetrieverImpl.java   
/**
 * Resumes a paused stream. If a configuration was retained in {@code last}
 * and a handler is registered, that configuration is delivered to the handler
 * on the Vert.x context. Calling this on a stream that is not paused does nothing.
 *
 * @return this stream, for chaining
 */
@Override
public ReadStream<JsonObject> resume() {
  final JsonObject pending;
  final Handler<JsonObject> listener;
  synchronized (this) {
    if (!paused) {
      // Cannot resume a non paused stream
      return this;
    }

    paused = false;
    // Take the retained configuration so it is delivered at most once.
    pending = last;
    last = null;
    listener = this.handler;
  }

  if (pending == null || listener == null) {
    return this;
  }

  // Deliver outside the lock.
  vertx.runOnContext(v -> listener.handle(pending));
  return this;
}
项目:tentacles    文件:AsyncFileStore.java   
/**
 * Asynchronously store content from source to filePath,
 * and call onEnd when finished.
 * <p>
 * The source is paused until the target file has been opened. Once the source
 * ends, the file is closed and then {@code onEnd} is called.
 * <p>
 * If the file cannot be opened, the source is resumed without a data handler
 * so that it still runs to its end and {@code onEnd} is still called, and an
 * {@link IllegalStateException} carrying the cause is thrown from the open
 * callback because {@code Handler<Void>} has no way to convey a failure.
 *
 * @param vertx    Vert.x instance whose file system is used
 * @param source   stream providing the content to store
 * @param filePath path of the file to write (opened with write and create)
 * @param onEnd    called when the source has ended; may be {@code null}
 */
public static void asyncStore(Vertx vertx,
                       ReadStream<Buffer> source,
                       String filePath,
                       Handler<Void> onEnd) {
    checkDir(filePath);
    // Hold back data until there is a file to write it to.
    source.pause();
    vertx.fileSystem().open(filePath,
            new OpenOptions().setWrite(true).setCreate(true),
            fres -> {
                if (fres.failed()) {
                    // No file to pump into: let the source finish so callers are not left waiting,
                    // then surface the failure instead of pumping into a null write stream.
                    source.endHandler(onEnd);
                    source.resume();
                    throw new IllegalStateException("Cannot open " + filePath + " for writing", fres.cause());
                }
                AsyncFile afile = fres.result();
                Pump pump = Pump.pump(source, afile);
                // Close the file before reporting completion so the handle is not leaked.
                source.endHandler(v -> afile.close(closed -> {
                    if (onEnd != null) {
                        onEnd.handle(null);
                    }
                }));
                pump.start();
                source.resume();
            });
}
项目:vertx-fastdfs-client    文件:FdfsClientImpl.java   
/**
 * Uploads the content of {@code stream} as a new file.
 * <p>
 * The extension, encoded with the configured charset, must not be longer than
 * {@code FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN} bytes; otherwise the handler
 * receives a failed future immediately. A tracker is then obtained, asked for
 * a store storage, and the upload is delegated to that storage.
 *
 * @param stream  source of the file content
 * @param size    number of bytes to upload
 * @param ext     file extension
 * @param handler receives the id of the uploaded file or the failure
 * @return this client, for chaining
 */
@Override
public FdfsClient upload(ReadStream<Buffer> stream, long size, String ext,
        Handler<AsyncResult<FdfsFileId>> handler) {

    int extLength = Buffer.buffer(ext, options.getCharset()).length();
    if (extLength > FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN) {
        handler.handle(Future
                .failedFuture("ext is too long ( greater than " + FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN + ")"));
        return this;
    }

    getTracker().setHandler(trackerResult -> {
        if (!trackerResult.succeeded()) {
            handler.handle(Future.failedFuture(trackerResult.cause()));
            return;
        }

        trackerResult.result().getStoreStorage(storageResult -> {
            if (!storageResult.succeeded()) {
                handler.handle(Future.failedFuture(storageResult.cause()));
                return;
            }

            // The storage performs the actual transfer; relay its outcome unchanged.
            storageResult.result().upload(stream, size, ext, outcome -> handler.handle(outcome));
        });
    });

    return this;
}
项目:vertx-fastdfs-client    文件:FdfsClientImpl.java   
/**
 * Uploads the content of {@code stream} through the storage's
 * {@code uploadAppender} operation.
 * <p>
 * The extension, encoded with the configured charset, must not be longer than
 * {@code FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN} bytes; otherwise the handler
 * receives a failed future immediately. A tracker is then obtained, asked for
 * a store storage, and the upload is delegated to that storage.
 *
 * @param stream  source of the file content
 * @param size    number of bytes to upload
 * @param ext     file extension
 * @param handler receives the id of the uploaded file or the failure
 * @return this client, for chaining
 */
@Override
public FdfsClient uploadAppender(ReadStream<Buffer> stream, long size, String ext,
        Handler<AsyncResult<FdfsFileId>> handler) {

    int extLength = Buffer.buffer(ext, options.getCharset()).length();
    if (extLength > FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN) {
        handler.handle(Future
                .failedFuture("ext is too long ( greater than " + FdfsProtocol.FDFS_FILE_EXT_NAME_MAX_LEN + ")"));
        return this;
    }

    getTracker().setHandler(trackerResult -> {
        if (!trackerResult.succeeded()) {
            handler.handle(Future.failedFuture(trackerResult.cause()));
            return;
        }

        trackerResult.result().getStoreStorage(storageResult -> {
            if (!storageResult.succeeded()) {
                handler.handle(Future.failedFuture(storageResult.cause()));
                return;
            }

            // The storage performs the actual transfer; relay its outcome unchanged.
            storageResult.result().uploadAppender(stream, size, ext, outcome -> handler.handle(outcome));
        });
    });

    return this;
}
项目:okapi    文件:ProxyService.java   
/**
 * Handles a module instance by doing nothing for it: traces the module id,
 * closes the context's timer and continues with the remaining instances
 * through {@code proxyR}.
 */
private void proxyRedirect(Iterator<ModuleInstance> it,
  ProxyContext pc, ReadStream<Buffer> stream, Buffer bcontent,
  ModuleInstance mi) {

  final String moduleId = mi.getModuleDescriptor().getId();
  pc.trace("ProxyNull " + moduleId);
  pc.closeTimer();

  // if no more entries in it, proxyR will return 404
  proxyR(it, pc, stream, bcontent);
}
项目:sfs    文件:BlobFile.java   
/**
 * Writes up to {@code length} bytes read from {@code src} into this file,
 * starting at {@code position}.
 * <p>
 * On subscription the file is checked to be open and writable. If the write
 * queue is full, writing starts only after it has drained. The writer created
 * for the transfer is tracked in {@code activeWriters} while it is in use.
 *
 * @param vertx           supplies the context the writer runs on
 * @param position        position handed to the created write stream
 * @param length          limit applied to the amount of data written
 * @param src             source of the data
 * @param assertAlignment handed to the created write stream
 * @return an observable that emits once the pump has finished
 */
public Observable<Void> consume(SfsVertx vertx, long position, long length, ReadStream<Buffer> src, boolean assertAlignment) {
    Context context = vertx.getOrCreateContext();
    return defer(() -> {
        checkOpen();
        checkCanWrite();

        // Completes immediately unless the write queue first has to drain.
        ObservableFuture<Void> writable = RxHelper.observableFuture();
        if (!writeQueueSupport.writeQueueFull()) {
            writable.complete(null);
        } else {
            writeQueueSupport.drainHandler(context, writable::complete);
        }

        return writable.flatMap(ready ->
                using(
                        () -> {
                            AsyncFileWriter created = createWriteStream(context, position, assertAlignment);
                            activeWriters.add(created);
                            return created;
                        },
                        writer -> {
                            BufferedEndableWriteStream buffered = new BufferedEndableWriteStream(writer);
                            LimitedWriteEndableWriteStream limited = new LimitedWriteEndableWriteStream(buffered, length);
                            return pump(src, limited)
                                    .doOnNext(finished -> activeWriters.remove(writer));
                        },
                        // Also untrack the writer when the resource is disposed.
                        activeWriters::remove
                ));
    });
}
项目:sfs    文件:LocalNode.java   
/**
 * Creates a write stream blob on the given volume whose {@code consume}
 * computes the requested digests while the data is written.
 * <p>
 * The source passed to {@code consume} is wrapped in a {@link DigestReadStream};
 * after the underlying blob has consumed it, a {@link DigestBlob} carrying the
 * blob's volume, position and length plus one digest per factory is emitted.
 * Errors are routed through {@code HandleServerToBusy}.
 *
 * @param volumeId               id of the volume to write to
 * @param length                 length passed to the volume's {@code putDataStream}
 * @param messageDigestFactories digests to compute over the written data
 * @return an observable emitting the node-level write stream blob
 */
@Override
public Observable<NodeWriteStreamBlob> createWriteStream(String volumeId, long length, final MessageDigestFactory... messageDigestFactories) {
    LocalNode owner = this;
    return defer(() -> {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(format("createWriteStream {volumeId=%s,length=%d,messageDigests=%s}", volumeId, length, on(',').join(messageDigestFactories)));
        }
        final Volume target = volumeManager.get(volumeId).get();
        return target.putDataStream(vertxContext.vertx(), length)
                .map((Func1<WriteStreamBlob, NodeWriteStreamBlob>) writeStreamBlob -> new NodeWriteStreamBlob(owner) {
                    @Override
                    public Observable<DigestBlob> consume(ReadStream<Buffer> src) {
                        // Digests are updated as the data passes through this wrapper.
                        DigestReadStream digesting = new DigestReadStream(src, messageDigestFactories);
                        return writeStreamBlob.consume(digesting)
                                .map(done -> {
                                    DigestBlob summary = new DigestBlob(
                                            writeStreamBlob.getVolume(),
                                            writeStreamBlob.getPosition(),
                                            writeStreamBlob.getLength());
                                    for (MessageDigestFactory factory : messageDigestFactories) {
                                        summary.withDigest(factory, digesting.getDigest(factory).get());
                                    }
                                    return summary;
                                });
                    }
                });
    }).onErrorResumeNext(new HandleServerToBusy<>());
}
项目:sfs    文件:VolumeReplicaGroup.java   
/**
 * Writes the data from {@code src} to every node write stream blob of this
 * replica group and collects the resulting digests.
 * <p>
 * Each blob reads from its own piped stream; a single multi write stream fans
 * the source out to all pipes. The producer (the pump) and the consumers (the
 * blobs) are combined so that both are subscribed together.
 *
 * @param length                 length handed to the blob calculation
 * @param messageDigestFactories digests each replica should compute
 * @param src                    source of the data
 * @return an observable emitting the digest blobs of all replicas
 */
public Observable<List<DigestBlob>> consume(final long length, final Iterable<MessageDigestFactory> messageDigestFactories, ReadStream<Buffer> src) {
    return calculateNodeWriteStreamBlobs(length, toArray(messageDigestFactories, MessageDigestFactory.class))
            .flatMap(nodeWriteStreamBlobs -> {
                int replicaCount = nodeWriteStreamBlobs.size();
                List<Observable<DigestBlob>> digestResults = new ArrayList<>(replicaCount);
                List<BufferEndableWriteStream> pipeInputs = new ArrayList<>(replicaCount);
                for (int replica = 0; replica < replicaCount; replica++) {
                    // One pipe per replica: the blob reads what the fan-out writes.
                    PipedReadStream pipeOutput = new PipedReadStream();
                    PipedEndableWriteStream pipeInput = new PipedEndableWriteStream(pipeOutput);
                    digestResults.add(nodeWriteStreamBlobs.get(replica).consume(pipeOutput));
                    pipeInputs.add(pipeInput);
                }

                MultiEndableWriteStream fanOut = new MultiEndableWriteStream(pipeInputs);
                Observable<Void> producer = pump(src, fanOut).single();
                Observable<List<DigestBlob>> consumer =
                        Observable.mergeDelayError(digestResults)
                                .toList()
                                .single();
                // the zip operator will not work here
                // since the subscriptions need to run
                // in parallel due to the pipe connections
                return combineSinglesDelayError(
                        producer,
                        consumer,
                        (ignored, digestBlobs) -> digestBlobs);
            });
}
项目:sfs    文件:DigestReadStream.java   
/**
 * Creates a stream wrapper holding one {@link MessageDigest} instance per
 * given factory. Every instance is stored both in the {@code messageDigests}
 * array (in argument order) and in the {@code digests} map keyed by its factory.
 *
 * @param delegate      the stream being wrapped
 * @param messageDigest first digest factory (at least one is required)
 * @param mdfs          additional digest factories
 */
public DigestReadStream(ReadStream<Buffer> delegate, MessageDigestFactory messageDigest, MessageDigestFactory... mdfs) {
    this.delegate = delegate;
    this.messageDigests = new MessageDigest[1 + mdfs.length];

    int slot = 0;
    MessageDigest first = messageDigest.instance();
    this.messageDigests[slot++] = first;
    digests.put(messageDigest, first);

    for (MessageDigestFactory factory : mdfs) {
        MessageDigest created = factory.instance();
        this.messageDigests[slot++] = created;
        digests.put(factory, created);
    }
}
项目:sfs    文件:FileBackedBuffer.java   
/**
 * Opens the single read stream over the buffered content.
 * <p>
 * While no file is open the in-memory buffer is served. Otherwise a file
 * reader over the bytes counted so far is returned, wrapped by the decrypting
 * algorithm when the temp file is encrypted.
 *
 * @return a stream over the buffered content
 */
public ReadStream<Buffer> readStream() {
    checkReadStreamNotOpen();
    openedReadStream = true;

    if (!fileOpen) {
        return new BufferReadStream(memory);
    }

    AsyncFileReader reader = new AsyncFileReaderImpl(context, 0, 8192, countingEndableWriteStream.count(), channel, LOGGER);
    if (!encryptTempFile) {
        return reader;
    }
    return algorithm.decrypt(reader);
}
项目:sfs    文件:FileBackedBufferTest.java   
/**
 * Round-trips 10 MiB of random bytes through a {@link FileBackedBuffer}
 * created with a size argument of 1024 and expects the content to come back
 * unchanged with the backing file open.
 * NOTE(review): the size argument is presumably the in-memory threshold - confirm.
 *
 * @param context test context used for assertions and completion
 * @param encrypt passed to the {@link FileBackedBuffer} constructor
 */
public void testLarge(TestContext context, boolean encrypt) throws IOException {

    SfsVertx sfsVertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);
    Path tmpDir = createTempDirectory(valueOf(currentTimeMillis()));

    byte[] payload = new byte[1024 * 1024 * 10];
    getCurrentInstance().nextBytesBlocking(payload);
    Buffer expected = buffer(payload);
    Async async = context.async();
    aVoid()
            .flatMap(begin -> {
                FileBackedBuffer target = new FileBackedBuffer(sfsVertx, 1024, encrypt, tmpDir);
                ReadStream<Buffer> source = new BufferReadStream(expected);
                // Write everything, read it back, compare, then close.
                return pump(source, target)
                        .flatMap(written -> {
                            ReadStream<Buffer> replay = target.readStream();
                            BufferWriteEndableWriteStream sink = new BufferWriteEndableWriteStream();
                            return pump(replay, sink)
                                    .doOnNext(pumped -> {
                                        Buffer actual = sink.toBuffer();
                                        assertArrayEquals(context, expected.getBytes(), actual.getBytes());
                                        assertEquals(context, true, target.isFileOpen());
                                    });
                        })
                        .flatMap(checked -> target.close());
            })
            .subscribe(new TestSubscriber(context, async));
}
项目:georocket    文件:BufferReadStream.java   
/**
 * Resumes a paused stream. If a handler is registered it is cleared from the
 * field and passed to {@code doWrite}. Does nothing when the stream is not paused.
 *
 * @return this stream, for chaining
 */
@Override
public ReadStream<Buffer> resume() {
  if (!paused) {
    return this;
  }
  paused = false;

  Handler<Buffer> pending = handler;
  if (pending != null) {
    // Clear the field before writing, as the write is given the handler directly.
    handler = null;
    doWrite(pending);
  }
  return this;
}
项目:georocket    文件:MongoDBChunkReadStream.java   
/**
 * Resumes a paused, not yet closed stream and starts reading again when a
 * handler is registered. Does nothing when the stream is not paused or is closed.
 *
 * @return this stream, for chaining
 */
@Override
public ReadStream<Buffer> resume() {
  check();
  if (!paused || closed) {
    return this;
  }

  paused = false;
  if (handler != null) {
    doRead();
  }
  return this;
}
项目:georocket    文件:AbstractClient.java   
/**
 * Execute a get request with specified path, query and additional parameter.
 * <p>At least one of <code>query</code> and <code>layer</code> must be
 * non-empty; otherwise the handler receives a failed result and no request
 * is sent. The additional parameter is appended with <code>?</code> when no
 * query is given and with <code>&amp;</code> otherwise. Any response status
 * other than 200 is reported as a failure.</p>
 * @param endpoint the endpoint
 * @param parameterName the name of the query parameter
 * @param parameterValue the value of the query parameter
 * @param query a search query specifying which chunks to return (may be
 * <code>null</code>)
 * @param layer the name of the layer where to search for chunks recursively
 * (may be <code>null</code>)
 * @param handler a handler that will receive the {@link ReadStream} from
 * which the results matching the given criteria can be read
 */
protected void getWithParameter(String endpoint, String parameterName,
    String parameterValue, String query, String layer,
    Handler<AsyncResult<ReadStream<Buffer>>> handler) {
  boolean noQuery = query == null || query.isEmpty();
  boolean noLayer = layer == null || layer.isEmpty();
  if (noQuery && noLayer) {
    handler.handle(Future.failedFuture("No search query and no layer given. "
      + "Do you really wish to export/query the whole data store? If so, "
      + "set the layer to '/'."));
    return;
  }

  String queryPath = prepareQuery(query, layer);
  // Start the parameter list when there is no query, otherwise extend it.
  String separator = noQuery ? "?" : "&";
  String path = endpoint + queryPath + separator + parameterName + "=" + parameterValue;

  HttpClientRequest request = client.get(path);
  request.exceptionHandler(t -> handler.handle(Future.failedFuture(t)));
  request.handler(response -> {
    if (response.statusCode() == 200) {
      handler.handle(Future.succeededFuture(response));
    } else {
      fail(response, handler);
    }
  });
  configureRequest(request).end();
}
项目:georocket    文件:StoreClientSearchTest.java   
/**
 * Creates a handler that reads a stream to its end and then asserts that the
 * collected content equals {@code XML}, verifies that {@code url} was
 * requested and completes {@code async}.
 */
private Handler<ReadStream<Buffer>> assertExport(String url, String XML,
    TestContext context, Async async) {
  return stream -> {
    Buffer collected = Buffer.buffer();
    stream.handler(collected::appendBuffer);
    stream.endHandler(done -> {
      context.assertEquals(XML, collected.toString());
      verifyRequested(url, context);
      async.complete();
    });
  };
}
项目:vertx-infinispan    文件:InfinispanAsyncMapImpl.java   
/**
 * Returns a stream over the entries supplied by {@code cache::entrySet}.
 * Each cached key and value is converted with
 * {@code DataConverter.fromCachedObject} and emitted as an immutable entry.
 * The stream is bound to the context returned by {@code vertx.getOrCreateContext()}.
 */
@Override
public ReadStream<Entry<K, V>> entryStream() {
  return new CloseableIteratorCollectionStream<>(vertx.getOrCreateContext(), cache::entrySet, cacheEntry -> {
    // The assignments give fromCachedObject its target types K and V.
    K key = DataConverter.fromCachedObject(cacheEntry.getKey());
    V value = DataConverter.fromCachedObject(cacheEntry.getValue());
    return new SimpleImmutableEntry<>(key, value);
  });
}
项目:logbulk    文件:PressureHandler.java   
/**
 * Create a new back pressure handler.
 * <p>
 * Registers an end handler on the stream that first calls {@code endHandler}
 * (when given), then clears the recorded pressure, marks this handler as
 * ended and logs the end of the stream.
 *
 * @param stream     stream to handle.
 * @param endpoint   endpoint name, used in the log message.
 * @param endHandler end handler to call, may be {@code null}.
 */
public PressureHandler(@NonNull ReadStream stream, String endpoint, Handler<Void> endHandler) {
    this.stream = stream;
    stream.endHandler(ignored -> {
        if (endHandler != null) {
            endHandler.handle(null);
        }
        nextPressure.clear();
        ended = true;
        log.info("Finish to read stream: " + endpoint);
    });
}
项目:logbulk    文件:ComponentVerticle.java   
/**
 * Handle back-pressure on component.
 * <p>
 * Subscribes a {@link PressureHandler} for the stream to the event bus address
 * {@code parentEndpoint + ".pressure"}. When the stream ends, the given end
 * handler is called (when present) and the consumer is unregistered.
 *
 * @param stream     stream in read.
 * @param endHandler end handler to call, may be {@code null}.
 */
public void handlePressure(ReadStream stream, Handler<Void> endHandler) {
    MessageConsumer<String> consumer = eventBus.consumer(parentEndpoint + ".pressure");
    Handler<Void> onStreamEnd = ignored -> {
        if (endHandler != null) {
            endHandler.handle(null);
        }
        // Stop listening for pressure messages once the stream is done.
        consumer.unregister();
    };
    consumer.handler(new PressureHandler(stream, parentEndpoint, onStreamEnd));
}
项目:logbulk    文件:PressureHandlerTest.java   
/** Constructing a handler without end callback must register an end handler on the stream. */
@Test public void testConstructor2() {
    // Only the interaction with the stream is checked, so a mock is enough
    ReadStream stream = mock(ReadStream.class);

    // Exercise the two-argument constructor and check the registration
    new PressureHandler(stream, "test-endpoint");
    verify(stream).endHandler(any());
}
项目:logbulk    文件:PressureHandlerTest.java   
/** A single pressure message must pause the stream. */
@Test public void testHandle1() {
    // Stream and a message whose body names the signalling endpoint
    ReadStream stream = mock(ReadStream.class);
    Message<String> message = mock(Message.class);
    when(message.body()).thenReturn("next-endpoint");

    // Deliver the message once
    PressureHandler underTest = new PressureHandler(stream, "test-endpoint");
    underTest.handle(message);

    verify(stream).pause();
}
项目:logbulk    文件:PressureHandlerTest.java   
/** Two pressure messages from the same endpoint must pause and then resume the stream. */
@Test public void testHandle2() {
    // Stream and a message whose body names the signalling endpoint
    ReadStream stream = mock(ReadStream.class);
    Message<String> message = mock(Message.class);
    when(message.body()).thenReturn("next-endpoint");

    // Deliver the same message twice
    PressureHandler underTest = new PressureHandler(stream, "test-endpoint");
    underTest.handle(message);
    underTest.handle(message);

    verify(stream).pause();
    verify(stream).resume();
}
项目:logbulk    文件:PressureHandlerTest.java   
/** Pressure messages from two different endpoints must pause the stream exactly once. */
@Test public void testHandle5() {
    // Stream and two messages naming different endpoints
    ReadStream stream = mock(ReadStream.class);
    Message<String> first = mock(Message.class);
    when(first.body()).thenReturn("next-endpoint-1");
    Message<String> second = mock(Message.class);
    when(second.body()).thenReturn("next-endpoint-2");

    // Deliver one message per endpoint
    PressureHandler underTest = new PressureHandler(stream, "test-endpoint");
    underTest.handle(first);
    underTest.handle(second);

    verify(stream).pause();
}
项目:vertx-infinispan    文件:InfinispanClusteredAsyncMapTest.java   
/**
 * Shared driver for the read stream tests.
 * <p>
 * Loads 100 generated entries, reads the stream built by {@code streamFactory}
 * and pauses it for 500 ms at the items with index 3, 16 and 38. During each
 * pause no further item may arrive. At the end the collected items are handed
 * to {@code assertions} and the total duration must cover the three pauses.
 *
 * @param streamFactory creates the stream under test from the unwrapped map
 * @param assertions    checks the collected items against the loaded data
 */
private <T> void testReadStream(
  Function<InfinispanAsyncMap<JsonObject, Buffer>, ReadStream<T>> streamFactory,
  BiConsumer<Map<JsonObject, Buffer>, List<T>> assertions
) {
  Map<JsonObject, Buffer> map = genJsonToBuffer(100);
  loadData(map, (vertx, asyncMap) -> {
    List<T> items = new ArrayList<>();
    ReadStream<T> stream = streamFactory.apply(InfinispanAsyncMap.unwrap(asyncMap));
    AtomicInteger idx = new AtomicInteger();
    long pause = 500;
    long start = System.nanoTime();

    stream.endHandler(done -> {
      assertions.accept(map, items);
      long elapsedMillis = NANOSECONDS.toMillis(System.nanoTime() - start);
      assertTrue(elapsedMillis >= 3 * pause);
      testComplete();
    });
    stream.exceptionHandler(err -> fail(err));
    stream.handler(item -> {
      items.add(item);
      int position = idx.getAndIncrement();
      boolean pausePoint = position == 3 || position == 16 || position == 38;
      if (!pausePoint) {
        return;
      }
      stream.pause();
      int emitted = items.size();
      vertx.setTimer(pause, timerId -> {
        assertTrue("Items emitted during pause", emitted == items.size());
        stream.resume();
      });
    });
  });
  await();
}
项目:incubator-servicecomb-java-chassis    文件:InputStreamToReadStream.java   
/**
 * Stores the handler in this stream's {@code endHandler} field, replacing any
 * previously stored one.
 * Runs {@code check()} first.
 * NOTE(review): check() is presumably a state/thread guard - confirm.
 *
 * @param handler the end handler to store
 * @return this stream, for chaining
 */
@Override
public ReadStream<Buffer> endHandler(Handler<Void> handler) {
  check();
  this.endHandler = handler;
  return this;
}
项目:vertx-aws-lambda    文件:LambdaServer.java   
/**
 * Always returns {@code null}; this server provides no request stream.
 * NOTE(review): callers of the overridden method presumably expect a non-null
 * stream - confirm that no caller dereferences the result.
 */
@Override
public ReadStream<HttpServerRequest> requestStream() {
    return null;
}
项目:vertx-aws-lambda    文件:LambdaServer.java   
/**
 * Always returns {@code null}; this server provides no WebSocket stream.
 * NOTE(review): callers of the overridden method presumably expect a non-null
 * stream - confirm that no caller dereferences the result.
 */
@Override
public ReadStream<ServerWebSocket> websocketStream() {
    return null;
}
项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
/**
 * Uploads the content of {@code stream} by delegating to {@code uploadFile}
 * with the {@code STORAGE_PROTO_CMD_UPLOAD_FILE} command; the result of that
 * call is passed on to {@code handler}.
 *
 * @param stream  source of the file content
 * @param size    number of bytes to upload
 * @param ext     file extension
 * @param handler receives the result of the upload
 * @return this storage, for chaining
 */
@Override
public FdfsStorage upload(ReadStream<Buffer> stream, long size, String ext,
        Handler<AsyncResult<FdfsFileId>> handler) {
    uploadFile(FdfsProtocol.STORAGE_PROTO_CMD_UPLOAD_FILE, stream, size, ext).setHandler(handler);
    return this;
}
项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
/**
 * Uploads the content of {@code stream} by delegating to {@code uploadFile}
 * with the {@code STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE} command; the result
 * of that call is passed on to {@code handler}.
 *
 * @param stream  source of the file content
 * @param size    number of bytes to upload
 * @param ext     file extension
 * @param handler receives the result of the upload
 * @return this storage, for chaining
 */
@Override
public FdfsStorage uploadAppender(ReadStream<Buffer> stream, long size, String ext,
        Handler<AsyncResult<FdfsFileId>> handler) {
    uploadFile(FdfsProtocol.STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, stream, size, ext).setHandler(handler);
    return this;
}
项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
/**
 * Sends an append-file command for {@code fileId} over a storage connection
 * and streams {@code size} bytes from {@code stream} as the content.
 * <p>
 * Wire layout written here: packet header, then a body consisting of the
 * file name length, the content size, the file name and finally the content.
 * The connection is released once the exchange has finished, and the handler
 * is told only whether it succeeded; the response packet itself is dropped.
 *
 * @param stream  source of the bytes to append
 * @param size    number of content bytes announced in the packet
 * @param fileId  file to append to; its name is sent in the body
 * @param handler receives success or the failure cause
 * @return this storage, for chaining
 */
@Override
public FdfsStorage append(ReadStream<Buffer> stream, long size, FdfsFileId fileId,
        Handler<AsyncResult<Void>> handler) {

    // Keep the source paused until the pump is started below.
    stream.pause();

    Future<FdfsConnection> futureConn = getConnection();

    futureConn.compose(connection -> {
        // The response future is created before anything is written to the connection.
        Future<FdfsPacket> futureResponse = FdfsProtocol.recvPacket(vertx, options.getNetworkTimeout(), connection, FdfsProtocol.STORAGE_PROTO_CMD_RESP, 0,
                null);

        Buffer nameBuffer = Buffer.buffer(fileId.name(), options.getCharset());
        // Body = two length fields + file name + file content.
        long bodyLength = 2 * FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE + nameBuffer.length() + size;
        Buffer headerBuffer = FdfsProtocol.packHeader(FdfsProtocol.STORAGE_PROTO_CMD_APPEND_FILE, (byte) 0,
                bodyLength);

        connection.write(headerBuffer);
        if (connection.writeQueueFull()) {
            connection.pause();
            connection.drainHandler(v -> {
                connection.resume();
            });
        }

        // Only the fixed part of the body is built here; the content follows through the pump.
        Buffer bodyBuffer = FdfsUtils.newZero(bodyLength - size);

        int offset = 0;
        // Field 1: length of the file name.
        bodyBuffer.setLong(offset, nameBuffer.length());
        offset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
        // Field 2: number of content bytes.
        bodyBuffer.setLong(offset, size);
        offset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
        // Field 3: the file name itself.
        bodyBuffer.setBuffer(offset, nameBuffer);

        connection.write(bodyBuffer);
        if (connection.writeQueueFull()) {
            connection.pause();
            connection.drainHandler(v -> {
                connection.resume();
            });
        }

        // Stream the content after the fixed part of the body.
        Pump.pump(stream, connection).start();
        stream.resume();

        return futureResponse;
    }).setHandler(ar -> {
        // Release the connection whenever one was obtained, whatever the outcome.
        if (futureConn.succeeded()) {
            futureConn.result().release();
        }

        if (ar.succeeded()) {
            handler.handle(Future.succeededFuture());
        } else {
            handler.handle(Future.failedFuture(ar.cause()));
        }
    });

    return this;
}
项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
/**
 * Sends a modify-file command for {@code fileId} over a storage connection
 * and streams {@code size} bytes from {@code stream} as the content.
 * <p>
 * Wire layout written here: packet header, then a body consisting of the
 * file name length, the file offset, the content size, the file name and
 * finally the content. The connection is released once the exchange has
 * finished, and the handler is told only whether it succeeded; the response
 * packet itself is dropped.
 *
 * @param stream  source of the replacement bytes
 * @param size    number of content bytes announced in the packet
 * @param fileId  file to modify; its name is sent in the body
 * @param offset  file offset sent in the body
 * @param handler receives success or the failure cause
 * @return this storage, for chaining
 */
@Override
public FdfsStorage modify(ReadStream<Buffer> stream, long size, FdfsFileId fileId, long offset,
        Handler<AsyncResult<Void>> handler) {

    // Keep the source paused until the pump is started below.
    stream.pause();

    Future<FdfsConnection> futureConn = getConnection();

    futureConn.compose(connection -> {
        // The response future is created before anything is written to the connection.
        Future<FdfsPacket> futureResponse = FdfsProtocol.recvPacket(vertx, options.getNetworkTimeout(), connection, FdfsProtocol.STORAGE_PROTO_CMD_RESP, 0,
                null);

        Buffer nameBuffer = Buffer.buffer(fileId.name(), options.getCharset());
        // Body = three length-sized fields + file name + file content.
        long bodyLength = 3 * FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE + nameBuffer.length() + size;
        Buffer headerBuffer = FdfsProtocol.packHeader(FdfsProtocol.STORAGE_PROTO_CMD_MODIFY_FILE, (byte) 0,
                bodyLength);

        connection.write(headerBuffer);
        if (connection.writeQueueFull()) {
            connection.pause();
            connection.drainHandler(v -> {
                connection.resume();
            });
        }

        // Only the fixed part of the body is built here; the content follows through the pump.
        Buffer bodyBuffer = FdfsUtils.newZero(bodyLength - size);

        int bufferOffset = 0;
        // Field 1: length of the file name.
        bodyBuffer.setLong(bufferOffset, nameBuffer.length());
        bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
        // Field 2: file offset supplied by the caller.
        bodyBuffer.setLong(bufferOffset, offset);
        bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
        // Field 3: number of content bytes.
        bodyBuffer.setLong(bufferOffset, size);
        bufferOffset += FdfsProtocol.FDFS_PROTO_PKG_LEN_SIZE;
        // Field 4: the file name itself.
        bodyBuffer.setBuffer(bufferOffset, nameBuffer);

        connection.write(bodyBuffer);
        if (connection.writeQueueFull()) {
            connection.pause();
            connection.drainHandler(v -> {
                connection.resume();
            });
        }

        // Stream the content after the fixed part of the body.
        Pump.pump(stream, connection).start();
        stream.resume();

        return futureResponse;
    }).setHandler(ar -> {

        // Release the connection whenever one was obtained, whatever the outcome.
        if (futureConn.succeeded()) {
            futureConn.result().release();
        }

        if (ar.succeeded()) {
            handler.handle(Future.succeededFuture());
        } else {
            handler.handle(Future.failedFuture(ar.cause()));
        }
    });

    return this;
}