@Override
public Void call() throws Exception {
    final Date now = new Date();
    final Document query = new Document("ns", ns)
            .append("ts", new Document("$gt", new BSONTimestamp((int) (now.getTime() / 1000), 0)));
    final MongoCursor<Document> cursor = oplog.find(query)
            .cursorType(CursorType.TailableAwait).iterator();
    while (cursor.hasNext()) {
        final Document doc = cursor.next();
        for (final OplogListener listener : listeners) {
            listener.onOplog(doc);
        }
    }
    return null;
}
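The call() method above blocks inside the TailableAwait cursor while waiting for new oplog entries, so it is normally run on its own thread. A minimal sketch, assuming tailTask is an instance of the surrounding Callable<Void> (its class name and constructor are not shown above):

// Hypothetical usage: tailTask stands in for an instance of the Callable<Void>
// whose call() method is shown above.
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Void> tailFuture = executor.submit(tailTask);
// ... on shutdown, cancel the task and stop the executor.
tailFuture.cancel(true);
executor.shutdownNow();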
@Before
public void createCollection() throws Exception {
    MongoDatabase db = mongoClient.getDatabase(DATABASE);
    testCollectionName = name.getMethodName();
    db.createCollection(testCollectionName);
    final long currentTime = System.currentTimeMillis();
    //To make sure that the oplog is read on each method after we created the above collection,
    //we let the current second pass before we take the initial timestamp in seconds.
    Awaitility.await().until(new Callable<Boolean>() {
        @Override
        public Boolean call() {
            return System.currentTimeMillis() - currentTime > 1000;
        }
    });
    //So we can skip old oplog entries and just start with whatever this test is producing.
    initialTs = getInitialTsFromCurrentTime();
    testDocuments = mongoClient.getDatabase(DATABASE).getCollection(testCollectionName);
    mongoCursorFindIterable = mongoClient.getDatabase("local").getCollection(OPLOG_COLLECTION)
        .find()
        //As the collection is a capped collection, we use a tailable cursor, which returns results
        //in natural order, in this case based on the ts timestamp field.
        //TailableAwait blocks instead of returning, so we use Tailable.
        .cursorType(CursorType.Tailable);
}
private FindIterable<O2MSyncEventLog> getCursor() throws InterruptedException {
    Thread.sleep(waitTime);
    waitTime *= retryCount;
    logCollection = MongoConnection.INSTANCE.getMongoDataBase()
            .getCollection(String.valueOf(ApplicationCollections.O2MSyncEventLog), O2MSyncEventLog.class);
    FindIterable<O2MSyncEventLog> it = logCollection
            .find(Filters.and(
                    Filters.eq(O2MSyncEventLogCodec.EVENT_ID, String.valueOf(eventId)),
                    Filters.eq(O2MSyncEventLogCodec.STATUS, O2MSyncEventLogCodec.PENDING)))
            .cursorType(CursorType.TailableAwait)
            .noCursorTimeout(true);
    return it;
}
private FindIterable<Document> getCursor() {
    MongoClient client = DBCacheManager.INSTANCE.getCachedMongoPool(mongoDbName, mongoUserName);
    //MongoClient client = DBCacheManager.INSTANCE.getCachedMongoPool(mongoDbName, "ccwOplRO");
    client.setReadPreference(ReadPreference.secondary());
    MongoCollection<Document> collection = client.getDatabase(localDb).getCollection(oplogRs);
    FindIterable<Document> it = collection
            .find(Filters.and(Filters.eq(NS, ns), Filters.gt(TS, lastReadTime)))
            .cursorType(CursorType.TailableAwait)
            .noCursorTimeout(true)
            .maxAwaitTime(30, TimeUnit.MINUTES);
    return it;
}
@Test
public void testFind() {
    FindIterable<Document> find = coll.find(Filters.eq("name", "Alto"), Document.class)
            .sort(Sorts.ascending("color"));
    List<Document> docList = toDocumentList(find);
    assertEquals(4, docList.size());

    find = coll.find(Filters.eq("name", "Alto")).sort(Sorts.ascending("color"));
    docList = toDocumentList(find);
    assertEquals(4, docList.size());

    find = coll.find(Document.class).filter(Filters.eq("name", "Alto")).sort(Sorts.ascending("color"));
    docList = toDocumentList(find);
    assertEquals(4, docList.size());

    find = coll.find().filter(Filters.eq("name", "Alto")).sort(Sorts.ascending("color"));
    docList = toDocumentList(find);
    assertEquals(4, docList.size());

    find = coll.find().filter(Filters.eq("name", "Alto")).sort(Sorts.ascending("color")).batchSize(123)
            .collation(Collation.builder().build()).cursorType(CursorType.NonTailable).limit(2)
            .maxAwaitTime(12, TimeUnit.DAYS).maxTime(12, TimeUnit.DAYS).noCursorTimeout(true).oplogReplay(false)
            .partial(false).skip(1);
    docList = toDocumentList(find);
    assertEquals(2, docList.size());

    Document firstFind = coll.find().filter(Filters.eq("name", "Alto")).sort(Sorts.ascending("color")).first();
    Assert.assertNotNull(firstFind);

    coll.find().filter(Filters.eq("name", "Alto")).forEach(new Block<Document>() {
        @Override
        public void apply(Document t) {
            System.out.println(t.get("name"));
        }
    });
}
private FindIterable<Document> find(int page) {
    final FindIterable<Document> documents = oplog
            .find(query)
            .sort(new Document("$natural", 1))
            .skip(page * batchSize)
            .limit(batchSize)
            .projection(Projections.include("ts", "op", "ns", "o"))
            .cursorType(CursorType.TailableAwait);
    return documents;
}
private void bindHostToPublisher(MongoCollection<Document> tsCollection,
                                 Map<String, FindPublisher<Document>> publishers,
                                 List<MongoClientWrapper> clients) {
    for (MongoClientWrapper client : clients) {
        logger.info("------------ Binding " + client.getHost() + " to oplog. ---------------");
        FindPublisher<Document> oplogPublisher = client.getClient().getDatabase("local")
                .getCollection("oplog.rs").find().filter(getQueryFilter(tsCollection, client))
                .sort(new Document("$natural", 1)).cursorType(CursorType.TailableAwait);
        publishers.put(client.getHost(), oplogPublisher);
    }
}
public static void main(String[] args) {
    try (MongoClient client = new MongoClient()) {
        FindIterable<Document> oplogTail = client.getDatabase("local")
                .getCollection("oplog.rs").find().filter(getQueryFilter())
                .sort(new Document("$natural", 1)).cursorType(CursorType.TailableAwait);
        oplogTail.forEach((Block<Document>) document -> System.out.println(document));
    }
}
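getQueryFilter() is not shown in this example. Based on the other oplog tails in this listing, a typical implementation restricts the tail to entries newer than the moment tailing started; a hedged sketch (the namespace value is a placeholder, and this helper is not part of the original example):

// Hypothetical helper: tail only entries written after startup, mirroring the
// ts/ns filters used in the other snippets above.
private static Bson getQueryFilter() {
    BsonTimestamp startAt = new BsonTimestamp((int) (System.currentTimeMillis() / 1000), 0);
    return Filters.and(
            Filters.eq("ns", "mydb.mycollection"),   // placeholder namespace
            Filters.gt("ts", startAt));
}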
private void tailQueue() {
    while (!engine.isDestroyed()) {
        try {
            FindIterable<org.bson.Document> cursor;
            if (lastProcessed == null) {
                cursor = queueCollection.find(and(ne(PID, pid), gt(CREATED, new Date().getTime())))
                        .cursorType(CursorType.TailableAwait);
            } else {
                cursor = queueCollection.find(and(ne(PID, pid), gt("_id", lastProcessed)))
                        .cursorType(CursorType.TailableAwait);
            }
            cursor.forEach(new Block<org.bson.Document>() {
                @Override
                public void apply(final org.bson.Document event) {
                    lastProcessed = event.getObjectId("_id");
                    engine.execute(new Runnable() {
                        @Override
                        public void run() {
                            processEvent(event);
                        }
                    }, null);
                }
            });
        } catch (Exception ex) {
            LOG.severe("Exception while tailing event queue: " + ex.getMessage());
            ex.printStackTrace();
        }
    }
}
private MongoCursor<BasicDBObject> initializeCursor() {
    Object lastVal = tailTracking.lastVal;
    // lastVal can be null if we are initializing and there is no persistence enabled
    MongoCursor<BasicDBObject> answer;
    if (lastVal == null) {
        answer = dbCol.find().cursorType(CursorType.TailableAwait).iterator();
    } else {
        BasicDBObject queryObj = new BasicDBObject(tailTracking.getIncreasingFieldName(),
                new BasicDBObject("$gt", lastVal));
        answer = dbCol.find(queryObj).cursorType(CursorType.TailableAwait).iterator();
    }
    return answer;
}
/**
 * {@inheritDoc}
 */
@Override
public void run() {
    BsonTimestamp timestamp = OpLogUtils.getLatestOplogTimestamp(oplog);
    if (timestamp == null) {
        LOGGER.severe("OpLog is not ready. Please make sure that the server maintains an oplog and restart this server.");
        return;
    }
    final AtomicReference<BsonTimestamp> last = new AtomicReference<>(timestamp);
    //noinspection InfiniteLoopStatement
    while (true) {
        final CountDownLatch waiter = new CountDownLatch(1);
        oplog.find(Filters.and(Filters.gt("ts", last.get()), Filters.eq("ns", namespace)))
                .cursorType(CursorType.TailableAwait)
                .forEach(
                        new Block<BsonDocument>() {
                            @Override
                            public void apply(BsonDocument document) {
                                BsonTimestamp current = document.getTimestamp("ts");
                                if (current.getTime() > last.get().getTime()) {
                                    last.set(current);
                                    parser.emit(document);
                                }
                            }
                        },
                        new SingleResultCallback<Void>() {
                            @Override
                            public void onResult(Void aVoid, Throwable throwable) {
                                waiter.countDown();
                            }
                        });
        ConcurrentUtils.safeAwait(waiter);
    }
}
private void prepareCursor(int timestampSeconds, int ordinal, List<OplogOpType> filterOplogTypes, int batchSize) {
    LOG.debug("Getting new cursor with offset - TimeStampInSeconds:'{}', Ordinal : '{}' and Batch Size : '{}'",
            timestampSeconds, ordinal, batchSize);
    FindIterable<Document> mongoCursorIterable = mongoCollection
        .find()
        //As the collection is a capped collection, we use a tailable cursor, which returns results
        //in natural order, in this case based on the ts timestamp field.
        //TailableAwait blocks instead of returning, so we use Tailable.
        .cursorType(CursorType.Tailable)
        .batchSize(batchSize);

    List<Bson> andFilters = new ArrayList<>();
    //Only filter if a saved/initial offset was specified, i.e., both time_t and ordinal are not -1.
    if (timestampSeconds > 0 && ordinal >= 0) {
        andFilters.add(Filters.gt(TIMESTAMP_FIELD, new BsonTimestamp(timestampSeconds, ordinal)));
    }

    if (!filterOplogTypes.isEmpty()) {
        List<Bson> oplogOptypeFilters = new ArrayList<>();
        Set<OplogOpType> oplogOpTypesSet = new HashSet<>();
        for (OplogOpType filterOplogopType : filterOplogTypes) {
            if (oplogOpTypesSet.add(filterOplogopType)) {
                oplogOptypeFilters.add(Filters.eq(OP_TYPE_FIELD, filterOplogopType.getOp()));
            }
        }
        //Add an $or filter over the selected op types.
        andFilters.add(Filters.or(oplogOptypeFilters));
    }

    //Finally, AND the timestamp filter with the op type filters.
    if (!andFilters.isEmpty()) {
        mongoCursorIterable = mongoCursorIterable.filter(Filters.and(andFilters));
    }
    cursor = mongoCursorIterable.iterator();
}
private CursorType toCursorType(QueryOptions queryOptions) {
    if (!queryOptions.isTailable()) {
        return CursorType.NonTailable;
    }
    if (queryOptions.isAwaitData()) {
        return CursorType.TailableAwait;
    }
    return CursorType.Tailable;
}
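For reference, the three CursorType values map onto the driver behavior shown throughout this listing; a minimal sketch against FindIterable (the collection variables are placeholders):

// NonTailable: an ordinary cursor that is exhausted once all matching documents are returned.
collection.find().cursorType(CursorType.NonTailable);
// Tailable: stays open on a capped collection, but returns when no more data is currently available.
cappedCollection.find().cursorType(CursorType.Tailable);
// TailableAwait: stays open and waits a while for new documents before returning.
cappedCollection.find().cursorType(CursorType.TailableAwait).maxAwaitTime(30, TimeUnit.SECONDS);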
@Test
public void testTailableCursors() {
    getMorphia().map(CappedPic.class);
    getDs().ensureCaps();
    final Query<CappedPic> query = getDs().find(CappedPic.class);
    final List<CappedPic> found = new ArrayList<CappedPic>();
    final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

    assertEquals(0, query.count());

    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            getDs().save(new CappedPic(System.currentTimeMillis() + ""));
        }
    }, 0, 500, TimeUnit.MILLISECONDS);

    final Iterator<CappedPic> tail = query
        .fetch(new FindOptions()
                   .cursorType(CursorType.Tailable));
    Awaitility
        .await()
        .pollDelay(500, TimeUnit.MILLISECONDS)
        .atMost(10, TimeUnit.SECONDS)
        .until(new Callable<Boolean>() {
            @Override
            public Boolean call() {
                if (tail.hasNext()) {
                    found.add(tail.next());
                }
                return found.size() >= 10;
            }
        });
    executorService.shutdownNow();
    Assert.assertTrue(query.count() >= 10);
}
public CursorType getCursorType() { return cursorType; }
public String getOperationName(String function) {
    StringBuilder builder = new StringBuilder();
    String collectionName = getCollection().getNamespace().getCollectionName();
    builder.append("Mongo : ");
    builder.append(collectionName);
    builder.append(" : find");
    if (function != null) {
        builder.append(" : ");
        builder.append(function);
    }
    Bson filter = getFilter();
    if (filter != null) {
        filter = MongoUtilities.filterParameters(filter.toBsonDocument(BsonDocument.class,
                MongoClient.getDefaultCodecRegistry()));
        builder.append(" : Filter ");
        builder.append(filter.toString());
    }
    Bson sort = getSort();
    if (sort != null) {
        builder.append(" : Sort ");
        builder.append(sort.toString());
    }
    Bson modifiers = getModifiers();
    if (modifiers != null) {
        builder.append(" : Modifiers ");
        builder.append(modifiers.toString());
    }
    Bson projection = getProjection();
    if (projection != null) {
        builder.append(" : Projection ");
        builder.append(projection.toString());
    }
    if (limit != -1) {
        builder.append(" : Limit ");
        builder.append(limit);
    }
    Collation collation = getCollation();
    if (collation != null) {
        builder.append(" : Collation ");
        builder.append(collation.asDocument().toString());
    }
    CursorType cursorType2 = getCursorType();
    if (cursorType2 != null) {
        builder.append(" : Cursor Type ");
        builder.append(cursorType2.toString());
    }
    return builder.toString();
}
protected CursorType getCursorType() { return CursorType.TailableAwait; }
private void prepareCursor(int maxBatchSize, String offsetField, String lastSourceOffset) {
    String stringOffset = "";
    ObjectId objectIdOffset = null;
    if (null == cursor) {
        if (null == lastSourceOffset || lastSourceOffset.isEmpty()) {
            objectIdOffset = initialObjectId;
            stringOffset = initialId;
        } else {
            if (configBean.offsetType == OffsetFieldType.STRING) {
                stringOffset = lastSourceOffset;
            } else {
                objectIdOffset = new ObjectId(lastSourceOffset);
            }
        }
        LOG.debug("Getting new cursor with params: {} {} {}", maxBatchSize, offsetField,
                configBean.offsetType == OffsetFieldType.STRING ? stringOffset : objectIdOffset);
        if (configBean.isCapped) {
            cursor = mongoCollection
                .find()
                .filter(Filters.gt(
                    offsetField,
                    configBean.offsetType == OffsetFieldType.STRING ? stringOffset : objectIdOffset
                ))
                .cursorType(CursorType.TailableAwait)
                .batchSize(maxBatchSize)
                .iterator();
        } else {
            cursor = mongoCollection
                .find()
                .filter(Filters.gt(
                    offsetField,
                    configBean.offsetType == OffsetFieldType.STRING ? stringOffset : objectIdOffset
                ))
                .sort(Sorts.ascending(offsetField))
                .cursorType(CursorType.NonTailable)
                .batchSize(maxBatchSize)
                .iterator();
        }
    }
}
@Test
public void passThrough() {
    Collation collation = Collation.builder()
                                   .locale("en")
                                   .caseLevel(true)
                                   .build();
    DBCollectionFindOptions options = new FindOptions()
        .batchSize(42)
        .limit(18)
        .modifier("i'm a", "modifier")
        .modifier("i am", 2)
        .projection(new BasicDBObject("field", "value"))
        .maxTime(15, TimeUnit.MINUTES)
        .maxAwaitTime(45, TimeUnit.SECONDS)
        .skip(12)
        .sort(new BasicDBObject("field", -1))
        .cursorType(CursorType.TailableAwait)
        .noCursorTimeout(true)
        .oplogReplay(true)
        .partial(true)
        .readPreference(ReadPreference.secondaryPreferred(2, TimeUnit.MINUTES))
        .readConcern(ReadConcern.LOCAL)
        .collation(collation)
        .getOptions();

    assertEquals(42, options.getBatchSize());
    assertEquals(18, options.getLimit());
    assertEquals(new BasicDBObject("i'm a", "modifier").append("i am", 2), options.getModifiers());
    assertEquals(new BasicDBObject("field", "value"), options.getProjection());
    assertEquals(15, options.getMaxTime(TimeUnit.MINUTES));
    assertEquals(45, options.getMaxAwaitTime(TimeUnit.SECONDS));
    assertEquals(12, options.getSkip());
    assertEquals(new BasicDBObject("field", -1), options.getSort());
    assertEquals(CursorType.TailableAwait, options.getCursorType());
    assertTrue(options.isNoCursorTimeout());
    assertTrue(options.isOplogReplay());
    assertTrue(options.isPartial());
    assertEquals(ReadPreference.secondaryPreferred(2, TimeUnit.MINUTES), options.getReadPreference());
    assertEquals(ReadConcern.LOCAL, options.getReadConcern());
    assertEquals(collation, options.getCollation());
}
/**
 * Get the cursor type.
 *
 * @return the cursor type
 */
public CursorType getCursorType() {
    return options.getCursorType();
}