/**
 * Inserts the given documents into a freshly named database/collection and returns the
 * coordinates of that collection.
 *
 * <p>After the insert, the {@code _id} key is removed from each of the supplied documents, and the
 * collection count is asserted to equal the number of supplied documents.
 *
 * @param documents the documents to insert
 * @return the coordinates (database name and collection name) the documents were written to
 */
protected MongoStorageCoordinates seed(Document... documents) {
  String databaseName = createDatabaseName();
  String collectionName = createCollectionName();

  MongoCollection<Document> target =
      getMongoClient().getDatabase(databaseName).getCollection(collectionName);

  // Block until the insert completes, waiting at most 10 seconds.
  target.insertMany(Lists.newArrayList(documents)).timeout(10, SECONDS).toBlocking().single();

  // Strip the _id key from the caller's document instances.
  for (int i = 0; i < documents.length; i++) {
    documents[i].remove("_id");
  }

  long expectedCount = (long) documents.length;
  assertThat(
      "Failed to seed the given documents!",
      target.count().toBlocking().single(),
      is(expectedCount));

  return new MongoStorageCoordinates(databaseName, collectionName);
}
/**
 * Configures a default {@code MongoRx} module whose connection string is read from the
 * {@code "db"} configuration key, and expects the client to be bound under the name
 * {@code "db"}, the database under {@code "pets"} and the collection under {@code "Pets"}.
 */
@Test
public void withCollection() throws Exception {
  final String uri = "mongodb://localhost/pets.Pets";
  final MockUnit mocks = new MockUnit(Env.class, Binder.class, MongoClient.class,
      MongoDatabase.class, MongoCollection.class);

  mocks
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named("db"))))
      .expect(database)
      .expect(bind(Key.get(MongoDatabase.class, Names.named("pets"))))
      .expect(collection)
      .expect(bind(Key.get(MongoCollection.class, Names.named("Pets"))))
      .expect(env)
      .run(unit -> {
        final MongoRx module = new MongoRx();
        final Env environment = unit.get(Env.class);
        module.configure(environment, conf(null, "db", uri), unit.get(Binder.class));
      });
}
/**
 * Configures a {@code MongoRx} module constructed directly with a connection string, and expects
 * the client to be bound under that connection string as its name, the database under
 * {@code "pets"} and the collection under {@code "Pets"}.
 */
@Test
public void withDirectDb() throws Exception {
  final String uri = "mongodb://localhost/pets.Pets";
  final MockUnit mocks = new MockUnit(Env.class, Binder.class, MongoClient.class,
      MongoDatabase.class, MongoCollection.class);

  mocks
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named(uri))))
      .expect(database)
      .expect(bind(Key.get(MongoDatabase.class, Names.named("pets"))))
      .expect(collection)
      .expect(bind(Key.get(MongoCollection.class, Names.named("Pets"))))
      .expect(env)
      .run(unit -> {
        final MongoRx module = new MongoRx(uri);
        final Env environment = unit.get(Env.class);
        module.configure(environment, conf(null), unit.get(Binder.class));
      });
}
/**
 * Inserts {@code simulationCount} sample "Person" documents into the {@code sample} collection of
 * {@code databaseName}, using a client connected to {@code mongodb://localhost:27017}.
 *
 * <p>Each insert is awaited before the next one is issued. The client is always closed when the
 * method exits, including when an insert fails, so the test does not leak the client's resources.
 */
@Test
public void populate() {
  int simulationCount = 1;
  MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
  try {
    MongoDatabase database = mongoClient.getDatabase(databaseName);
    MongoCollection<Document> collection = database.getCollection(sample);
    for (int i = 0; i < simulationCount; i++) {
      Document d = new Document("name", "Person " + i).append("updatedAt", new Date());
      // Block until this insert has completed.
      collection.insertOne(d).toList().toBlocking().single();
    }
  } finally {
    // Previously the client was never closed.
    mongoClient.close();
  }
}
private void assertDatasetContentsAreWritten(CannedDataset cannedDataset) { MongoCollection<Document> collection = getCollection(new MongoStorageCoordinates(cannedDataset.getDataset().getSource())); if (cannedDataset.getDocuments() != null) { assertThat( collection.count().toBlocking().single(), is((long) cannedDataset.getDocuments().size())); List<Document> actualDocuments = collection .find() .toObservable() .map( document -> { // remove the persisted _id to allow comparison with the unpersisted 'expected' // documents document.remove("_id"); return document; }) .toList() .toBlocking() .single(); for (Map<String, Object> expected : cannedDataset.getDocuments()) { assertThat(actualDocuments, hasItem(new Document(expected))); } } }
/**
 * Resolves the collection addressed by this instance's {@code storageCoordinates}, using the
 * client supplied by {@code mongoProvider}.
 *
 * @return the collection named by the storage coordinates
 */
private MongoCollection<Document> getCollection() {
  String databaseName = storageCoordinates.getDatabaseName();
  String collectionName = storageCoordinates.getCollectionName();
  return mongoProvider.provide().getDatabase(databaseName).getCollection(collectionName);
}
/**
 * Loads the canned datasets from the configured directory and writes each of them: the dataset
 * itself via {@code datasetDao} and, when present, its documents into the collection addressed
 * by the dataset's source.
 *
 * <p>The datasets are processed through a parallel stream. When no canned datasets directory is
 * configured (blank), nothing is loaded or written.
 *
 * @return the number of canned datasets written; {@code 0} when no directory is configured
 */
@Override
public int write() {
  if (!isNotBlank(configuration.getCannedDatasetsDirectory())) {
    return 0;
  }

  AtomicInteger written = new AtomicInteger(0);
  List<CannedDataset> datasetsToWrite =
      loader.load(configuration.getCannedDatasetsDirectory());

  datasetsToWrite
      .parallelStream()
      .forEach(
          canned -> {
            Dataset dataset = canned.getDataset();
            logger.info("Writing canned dataset: {}", dataset.getName());

            datasetDao.write(dataset);

            MongoStorageCoordinates coordinates =
                new MongoStorageCoordinates(dataset.getSource());
            MongoCollection<Document> target =
                getCollection(coordinates.getDatabaseName(), coordinates.getCollectionName());

            if (canned.getDocuments() != null) {
              // Block until the bulk insert has completed.
              List<Success> results =
                  target
                      .insertMany(toDocuments(canned.getDocuments()))
                      .toList()
                      .toBlocking()
                      .single();
              logger.info(
                  "Wrote {} documents for canned dataset: {}",
                  results.size(),
                  dataset.getName());
            }

            logger.info("Wrote canned dataset: {}", dataset.getName());
            written.incrementAndGet();
          });

  return written.get();
}
/**
 * Resolves a collection by database and collection name, using the client supplied by
 * {@code mongoProvider}.
 *
 * @param databaseName the name of the database holding the collection
 * @param collectionName the name of the collection
 * @return the addressed collection
 */
private MongoCollection<Document> getCollection(String databaseName, String collectionName) {
  MongoCollection<Document> resolved =
      mongoProvider.provide().getDatabase(databaseName).getCollection(collectionName);
  return resolved;
}
/**
 * Resolves the collection addressed by the given storage coordinates, using the client supplied
 * by {@code mongoProvider}.
 *
 * @param storageCoordinates the database name and collection name to resolve
 * @return the addressed collection
 */
private MongoCollection<Document> getCollection(MongoStorageCoordinates storageCoordinates) {
  String databaseName = storageCoordinates.getDatabaseName();
  String collectionName = storageCoordinates.getCollectionName();
  return mongoProvider.provide().getDatabase(databaseName).getCollection(collectionName);
}
MongoCollectionImpl(final com.mongodb.async.client.MongoCollection<TDocument> wrapped, final ObservableAdapter observableAdapter) { this.wrapped = notNull("wrapped", wrapped); this.observableAdapter = notNull("observableAdapter", observableAdapter); }
/**
 * Returns a new wrapper around the underlying collection re-typed to the given document class,
 * keeping this instance's observable adapter.
 *
 * @param clazz the document class for the new collection
 * @param <NewTDocument> the document type of the returned collection
 * @return a new collection wrapper for the given document class
 */
@Override
public <NewTDocument> MongoCollection<NewTDocument> withDocumentClass(final Class<NewTDocument> clazz) {
    final com.mongodb.async.client.MongoCollection<NewTDocument> retyped =
            wrapped.withDocumentClass(clazz);
    return new MongoCollectionImpl<NewTDocument>(retyped, observableAdapter);
}
/**
 * Returns a new wrapper around the underlying collection configured with the given codec
 * registry, keeping this instance's observable adapter.
 *
 * @param codecRegistry the codec registry for the new collection
 * @return a new collection wrapper using the given codec registry
 */
@Override
public MongoCollection<TDocument> withCodecRegistry(final CodecRegistry codecRegistry) {
    final com.mongodb.async.client.MongoCollection<TDocument> reconfigured =
            wrapped.withCodecRegistry(codecRegistry);
    return new MongoCollectionImpl<TDocument>(reconfigured, observableAdapter);
}
/**
 * Returns a new wrapper around the underlying collection configured with the given read
 * preference, keeping this instance's observable adapter.
 *
 * @param readPreference the read preference for the new collection
 * @return a new collection wrapper using the given read preference
 */
@Override
public MongoCollection<TDocument> withReadPreference(final ReadPreference readPreference) {
    final com.mongodb.async.client.MongoCollection<TDocument> reconfigured =
            wrapped.withReadPreference(readPreference);
    return new MongoCollectionImpl<TDocument>(reconfigured, observableAdapter);
}
/**
 * Returns a new wrapper around the underlying collection configured with the given write
 * concern, keeping this instance's observable adapter.
 *
 * @param writeConcern the write concern for the new collection
 * @return a new collection wrapper using the given write concern
 */
@Override
public MongoCollection<TDocument> withWriteConcern(final WriteConcern writeConcern) {
    final com.mongodb.async.client.MongoCollection<TDocument> reconfigured =
            wrapped.withWriteConcern(writeConcern);
    return new MongoCollectionImpl<TDocument>(reconfigured, observableAdapter);
}
/**
 * Returns a new wrapper around the underlying collection configured with the given read
 * concern, keeping this instance's observable adapter.
 *
 * @param readConcern the read concern for the new collection
 * @return a new collection wrapper using the given read concern
 */
@Override
public MongoCollection<TDocument> withReadConcern(final ReadConcern readConcern) {
    final com.mongodb.async.client.MongoCollection<TDocument> reconfigured =
            wrapped.withReadConcern(readConcern);
    return new MongoCollectionImpl<TDocument>(reconfigured, observableAdapter);
}
/**
 * Returns a new wrapper around the same underlying collection that uses the given observable
 * adapter instead of this instance's adapter.
 *
 * @param observableAdapter the adapter for the new collection wrapper
 * @return a new collection wrapper using the given observable adapter
 */
@Override
public MongoCollection<TDocument> withObservableAdapter(final ObservableAdapter observableAdapter) {
    final MongoCollection<TDocument> readapted =
            new MongoCollectionImpl<TDocument>(this.wrapped, observableAdapter);
    return readapted;
}
/**
 * Gets the named collection typed to {@code Document}; delegates to
 * {@code getCollection(String, Class)} with {@code Document.class}.
 *
 * @param collectionName the name of the collection
 * @return the collection, typed to {@code Document}
 */
@Override
public MongoCollection<Document> getCollection(final String collectionName) {
    final MongoCollection<Document> documents = getCollection(collectionName, Document.class);
    return documents;
}
/**
 * Gets the named collection typed to the given document class, wrapping the collection obtained
 * from the underlying database together with this instance's observable adapter.
 *
 * @param collectionName the name of the collection
 * @param clazz the document class for the collection
 * @param <TDocument> the document type of the returned collection
 * @return a collection wrapper for the named collection
 */
@Override
public <TDocument> MongoCollection<TDocument> getCollection(final String collectionName,
                                                            final Class<TDocument> clazz) {
    final com.mongodb.async.client.MongoCollection<TDocument> underlying =
            wrapped.getCollection(collectionName, clazz);
    return new MongoCollectionImpl<TDocument>(underlying, observableAdapter);
}
@SuppressWarnings({"rawtypes", "unchecked"}) @Override public void configure(final Env env, final Config conf, final Binder binder) { /** connection string */ ConnectionString cstr = Try.apply(() -> new ConnectionString(db)) .orElseGet(() -> new ConnectionString(conf.getString(db))); log.debug("Starting {}", cstr); boolean first = instances.getAndIncrement() == 0; Throwing.Function3<Class, String, Object, Void> bind = (type, name, value) -> { binder.bind(Key.get(type, Names.named(name))).toInstance(value); if (first) { binder.bind(Key.get(type)).toInstance(value); } return null; }; /** settings */ MongoClientSettings.Builder settings = settings(cstr, dbconf(db, conf)); if (configurer != null) { configurer.accept(settings, conf); } MongoClient client = MongoClients.create(settings.build()); bind.apply(MongoClient.class, db, client); /** bind database */ Optional.ofNullable(cstr.getDatabase()).ifPresent(dbname -> { // observable adapter MongoDatabase predb = adapter .map(a -> client.getDatabase(dbname).withObservableAdapter(a)) .orElseGet(() -> client.getDatabase(dbname)); // codec registry MongoDatabase database = codecRegistry .map(predb::withCodecRegistry) .orElse(predb); bind.apply(MongoDatabase.class, dbname, database); /** bind collection */ Optional.ofNullable(cstr.getCollection()).ifPresent(cname -> { MongoCollection<Document> collection = database.getCollection(cname); bind.apply(MongoCollection.class, cname, collection); }); }); /** mapper */ env.router() .map(mapper()); log.info("Started {}", cstr); env.onStop(() -> { log.debug("Stopping {}", cstr); client.close(); log.info("Stopped {}", cstr); }); }