@Test public void withDatabase() throws Exception { String db = "mongodb://localhost/pets"; new MockUnit(Env.class, Binder.class, MongoClient.class, MongoDatabase.class) .expect(instances(1)) .expect(cluster(db)) .expect(pool(db)) .expect(socket) .expect(socket(db)) .expect(server) .expect(ssl(db)) .expect(settings) .expect(mongo) .expect(bind(Key.get(MongoClient.class, Names.named("db")))) .expect(database) .expect(bind(Key.get(MongoDatabase.class, Names.named("pets")))) .expect(env) .run(unit -> { new MongoRx() .configure(unit.get(Env.class), conf(null, "db", db), unit.get(Binder.class)); }); }
@Test public void withCollection() throws Exception { String db = "mongodb://localhost/pets.Pets"; new MockUnit(Env.class, Binder.class, MongoClient.class, MongoDatabase.class, MongoCollection.class) .expect(instances(1)) .expect(cluster(db)) .expect(pool(db)) .expect(socket) .expect(socket(db)) .expect(server) .expect(ssl(db)) .expect(settings) .expect(mongo) .expect(bind(Key.get(MongoClient.class, Names.named("db")))) .expect(database) .expect(bind(Key.get(MongoDatabase.class, Names.named("pets")))) .expect(collection) .expect(bind(Key.get(MongoCollection.class, Names.named("Pets")))) .expect(env) .run(unit -> { new MongoRx() .configure(unit.get(Env.class), conf(null, "db", db), unit.get(Binder.class)); }); }
@Test public void withDirectDb() throws Exception { String db = "mongodb://localhost/pets.Pets"; new MockUnit(Env.class, Binder.class, MongoClient.class, MongoDatabase.class, MongoCollection.class) .expect(instances(1)) .expect(cluster(db)) .expect(pool(db)) .expect(socket) .expect(socket(db)) .expect(server) .expect(ssl(db)) .expect(settings) .expect(mongo) .expect(bind(Key.get(MongoClient.class, Names.named(db)))) .expect(database) .expect(bind(Key.get(MongoDatabase.class, Names.named("pets")))) .expect(collection) .expect(bind(Key.get(MongoCollection.class, Names.named("Pets")))) .expect(env) .run(unit -> { new MongoRx(db) .configure(unit.get(Env.class), conf(null), unit.get(Binder.class)); }); }
@Test public void populate() { int simulationCount = 1; MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017"); MongoDatabase database = mongoClient.getDatabase(databaseName); MongoCollection<Document> collection = database.getCollection(sample); for (int i = 0; i < simulationCount; i++) { Document d = new Document("name", "Person " + i).append("updatedAt", new Date()); collection.insertOne(d).toList().toBlocking().single(); } }
protected AsyncMongoQuery(MongoDatabase database) { this.database = database; @SuppressWarnings("unchecked") Q query = (Q) this; this.queryMixin = new QueryMixin<Q>(query, new DefaultQueryMetadata(), false); this.serializer = new MongodbSerializer(); }
/** * Create a new GridFS bucket with the default {@code 'fs'} bucket name * * <p>Requires the concrete {@link MongoDatabaseImpl} implementation of the MongoDatabase interface.</p> * * @param database the database instance to use with GridFS. * @return the GridFSBucket */ public static GridFSBucket create(final MongoDatabase database) { notNull("database", database); if (database instanceof MongoDatabaseImpl) { return new GridFSBucketImpl(com.mongodb.async.client.gridfs.GridFSBuckets.create(((MongoDatabaseImpl) database).getWrapped()), database.getObservableAdapter()); } else { throw new IllegalArgumentException("GridFS requires the concrete MongoDatabaseImpl implementation."); } }
/** * Create a new GridFS bucket with a custom bucket name * * <p>Requires the concrete {@link MongoDatabaseImpl} implementation of the MongoDatabase interface.</p> * * @param database the database instance to use with GridFS * @param bucketName the custom bucket name to use * @return the GridFSBucket */ public static GridFSBucket create(final MongoDatabase database, final String bucketName) { notNull("database", database); notNull("bucketName", bucketName); if (database instanceof MongoDatabaseImpl) { return new GridFSBucketImpl(com.mongodb.async.client.gridfs.GridFSBuckets.create(((MongoDatabaseImpl) database).getWrapped(), bucketName), database.getObservableAdapter()); } else { throw new IllegalArgumentException("GridFS requires the concrete MongoDatabaseImpl implementation."); } }
@Override public Observable<Document> startImport() { MongoClient client = MongoClients.create(connectionString); MongoDatabase db = client.getDatabase(dbName); return db.getCollection(collectionName).find().toObservable() .map(new Func1<org.bson.Document, Document>() { public Document call(org.bson.Document mongoDoc) { mongoDoc.put(typeField, type); RawJsonDocument d = RawJsonDocument.create(mongoDoc .getObjectId("_id").toHexString(), mongoDoc .toJson()); return d; }; }); }
protected MongoQuery(MongoDatabase database) { super(database); }
public static MongoQuery forDatabase(MongoDatabase database) { return new MongoQuery(database); }
MongoDatabaseImpl(final com.mongodb.async.client.MongoDatabase wrapped, final ObservableAdapter observableAdapter) { this.wrapped = notNull("wrapped", wrapped); this.observableAdapter = notNull("observableAdapter", observableAdapter); }
@Override public MongoDatabase withObservableAdapter(final ObservableAdapter observableAdapter) { return new MongoDatabaseImpl(wrapped, observableAdapter); }
@Override public MongoDatabase withCodecRegistry(final CodecRegistry codecRegistry) { return new MongoDatabaseImpl(wrapped.withCodecRegistry(codecRegistry), observableAdapter); }
@Override public MongoDatabase withReadPreference(final ReadPreference readPreference) { return new MongoDatabaseImpl(wrapped.withReadPreference(readPreference), observableAdapter); }
@Override public MongoDatabase withWriteConcern(final WriteConcern writeConcern) { return new MongoDatabaseImpl(wrapped.withWriteConcern(writeConcern), observableAdapter); }
@Override public MongoDatabase withReadConcern(final ReadConcern readConcern) { return new MongoDatabaseImpl(wrapped.withReadConcern(readConcern), observableAdapter); }
@Override public MongoDatabase getDatabase(final String name) { return new MongoDatabaseImpl(wrapped.getDatabase(name), observableAdapter); }
@SuppressWarnings({"rawtypes", "unchecked"}) @Override public void configure(final Env env, final Config conf, final Binder binder) { /** connection string */ ConnectionString cstr = Try.apply(() -> new ConnectionString(db)) .orElseGet(() -> new ConnectionString(conf.getString(db))); log.debug("Starting {}", cstr); boolean first = instances.getAndIncrement() == 0; Throwing.Function3<Class, String, Object, Void> bind = (type, name, value) -> { binder.bind(Key.get(type, Names.named(name))).toInstance(value); if (first) { binder.bind(Key.get(type)).toInstance(value); } return null; }; /** settings */ MongoClientSettings.Builder settings = settings(cstr, dbconf(db, conf)); if (configurer != null) { configurer.accept(settings, conf); } MongoClient client = MongoClients.create(settings.build()); bind.apply(MongoClient.class, db, client); /** bind database */ Optional.ofNullable(cstr.getDatabase()).ifPresent(dbname -> { // observable adapter MongoDatabase predb = adapter .map(a -> client.getDatabase(dbname).withObservableAdapter(a)) .orElseGet(() -> client.getDatabase(dbname)); // codec registry MongoDatabase database = codecRegistry .map(predb::withCodecRegistry) .orElse(predb); bind.apply(MongoDatabase.class, dbname, database); /** bind collection */ Optional.ofNullable(cstr.getCollection()).ifPresent(cname -> { MongoCollection<Document> collection = database.getCollection(cname); bind.apply(MongoCollection.class, cname, collection); }); }); /** mapper */ env.router() .map(mapper()); log.info("Started {}", cstr); env.onStop(() -> { log.debug("Stopping {}", cstr); client.close(); log.info("Stopped {}", cstr); }); }
/** * Gets the wrapped MongoDatabase * * <p>This should not be considered a part of the public API.</p> * @return wrapped MongoDatabase */ public com.mongodb.async.client.MongoDatabase getWrapped() { return wrapped; }