/**
 * Maps a {@link ReadPreferenceEnum} config value to the corresponding driver
 * {@link ReadPreference}.
 *
 * @param readPreferenceEnum read preference enum value provided in config
 * @return the matching driver read preference for mongo client options, or
 *         {@code null} if the enum value is not one of the known choices
 */
public static ReadPreference readPreference(ReadPreferenceEnum readPreferenceEnum) {
    switch (readPreferenceEnum) {
        case PRIMARY:
            return ReadPreference.primary();
        case PRIMARY_PREFERRED:
            return ReadPreference.primaryPreferred();
        case SECONDARY:
            return ReadPreference.secondary();
        case SECONDARY_PREFERRED:
            return ReadPreference.secondaryPreferred();
        case NEAREST:
            return ReadPreference.nearest();
        default:
            // Unreachable for the five known constants; callers must be prepared
            // for null if the enum ever grows.
            return null;
    }
}
/**
 * Builds the shared {@link MongoClient} from the configured host/port.
 * Any failure is treated as fatal: it is logged and the process exits.
 */
private void prepareClient() {
    try {
        ServerAddress serverAddress =
                new ServerAddress(config.getMongo().getHost(), config.getMongo().getPort());
        MongoClientOptions clientOptions = MongoClientOptions.builder()
                .serverSelectionTimeout(5000)
                .socketKeepAlive(false)
                .readPreference(ReadPreference.primaryPreferred())
                .sslInvalidHostNameAllowed(true)
                .build();
        client = connectToClient(serverAddress, clientOptions);
    } catch (Exception ex) {
        // Without a working Mongo connection the service cannot run at all.
        logger.error(ex.getMessage(), ex);
        System.exit(-1);
    }
}
/**
 * Builds driver {@link MongoClientOptions} from this configuration object.
 *
 * @param codecRegistry codec registry to install on the client
 * @return fully populated client options
 */
public MongoClientOptions toMongoClientOptions(final CodecRegistry codecRegistry) {
    return builder()
            // connection pool sizing / lifecycle
            .minConnectionsPerHost(connectionpool.getMinSize())
            .connectionsPerHost(connectionpool.getMaxSize())
            .maxConnectionIdleTime(connectionpool.getMaxIdleTime())
            .maxConnectionLifeTime(connectionpool.getMaxLifeTime())
            .threadsAllowedToBlockForConnectionMultiplier(connectionpool.getBlockedConnectionMultiplier())
            .maxWaitTime(maxWaitTime)
            // connection / server-selection behaviour
            .connectTimeout(connectTimeout)
            .serverSelectionTimeout(serverSelectionTimeout)
            .sslEnabled(sslEnabled)
            .readPreference(ReadPreference.valueOf(readPreference))
            .codecRegistry(codecRegistry)
            .cursorFinalizerEnabled(true)
            .build();
}
/**
 * Initializes the client, database and collection handles in order, stopping
 * at the first step that reports configuration issues.
 *
 * @param context        stage context used to create config issues
 * @param issues         accumulator for configuration problems
 * @param readPreference read preference to apply, may be null
 * @param writeConcern   write concern to apply, may be null
 */
public void init(
    Stage.Context context,
    List<Stage.ConfigIssue> issues,
    ReadPreference readPreference,
    WriteConcern writeConcern
) {
    mongoClient = createClient(context, issues, readPreference, writeConcern);
    if (issues.isEmpty()) {
        mongoDatabase = createMongoDatabase(context, issues, readPreference, writeConcern);
        if (issues.isEmpty()) {
            mongoCollection = createMongoCollection(context, issues, readPreference, writeConcern);
        }
    }
}
/**
 * Obtains the {@link MongoDatabase} handle, applying either the read
 * preference or the write concern.
 *
 * NOTE(review): because of the else-if, writeConcern is silently ignored
 * whenever readPreference is non-null, and null is returned (with no issue
 * recorded) when both are null — presumably callers always pass exactly one
 * of the two; confirm against call sites.
 *
 * @param context        stage context used to report config issues
 * @param issues         accumulator for configuration problems
 * @param readPreference read preference to apply, may be null
 * @param writeConcern   write concern to apply, may be null
 * @return the database handle, or null on error / when both settings are null
 */
private MongoDatabase createMongoDatabase( Stage.Context context, List<Stage.ConfigIssue> issues, ReadPreference readPreference, WriteConcern writeConcern ) {
    MongoDatabase mongoDatabase = null;
    try {
        if (readPreference != null) {
            mongoDatabase = mongoClient.getDatabase(database).withReadPreference(readPreference);
        } else if (writeConcern != null) {
            mongoDatabase = mongoClient.getDatabase(database).withWriteConcern(writeConcern);
        }
    } catch (MongoClientException e) {
        // Surface the failure as a config issue instead of failing the stage outright.
        issues.add(context.createConfigIssue(
            Groups.MONGODB.name(),
            MONGO_CONFIG_PREFIX + "database",
            Errors.MONGODB_02,
            database,
            e.toString()
        ));
    }
    return mongoDatabase;
}
/**
 * Obtains the {@link MongoCollection} handle, applying either the read
 * preference or the write concern.
 *
 * NOTE(review): because of the else-if, writeConcern is silently ignored
 * whenever readPreference is non-null, and null is returned (with no issue
 * recorded) when both are null — presumably callers always pass exactly one
 * of the two; confirm against call sites.
 *
 * @param context        stage context used to report config issues
 * @param issues         accumulator for configuration problems
 * @param readPreference read preference to apply, may be null
 * @param writeConcern   write concern to apply, may be null
 * @return the collection handle, or null on error / when both settings are null
 */
private MongoCollection createMongoCollection( Stage.Context context, List<Stage.ConfigIssue> issues, ReadPreference readPreference, WriteConcern writeConcern ) {
    MongoCollection mongoCollection = null;
    try {
        if (readPreference != null) {
            mongoCollection = mongoDatabase.getCollection(collection).withReadPreference(readPreference);
        } else if (writeConcern != null) {
            mongoCollection = mongoDatabase.getCollection(collection).withWriteConcern(writeConcern);
        }
    } catch (MongoClientException e) {
        // Surface the failure as a config issue instead of failing the stage outright.
        issues.add(context.createConfigIssue(
            Groups.MONGODB.name(),
            MONGO_CONFIG_PREFIX + "collection",
            Errors.MONGODB_03,
            collection,
            e.toString()
        ));
    }
    return mongoCollection;
}
public static void checkCapped(MongoDatabase database, String collectionName, int size, int maxDocuments, boolean delete) { if (Lists.newArrayList(database.listCollectionNames()).contains(collectionName)) { log.debug("'{}' collection already exists...", collectionName); // Check if already capped Document command = new Document("collStats", collectionName); boolean isCapped = database.runCommand(command, ReadPreference.primary()).getBoolean("capped").booleanValue(); if (!isCapped) { if (delete) { database.getCollection(collectionName).drop(); database.createCollection(collectionName, new CreateCollectionOptions().capped(true).maxDocuments(maxDocuments).sizeInBytes(size)); } else { log.info("'{}' is not capped, converting it...", collectionName); command = new Document("convertToCapped", collectionName).append("size", size).append("max", maxDocuments); database.runCommand(command, ReadPreference.primary()); } } else { log.debug("'{}' collection already capped!", collectionName); } } else { database.createCollection(collectionName, new CreateCollectionOptions().capped(true).maxDocuments(maxDocuments).sizeInBytes(size)); } }
private MongoBlob getBlob(String id, long lastMod) { DBObject query = getBlobQuery(id, lastMod); // try the secondary first // TODO add a configuration option for whether to try reading from secondary ReadPreference pref = ReadPreference.secondaryPreferred(); DBObject fields = new BasicDBObject(); fields.put(MongoBlob.KEY_DATA, 1); MongoBlob blob = (MongoBlob) getBlobCollection().findOne(query, fields, pref); if (blob == null) { // not found in the secondary: try the primary pref = ReadPreference.primary(); blob = (MongoBlob) getBlobCollection().findOne(query, fields, pref); } return blob; }
/**
 * Returns node documents modified at or after {@code startTime}, newest
 * first. Reads from the primary so recently committed changes are visible.
 */
@Override
public CloseableIterable<NodeDocument> getCandidates(final long startTime) {
    DBObject modifiedQuery = start(NodeDocument.MODIFIED_IN_SECS)
            .greaterThanEquals(NodeDocument.getModifiedInSecs(startTime))
            .get();
    DBObject descendingByModified = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1);
    DBCursor dbCursor = getNodeCollection().find(modifiedQuery)
            .sort(descendingByModified)
            .setReadPreference(ReadPreference.primary());
    // Lazily convert each raw DBObject; the wrapper closes the cursor when the
    // iterable is closed.
    return CloseableIterable.wrap(transform(dbCursor, new Function<DBObject, NodeDocument>() {
        @Override
        public NodeDocument apply(DBObject input) {
            return store.convertFromDBObject(Collection.NODES, input);
        }
    }), dbCursor);
}
@Override public void setReadWriteMode(String readWriteMode) { if (readWriteMode == null || readWriteMode.equals(lastReadWriteMode)) { return; } lastReadWriteMode = readWriteMode; try { Map<String, String> map = Splitter.on(", ").withKeyValueSeparator(":").split(readWriteMode); String read = map.get("read"); if (read != null) { ReadPreference readPref = ReadPreference.valueOf(read); if (!readPref.equals(this.readPreference)) { this.readPreference = readPref; } } String write = map.get("write"); if (write != null) { WriteConcern writeConcern = WriteConcern.valueOf(write); if (!writeConcern.equals(this.writeConcern)) { this.writeConcern = writeConcern; } } } catch (Exception e) { // unsupported or parse error - ignore } }
@Test public void testMongoReadPreferencesDefault() throws Exception{ assertEquals(ReadPreference.primary(), mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY)); assertEquals(ReadPreference.primaryPreferred(), mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY)); //By default Mongo read preference is primary assertEquals(ReadPreference.primary(), mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY)); //Change the default and assert again mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary()); assertEquals(ReadPreference.secondary(), mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY)); //for case where parent age cannot be determined the preference should be primaryPreferred assertEquals(ReadPreference.primaryPreferred(), mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); //For collection other than NODES always primary assertEquals(ReadPreference.primary(), mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); }
@Test public void testMongoReadPreferencesWithAge() throws Exception{ //Change the default ReadPreference testPref = ReadPreference.secondary(); mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref); NodeBuilder b1 = nodeStore.getRoot().builder(); b1.child("x").child("y"); nodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); String id = Utils.getIdFromPath("/x/y"); String parentId = Utils.getParentId(id); mongoDS.invalidateCache(NODES,id); //For modifiedTime < replicationLag primary should be preferred assertEquals(ReadPreference.primaryPreferred(), mongoDS.getMongoReadPreference(NODES,parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); //Going into future to make parent /x old enough clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag); mongoDS.setClock(clock); //For old modified nodes secondaries should be preferred assertEquals(testPref, mongoDS.getMongoReadPreference(NODES, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH)); }
/**
 * Returns the current re-entrant lock count held by {@code callerId} on
 * {@code resourceId}, reading from the primary for an authoritative value.
 *
 * @throws InvalidLockException if no live (unexpired, count > 0) lock exists
 */
public int getLockCount(String callerId, String resourceId) {
    Date now = new Date();
    BasicDBObject liveLockQuery = new BasicDBObject()
            .append(CALLERID, callerId)
            .append(RESOURCEID, resourceId)
            .append(EXPIRATION, new BasicDBObject("$gt", now))
            .append(COUNT, new BasicDBObject("$gt", 0));
    BasicDBObject countOnly = new BasicDBObject(COUNT, 1);
    DBObject lock = coll.findOne(liveLockQuery, countOnly, ReadPreference.primary());
    if (lock == null) {
        throw new InvalidLockException(resourceId);
    }
    int cnt = ((Number) lock.get(COUNT)).intValue();
    LOGGER.debug("{}/{} lockCount={}", callerId, resourceId, cnt);
    return cnt;
}
/**
 * Extends (pings) a live lock held by {@code callerId} on {@code resourceId}:
 * refreshes the timestamp and expiration and bumps the version, using an
 * optimistic version check to detect concurrent modification.
 *
 * @throws InvalidLockException if the lock does not exist, has expired, or
 *         was modified concurrently (version mismatch)
 */
public void ping(String callerId, String resourceId) {
    Date now = new Date();
    BasicDBObject liveLockQuery = new BasicDBObject()
            .append(CALLERID, callerId)
            .append(RESOURCEID, resourceId)
            .append(EXPIRATION, new BasicDBObject("$gt", now))
            .append(COUNT, new BasicDBObject("$gt", 0));
    DBObject lock = coll.findOne(liveLockQuery, null, ReadPreference.primary());
    if (lock == null) {
        throw new InvalidLockException(resourceId);
    }
    Date newExpiration = new Date(now.getTime() + ((Number) lock.get(TTL)).longValue());
    int expectedVersion = ((Number) lock.get(VERSION)).intValue();
    BasicDBObject update = new BasicDBObject()
            .append("$set", new BasicDBObject(TIMESTAMP, now).append(EXPIRATION, newExpiration))
            .append("$inc", new BasicDBObject(VERSION, 1));
    // Optimistic concurrency: the update only matches while the version is unchanged.
    liveLockQuery = liveLockQuery.append(VERSION, expectedVersion);
    WriteResult wr = coll.update(liveLockQuery, update, false, false, WriteConcern.ACKNOWLEDGED);
    if (wr.getN() != 1) {
        throw new InvalidLockException(resourceId);
    }
    LOGGER.debug("{}/{} pinged", callerId, resourceId);
}
/** * Returns the set of document ids that were not updated with docver * * @param docver The current document version * @param documentIds The document ids to scan * * @return The set of document ids that were not updated with docver */ public static Set<Object> getFailedUpdates(DBCollection collection, ObjectId docver, List<Object> documentIds) { Set<Object> failedIds=new HashSet<>(); if(!documentIds.isEmpty()) { // documents with the given _ids and whose docver contains our docVer are the ones we managed to update // others are failures BasicDBObject query=new BasicDBObject(DOCVER_FLD,new BasicDBObject("$ne",docver)); query.append("_id",new BasicDBObject("$in",documentIds)); try (DBCursor cursor = collection.find(query,new BasicDBObject("_id",1)) .setReadPreference(ReadPreference.primary())) { while(cursor.hasNext()) { failedIds.add(cursor.next().get("_id")); } } } return failedIds; }
/**
 * Verifies that read preferences declared in parse-test-datasources.json are
 * picked up by both the metadata and data configurations, and that the
 * metadata write concern is SAFE.
 */
@Test
public void readPreference() throws IOException {
    try (InputStream jsonStream = Thread.currentThread().getContextClassLoader()
            .getResourceAsStream("parse-test-datasources.json")) {
        JsonNode root = JsonUtils.json(jsonStream);

        MongoConfiguration metadataConfig = new MongoConfiguration();
        metadataConfig.initializeFromJson(root.get("metadata_readPreference"));
        MongoConfiguration dataConfig = new MongoConfiguration();
        dataConfig.initializeFromJson(root.get("mongodata_readPreference"));

        assertEquals(ReadPreference.nearest(), metadataConfig.getMongoClientOptions().getReadPreference());
        assertEquals(ReadPreference.secondary(), dataConfig.getMongoClientOptions().getReadPreference());
        assertEquals(WriteConcern.SAFE, metadataConfig.getWriteConcern());
    }
}
/**
 * Builds a client from the test configuration file and checks that the
 * address, credentials and all default driver options survive the round trip.
 */
public void correctlyExtractsMongoClientFromConfiguration() throws Exception {
    final Example example = factory.build(testFile);
    final MongoClient client = example.getMongoClient().build(environment);

    // Server address and (absent) credentials.
    assertThat(client.getAddress().getHost()).isIn("localhost", "127.0.0.1");
    assertThat(client.getAddress().getPort()).isEqualTo(ServerAddress.defaultPort());
    assertThat(client.getCredentialsList()).isEmpty();

    // Driver option defaults.
    final MongoClientOptions clientOptions = client.getMongoClientOptions();
    assertThat(clientOptions.getDbDecoderFactory()).isEqualTo(DefaultDBDecoder.FACTORY);
    assertThat(clientOptions.getDbEncoderFactory()).isEqualTo(DefaultDBEncoder.FACTORY);
    assertThat(clientOptions.getReadPreference()).isEqualTo(ReadPreference.primary());
    assertThat(clientOptions.getWriteConcern()).isEqualTo(WriteConcern.ACKNOWLEDGED);
    assertThat(clientOptions.getSocketFactory()).isEqualTo(SocketFactory.getDefault());
}
public static ReadPreferenceChoice getReadPreferenceChoice( ReadPreference readPref ) { if( readPref == null ) return PRIMARY; // default String readPrefName = readPref.getName(); if( readPrefName == ReadPreference.primary().getName() ) return PRIMARY; if( readPrefName == ReadPreference.primaryPreferred().getName() ) return PRIMARY_PREFERRED; if( readPrefName == ReadPreference.secondary().getName() ) return SECONDARY; if( readPrefName == ReadPreference.secondaryPreferred().getName() ) return SECONDARY_PREFERRED; if( readPrefName == ReadPreference.nearest().getName() ) return NEAREST; return PRIMARY; // default }
/**
 * Executes a 'count' statement: resolves the configured query template,
 * renders it against the parameter object and counts matching documents on a
 * secondary-preferred node.
 *
 * @throws MongoDaoException if no count statement with the given id exists
 */
@Override
public long count(String statement, Object parameter) {
    logger.debug("Execute 'count' mongodb command. Statement '" + statement + "'.");
    SelectConfig config = (SelectConfig) configuration.getConfig(statement);
    if (config == null) {
        throw new MongoDaoException(statement, "Count statement id '" + statement + "' not found.");
    }
    DB db = factory.getDataSource().getDB();
    DBCollection coll = db.getCollection(config.getCollection());
    NodeEntry query = config.getQuery();
    @SuppressWarnings("unchecked")
    Map<String, Object> rendered = (Map<String, Object>) query.executorNode(configuration, parameter);
    DBObject queryDbo = new BasicDBObject(rendered);
    logger.debug("Execute 'count' mongodb command. Query '" + queryDbo + "'.");
    // Counts may be served from a secondary to keep read load off the primary.
    return coll.count(queryDbo, ReadPreference.secondaryPreferred());
}
private FindIterable<Document> getCursor(){ MongoClient client = DBCacheManager.INSTANCE.getCachedMongoPool(mongoDbName, mongoUserName); //MongoClient client = DBCacheManager.INSTANCE.getCachedMongoPool(mongoDbName, "ccwOplRO"); client.setReadPreference(ReadPreference.secondary()); MongoCollection<Document> collection =client.getDatabase(localDb).getCollection(oplogRs); FindIterable<Document> it = collection.find(Filters.and(Filters.eq(NS, ns),Filters.gt(TS, lastReadTime))) .cursorType(CursorType.TailableAwait).noCursorTimeout(true).maxAwaitTime(30, TimeUnit.MINUTES); return it; }
/**
 * Builds {@link MongoClientOptions} from the EclipseLink NoSQL properties,
 * honouring the optional read-preference and write-concern entries.
 */
private void configureClientOptions(final Map<String, Object> properties) {
    final MongoClientOptions.Builder builder = MongoClientOptions.builder();
    final String readPreference = (String) properties.get(ECLIPSELINK_NOSQL_PROPERTY_MONGO_READ_PREFERENCE);
    if (readPreference != null) {
        builder.readPreference(ReadPreference.valueOf(readPreference));
    }
    final String writeConcern = (String) properties.get(ECLIPSELINK_NOSQL_PROPERTY_MONGO_WRITE_CONCERN);
    if (writeConcern != null) {
        builder.writeConcern(WriteConcern.valueOf(writeConcern));
    }
    mongoClientOptions = builder.build();
}
/**
 * Builds {@link MongoClientOptions} from Hibernate OGM properties: generic
 * driver options are applied via the prefixed lookup, then the optional
 * write-concern and read-preference entries override the defaults.
 */
private void configureClientOptions(final Map<String, Object> properties) {
    final MongoClientOptions.Builder builder = MongoClientOptions.builder();
    // Generic driver options are looked up under the OGM options prefix.
    setOptions(builder, (final String key) ->
            (String) properties.get(HIBERNATE_OGM_MONGODB_OPTIONS_PREFIX + "." + key));
    final String readPreference = (String) properties.get(HIBERNATE_OGM_MONGODB_READ_PREFERENCE);
    if (readPreference != null) {
        builder.readPreference(ReadPreference.valueOf(readPreference));
    }
    final String writeConcern = (String) properties.get(HIBERNATE_OGM_MONGODB_WRITE_CONCERN);
    if (writeConcern != null) {
        builder.writeConcern(WriteConcern.valueOf(writeConcern));
    }
    mongoClientOptions = builder.build();
}
@Test public void testMongoClientOptions() { // GIVEN final Map<String, Object> properties = new HashMap<>(); when(descriptor.getProperties()).thenReturn(properties); properties.put("eclipselink.nosql.property.mongo.db", "foo"); // it looks like only the two options below are supported by EclipseLink final ReadPreference readPreference = ReadPreference.nearest(); final WriteConcern writeConcern = WriteConcern.JOURNALED; properties.put("eclipselink.nosql.property.mongo.read-preference", readPreference.getName()); properties.put("eclipselink.nosql.property.mongo.write-concern", "JOURNALED"); final ConfigurationFactory factory = new ConfigurationFactoryImpl(); // WHEN final Configuration configuration = factory.createConfiguration(descriptor); // THEN assertThat(configuration, notNullValue()); final MongoClientOptions clientOptions = configuration.getClientOptions(); assertThat(clientOptions, notNullValue()); assertThat(clientOptions.getReadPreference(), equalTo(readPreference)); assertThat(clientOptions.getWriteConcern(), equalTo(writeConcern)); }
/**
 * Creates the {@link MongoDbFactory} backed by a client parsed from the
 * configured connection URL, pinned to primary reads and acknowledged writes.
 */
@Bean
public MongoDbFactory mongoDbFactory() throws Exception {
    MongoClientURI clientUri = new MongoClientURI(mongoDbUrl);
    mongo = new MongoClient(clientUri);
    mongo.setWriteConcern(WriteConcern.ACKNOWLEDGED);
    mongo.setReadPreference(ReadPreference.primary());
    return new SimpleMongoDbFactory(mongo, clientUri.getDatabase());
}
/**
 * Runs a database command with the given read preference, bridging the
 * callback-based driver API into an Rx {@link Observable}.
 */
@Override
public <TResult> Observable<TResult> runCommand(final Bson command, final ReadPreference readPreference,
                                                final Class<TResult> clazz) {
    Block<SingleResultCallback<TResult>> operation = new Block<SingleResultCallback<TResult>>() {
        @Override
        public void apply(final SingleResultCallback<TResult> callback) {
            wrapped.runCommand(command, readPreference, clazz, callback);
        }
    };
    return RxObservables.create(Observables.observe(operation), observableAdapter);
}
/**
 * Creates a find iterable bound to the given namespace and execution context.
 * All collaborators are required; nulls are rejected eagerly.
 */
public FindIterable(final MongoNamespace namespace, final CodecRegistry codecRegistry,
                    final ReadPreference readPreference, final OperationExecutor executor,
                    final Bson filter, final FindOptions findOptions) {
    this.findOptions = notNull("findOptions", findOptions);
    this.filter = notNull("filter", filter);
    this.executor = notNull("executor", executor);
    this.readPreference = notNull("readPreference", readPreference);
    this.codecRegistry = notNull("codecRegistry", codecRegistry);
    this.namespace = notNull("namespace", namespace);
}
/**
 * Prepares a findOne execution: stores the collaborators and eagerly compiles
 * the templated query with its positional parameters so template errors
 * surface at construction time.
 */
RefFindOne(DBCollection collection, ReadPreference readPreference, ExtendedUnmarshaller unmarshaller,
           QueryFactory queryFactory, String query, Object... parameters) {
    this.collection = collection;
    this.readPreference = readPreference;
    this.unmarshaller = unmarshaller;
    this.queryFactory = queryFactory;
    this.query = this.queryFactory.createQuery(query, parameters);
}
/**
 * Runs a database command with the given read preference, adapting the
 * callback-based driver API into a Reactive Streams {@link Publisher}.
 */
@Override
public <TResult> Publisher<TResult> runCommand(final Bson command, final ReadPreference readPreference,
                                               final Class<TResult> clazz) {
    Block<SingleResultCallback<TResult>> operation = new Block<SingleResultCallback<TResult>>() {
        @Override
        public void apply(final SingleResultCallback<TResult> callback) {
            wrapped.runCommand(command, readPreference, clazz, callback);
        }
    };
    return new ObservableToPublisher<TResult>(observe(operation));
}
/**
 * Runs a database command within an explicit client session, adapting the
 * callback-based driver API into a Reactive Streams {@link Publisher}.
 */
@Override
public <TResult> Publisher<TResult> runCommand(final ClientSession clientSession, final Bson command,
                                               final ReadPreference readPreference, final Class<TResult> clazz) {
    Block<SingleResultCallback<TResult>> operation = new Block<SingleResultCallback<TResult>>() {
        @Override
        public void apply(final SingleResultCallback<TResult> callback) {
            wrapped.runCommand(clientSession, command, readPreference, clazz, callback);
        }
    };
    return new ObservableToPublisher<TResult>(observe(operation));
}
/**
 * Records the read operation and its context, then either executes it
 * immediately by completing the callback with a canned result, or queues it
 * for later when {@code queueExecution} is set.
 */
@Override
public <T> void execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference,
                        final ClientSession session, final SingleResultCallback<T> callback) {
    readPreferences.add(readPreference);
    clientSessions.add(session);
    if (!queueExecution) {
        readOperations.add(operation);
        callResult(callback);
    } else {
        queuedReadOperations.add(operation);
        queuedReadCallbacks.add(callback);
    }
}
/**
 * Creates a {@link MongoClient} for the given comma-separated server list,
 * preferring secondary reads. Credentials are attached only when a user is
 * supplied.
 *
 * @param server comma-separated host:port list
 * @param user   user name, or null for unauthenticated access
 * @param pass   password (only read when user is non-null)
 * @param db     authentication database
 */
public static MongoClient newClient(String server, String user, String pass, String db)
        throws UnknownHostException {
    MongoClientOptions options = MongoClientOptions
            .builder()
            .readPreference( ReadPreference.secondaryPreferred() )
            .build();
    List<ServerAddress> addrs = new ArrayList<ServerAddress>();
    for ( InetSocketAddress isa : AddrUtil.getAddresses(server) ) {
        addrs.add( new ServerAddress( isa.getAddress(), isa.getPort() ) );
    }
    if ( user == null ) {
        return new MongoClient( addrs, options );
    }
    List<MongoCredential> creds = new ArrayList<MongoCredential>();
    creds.add( MongoCredential.createCredential( user, db, pass.toCharArray() ) );
    return new MongoClient( addrs, creds, options );
}
/**
 * Creates the {@link MongoDbFactory}: in "cluster" mode a new client is built
 * against the replica-set members with nearest reads; otherwise the existing
 * single {@code mongo} instance is reused.
 */
@Bean
public MongoDbFactory mongoDbFactory() throws Exception {
    if (!propertyResolver.getProperty("mode").equalsIgnoreCase("cluster")) {
        return new SimpleMongoDbFactory(mongo, propertyResolver.getProperty("databaseName"));
    }
    MongoCredential credential =
            MongoCredential.createMongoCRCredential("user1", "test", "password1".toCharArray());
    List<ServerAddress> servers = mongo.getServerAddressList();
    MongoClient mongoClient = new MongoClient(servers, Arrays.asList(credential));
    mongoClient.setReadPreference(ReadPreference.nearest());
    mongoClient.getReplicaSetStatus();
    return new SimpleMongoDbFactory(mongoClient, propertyResolver.getProperty("databaseName"));
}
/**
 * Provides the singleton {@link Mongo} instance. Connection failures are
 * reported via {@code addError} and result in a null binding.
 */
@Provides
@Singleton
Mongo providesMongo() {
    try {
        final MongoClientOptions mongoClientOptions = MongoClientOptions.builder()
                .autoConnectRetry(true)
                .readPreference(ReadPreference.primaryPreferred())
                .build();
        return new MongoClient(SetupConfig.get().getMongoServerAddresses(), mongoClientOptions);
    } catch (final UnknownHostException e) {
        addError(e);
        return null;
    }
}
public MongoFileStoreConfig configure() { MongoFileStoreConfig config = MongoFileStoreConfig.builder().bucket("spring") // .asyncDeletes(true) // background deleting .chunkSize(ChunkSize.medium_256K) // good default .enableCompression(true)// .readPreference(ReadPreference.secondaryPreferred())// .writeConcern(WriteConcern.ACKNOWLEDGED)// .build(); return config; }
/**
 * Round-trips an XML file through the store: uploads it with SAFE writes and
 * primary reads, then validates the stored file.
 *
 * NOTE(review): depends on a hard-coded, machine-specific absolute path
 * (/Users/dbusch/...), so this test can only pass on that workstation —
 * consider packaging the fixture as a classpath resource instead.
 */
@Test
public void test() throws IOException {
    MongoFileStoreConfig config = MongoFileStoreConfig.builder().bucket("xml")//
            .writeConcern(WriteConcern.SAFE).readPreference(ReadPreference.primary()).build();
    MongoFileStore store = new MongoFileStore(database, config);
    File file = new File("/Users/dbusch/Documents/Gasplant", "GasStatementData_J24_20131101_20131001_4_1.xml");
    MongoFile mongoFile = store.upload(file, "application/xml");
    assertNotNull(mongoFile);
    // validate() throws if the stored chunks/metadata are inconsistent.
    mongoFile.validate();
}
@Override public CloseableIterable<NodeDocument> getPossiblyDeletedDocs(final long lastModifiedTime) { //_deletedOnce == true && _modified < lastModifiedTime DBObject query = start(NodeDocument.DELETED_ONCE).is(Boolean.TRUE) .put(NodeDocument.MODIFIED_IN_SECS).lessThan(NodeDocument.getModifiedInSecs(lastModifiedTime)) .get(); DBCursor cursor = getNodeCollection().find(query).setReadPreference(ReadPreference.secondaryPreferred()); return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() { @Override public NodeDocument apply(DBObject input) { return store.convertFromDBObject(Collection.NODES, input); } }), cursor); }
@Override public InvalidationResult invalidateCache() { final InvalidationResult result = new InvalidationResult(); int size = 0; List<String> cachedKeys = new ArrayList<String>(); for (Map.Entry<CacheValue, ? extends CachedNodeDocument> e : documentStore.getCacheEntries()) { size++; cachedKeys.add(e.getKey().toString()); } result.cacheSize = size; QueryBuilder query = QueryBuilder.start(Document.ID) .in(cachedKeys); // Fetch only the lastRev map and id final BasicDBObject keys = new BasicDBObject(Document.ID, 1); keys.put(Document.MOD_COUNT, 1); // Fetch lastRev for each such node DBCursor cursor = nodes.find(query.get(), keys); cursor.setReadPreference(ReadPreference.primary()); result.queryCount++; for (DBObject obj : cursor) { result.cacheEntriesProcessedCount++; String id = (String) obj.get(Document.ID); Number modCount = (Number) obj.get(Document.MOD_COUNT); CachedNodeDocument cachedDoc = documentStore.getCachedNodeDoc(id); if (cachedDoc != null && !Objects.equal(cachedDoc.getModCount(), modCount)) { documentStore.invalidateCache(Collection.NODES, id); result.invalidationCount++; } else { result.upToDateCount++; } } return result; }
/**
 * Fetches a document directly from MongoDB, bypassing the cache, using the
 * read preference derived from {@code docReadPref}. When a secondary read
 * misses, the lookup is retried once against the primary to cover
 * replication lag.
 *
 * @param collection  the collection to query
 * @param key         the document id
 * @param docReadPref the requested read-preference policy
 * @return the sealed document, or null if it does not exist on the primary
 */
@CheckForNull
private <T extends Document> T findUncached(Collection<T> collection, String key, DocumentReadPreference docReadPref) {
    log("findUncached", key, docReadPref);
    DBCollection dbCollection = getDBCollection(collection);
    long start = start();
    try {
        ReadPreference readPreference =
                getMongoReadPreference(collection, Utils.getParentId(key), docReadPref);
        if (readPreference.isSlaveOk()) {
            LOG.trace("Routing call to secondary for fetching [{}]", key);
        }
        DBObject obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, readPreference);
        if (obj == null && readPreference.isSlaveOk()) {
            // In case a secondary read preference was used and the node was not
            // found, check the primary again: the document may simply not have
            // been replicated yet. This matters e.g. for SplitDocuments, which
            // are fetched with maxCacheAge == Integer.MAX_VALUE (and hence a
            // secondary read preference) but are known to exist — the miss can
            // be pure replication lag, so read once more from the primary.
            obj = dbCollection.findOne(getByKeyQuery(key).get(), null, null, ReadPreference.primary());
        }
        if (obj == null) {
            return null;
        }
        T doc = convertFromDBObject(collection, obj);
        if (doc != null) {
            // Seal to make the document immutable before handing it out.
            doc.seal();
        }
        return doc;
    } finally {
        end("findUncached", start);
    }
}
<T extends Document> ReadPreference getMongoReadPreference(Collection<T> collection, String parentId, DocumentReadPreference preference) { switch(preference){ case PRIMARY: return ReadPreference.primary(); case PREFER_PRIMARY : return ReadPreference.primaryPreferred(); case PREFER_SECONDARY : return getConfiguredReadPreference(collection); case PREFER_SECONDARY_IF_OLD_ENOUGH: if(collection != Collection.NODES){ return ReadPreference.primary(); } //Default to primary preferred such that in case primary is being elected //we can still read from secondary //TODO REVIEW Would that be safe ReadPreference readPreference = ReadPreference.primaryPreferred(); if (parentId != null) { long replicationSafeLimit = getTime() - maxReplicationLagMillis; NodeDocument cachedDoc = (NodeDocument) getIfCached(collection, parentId); if (cachedDoc != null && !cachedDoc.hasBeenModifiedSince(replicationSafeLimit)) { //If parent has been modified loooong time back then there children //would also have not be modified. In that case we can read from secondary readPreference = getConfiguredReadPreference(collection); } } return readPreference; default: throw new IllegalArgumentException("Unsupported usage " + preference); } }
/**
 * Applies a readWriteMode string (either a full mongodb:// URI or a bare
 * option query string) to the nodes collection, updating its read preference
 * and write concern when they differ from the current values. Parse failures
 * are logged and otherwise ignored; repeating the previous value is a no-op.
 */
@Override
public void setReadWriteMode(String readWriteMode) {
    if (readWriteMode == null || readWriteMode.equals(lastReadWriteMode)) {
        return;
    }
    lastReadWriteMode = readWriteMode;
    try {
        // Accept bare option strings by wrapping them in a dummy URI.
        String rwModeUri = readWriteMode.startsWith("mongodb://")
                ? readWriteMode
                : String.format("mongodb://localhost/?%s", readWriteMode);
        MongoClientURI uri = new MongoClientURI(rwModeUri);
        ReadPreference readPref = uri.getOptions().getReadPreference();
        if (!readPref.equals(nodes.getReadPreference())) {
            nodes.setReadPreference(readPref);
            LOG.info("Using ReadPreference {} ", readPref);
        }
        WriteConcern writeConcern = uri.getOptions().getWriteConcern();
        if (!writeConcern.equals(nodes.getWriteConcern())) {
            nodes.setWriteConcern(writeConcern);
            LOG.info("Using WriteConcern " + writeConcern);
        }
    } catch (Exception e) {
        LOG.error("Error setting readWriteMode " + readWriteMode, e);
    }
}