/** * Returns the timestamp of the latest oplog entry. * * @param collection The oplog {@link MongoCollection} * @return The latest timestamp or {@code null} if no entry is available */ public static BsonTimestamp getLatestOplogTimestamp(MongoCollection<BsonDocument> collection) { final AtomicReference<BsonTimestamp> timestamp = new AtomicReference<>(); final AtomicReference<Throwable> error = new AtomicReference<>(); final CountDownLatch waiter = new CountDownLatch(1); collection.find().sort(new Document("$natural", -1)).limit(1).first(new SingleResultCallback<BsonDocument>() { @Override public void onResult(BsonDocument document, Throwable throwable) { if (throwable != null) error.set(throwable); if (document != null) timestamp.set(document.getTimestamp("ts")); waiter.countDown(); } }); ConcurrentUtils.safeAwait(waiter); Throwable realError = error.get(); if (realError != null) throw Throwables.propagate(realError); return timestamp.get(); }
public List<JSONObject> getWithinBoundingBox(double btm, double tp, double rght, double lft) { List<JSONObject> finalList = Collections.synchronizedList(new ArrayList<>()); MongoCollection<Document> roadsNodes = this.database.getCollection("ollie_roads"); Block<Document> getBlock = new Block<Document>() { @Override public void apply(final Document document) { finalList.add(new JSONObject(document)); } }; SingleResultCallback<Void> callbackWhenFinished = new SingleResultCallback<Void>() { @Override public void onResult(final Void result, final Throwable t) { latch.countDown(); } }; roadsNodes.find(and(gte("startNode.latitude", btm), lte("startNode.latitude", tp), gte("startNode.longitude", lft), lte("startNode.longitude", rght), gte("endNode.latitude", btm), lte("endNode.latitude", tp), gte("endNode.longitude", lft), lte("endNode.longitude", rght))). forEach(getBlock, callbackWhenFinished); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return finalList; }
/** * * @param resultHandler * @param converter * @param <T> * @param <R> * @return */ private <T, R> SingleResultCallback<T> convertCallback(Handler<AsyncResult<R>> resultHandler, Function<T, R> converter) { Context context = mongi.vertx.getOrCreateContext(); return (result, error) -> { context.runOnContext(v -> { if (error != null) { resultHandler.handle(Future.failedFuture(error)); } else { resultHandler.handle(Future.succeededFuture(converter.apply(result))); } }); }; }
@Override public Publisher<Success> createView(final String viewName, final String viewOn, final List<? extends Bson> pipeline, final CreateViewOptions createViewOptions) { return new ObservableToPublisher<Success>(observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.createView(viewName, viewOn, pipeline, createViewOptions, voidToSuccessCallback(callback)); } })); }
@Override public Observable<Success> close() { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.close(voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Observable<Success> abort() { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.abort(voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Publisher<Success> delete(final ClientSession clientSession, final BsonValue id) { return new ObservableToPublisher<Success>(observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.delete(clientSession, id, voidToSuccessCallback(callback)); } })); }
@Override public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests, final BulkWriteOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<BulkWriteResult>>() { @Override public void apply(final SingleResultCallback<BulkWriteResult> callback) { wrapped.bulkWrite(requests, options, callback); } }), observableAdapter); }
@Override public Publisher<Success> delete(final ClientSession clientSession, final ObjectId id) { return new ObservableToPublisher<Success>(observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.delete(clientSession, id, voidToSuccessCallback(callback)); } })); }
@Override public Observable<Success> insertOne(final TDocument document, final InsertOneOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.insertOne(document, options, voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Observable<Success> insertMany(final List<? extends TDocument> documents, final InsertManyOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.insertMany(documents, options, voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Observable<DeleteResult> deleteOne(final Bson filter) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<DeleteResult>>() { @Override public void apply(final SingleResultCallback<DeleteResult> callback) { wrapped.deleteOne(filter, callback); } }), observableAdapter); }
<T> void callResult(final SingleResultCallback<T> callback) { Object response = responses.remove(0); if (response instanceof Throwable) { callback.onResult(null, (Throwable) response); } else { callback.onResult((T) response, null); } }
@Override public Observable<DeleteResult> deleteMany(final Bson filter) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<DeleteResult>>() { @Override public void apply(final SingleResultCallback<DeleteResult> callback) { wrapped.deleteMany(filter, callback); } }), observableAdapter); }
@Override public Observable<DeleteResult> deleteMany(final Bson filter, final DeleteOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<DeleteResult>>() { @Override public void apply(final SingleResultCallback<DeleteResult> callback) { wrapped.deleteMany(filter, options, callback); } }), observableAdapter); }
@Override public Observable<UpdateResult> replaceOne(final Bson filter, final TDocument replacement, final UpdateOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<UpdateResult>>() { @Override public void apply(final SingleResultCallback<UpdateResult> callback) { wrapped.replaceOne(filter, replacement, options, callback); } }), observableAdapter); }
@Override public Observable<UpdateResult> updateOne(final Bson filter, final Bson update, final UpdateOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<UpdateResult>>() { @Override public void apply(final SingleResultCallback<UpdateResult> callback) { wrapped.updateOne(filter, update, options, callback); } }), observableAdapter); }
@Override public Observable<UpdateResult> updateMany(final Bson filter, final Bson update, final UpdateOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<UpdateResult>>() { @Override public void apply(final SingleResultCallback<UpdateResult> callback) { wrapped.updateMany(filter, update, options, callback); } }), observableAdapter); }
@Override public Observable<TDocument> findOneAndDelete(final Bson filter, final FindOneAndDeleteOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<TDocument>>() { @Override public void apply(final SingleResultCallback<TDocument> callback) { wrapped.findOneAndDelete(filter, options, callback); } }), observableAdapter); }
@Override public Publisher<Long> downloadToStream(final ClientSession clientSession, final ObjectId id, final AsyncOutputStream destination) { return new ObservableToPublisher<Long>(observe(new Block<SingleResultCallback<Long>>() { @Override public void apply(final SingleResultCallback<Long> callback) { wrapped.downloadToStream(clientSession, id, toCallbackAsyncOutputStream(destination), callback); } })); }
@Override public Observable<TDocument> findOneAndUpdate(final Bson filter, final Bson update, final FindOneAndUpdateOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<TDocument>>() { @Override public void apply(final SingleResultCallback<TDocument> callback) { wrapped.findOneAndUpdate(filter, update, options, callback); } }), observableAdapter); }
@Override public Publisher<Success> drop(final ClientSession clientSession) { return new ObservableToPublisher<Success>(observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.drop(clientSession, voidToSuccessCallback(callback)); } })); }
@Override public <T> void execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference, final ClientSession session, final SingleResultCallback<T> callback) { readPreferences.add(readPreference); clientSessions.add(session); if (queueExecution) { queuedReadOperations.add(operation); queuedReadCallbacks.add(callback); } else { readOperations.add(operation); callResult(callback); } }
@Override public Observable<String> createIndexes(final List<IndexModel> indexes) { return RxObservables.create(Observables.observeAndFlatten(new Block<SingleResultCallback<List<String>>>() { @Override public void apply(final SingleResultCallback<List<String>> callback) { wrapped.createIndexes(indexes, callback); } }), observableAdapter); }
@Override public Observable<Success> dropIndex(final String indexName) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.dropIndex(indexName, voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Publisher<Success> uploadFromStream(final BsonValue id, final String filename, final AsyncInputStream source, final GridFSUploadOptions options) { return new ObservableToPublisher<Success>(observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.uploadFromStream(id, filename, toCallbackAsyncInputStream(source), options, voidToSuccessCallback(callback)); } })); }
@Override public Observable<Success> renameCollection(final MongoNamespace newCollectionNamespace, final RenameCollectionOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.renameCollection(newCollectionNamespace, options, voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Observable<GridFSFile> first() { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<GridFSFile>>(){ @Override public void apply(final SingleResultCallback<GridFSFile> callback) { wrapped.first(callback); } }), observableAdapter); }
@Override public <TResult> Observable<TResult> runCommand(final Bson command, final Class<TResult> clazz) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<TResult>>() { @Override public void apply(final SingleResultCallback<TResult> callback) { wrapped.runCommand(command, clazz, callback); } }), observableAdapter); }
@Override public <TResult> Observable<TResult> runCommand(final Bson command, final ReadPreference readPreference, final Class<TResult> clazz) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<TResult>>() { @Override public void apply(final SingleResultCallback<TResult> callback) { wrapped.runCommand(command, readPreference, clazz, callback); } }), observableAdapter); }
@Override public Publisher<Success> createCollection(final ClientSession clientSession, final String collectionName, final CreateCollectionOptions options) { return new ObservableToPublisher<Success>(observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.createCollection(clientSession, collectionName, options, voidToSuccessCallback(callback)); } })); }
@Override public Observable<Success> createCollection(final String collectionName, final CreateCollectionOptions options) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.createCollection(collectionName, options, voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Publisher<Success> toCollection() { return new ObservableToPublisher<Success>(observe(new Block<SingleResultCallback<Success>>(){ @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.toCollection(voidToSuccessCallback(callback)); } })); }
@Override public Observable<Success> createView(final String viewName, final String viewOn, final List<? extends Bson> pipeline, final CreateViewOptions createViewOptions) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.createView(viewName, viewOn, pipeline, createViewOptions, voidToSuccessCallback(callback)); } }), observableAdapter); }
@Override public Observable<GridFSFile> getGridFSFile() { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<GridFSFile>>() { @Override public void apply(final SingleResultCallback<GridFSFile> callback) { wrapped.getGridFSFile(callback); } }), observableAdapter); }
@Override public Observable<Integer> read(final ByteBuffer dst) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Integer>>() { @Override public void apply(final SingleResultCallback<Integer> callback) { wrapped.read(dst, callback); } }), observableAdapter); }
@Override public Publisher<ObjectId> uploadFromStream(final String filename, final AsyncInputStream source, final GridFSUploadOptions options) { return new ObservableToPublisher<ObjectId>(observe(new Block<SingleResultCallback<ObjectId>>() { @Override public void apply(final SingleResultCallback<ObjectId> callback) { wrapped.uploadFromStream(filename, toCallbackAsyncInputStream(source), options, callback); } })); }
@Override public Observable<Success> toCollection() { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<Success>>() { @Override public void apply(final SingleResultCallback<Success> callback) { wrapped.toCollection(voidToSuccessCallback(callback)); } }), observableAdapter); }
/** * Helper to trigger Boolean SingleResultCallbacks for Void operations * * @param callback the boolean single result callback. * @return the results callback for an operation that returns null to signal success. */ public static SingleResultCallback<Void> voidToSuccessCallback(final SingleResultCallback<Success> callback) { return new SingleResultCallback<Void>() { @Override public void onResult(final Void result, final Throwable t) { callback.onResult(Success.SUCCESS, t); } }; }
@Override public Observable<ObjectId> uploadFromStream(final String filename, final AsyncInputStream source) { return RxObservables.create(Observables.observe(new Block<SingleResultCallback<ObjectId>>() { @Override public void apply(final SingleResultCallback<ObjectId> callback) { wrapped.uploadFromStream(filename, toCallbackAsyncInputStream(source), callback); } }), observableAdapter); }