/** * Check that we can talk to the configured MongoDB instance. * * @return a {@link Result} with details of whether this check was successful or not * @throws Exception not thrown, any failure to perform the check results in a failed {@link * Result} */ @Override protected Result check() throws Exception { MongoClient mongoClient = mongoProvider.provide(); List<ServerAddress> serverAddresses = mongoClient.getSettings().getClusterSettings().getHosts(); String address = serverAddresses.stream().map(ServerAddress::toString).collect(Collectors.joining(",")); try { // any read will suffice to prove connectivity mongoClient.getDatabase("xyz"); return Result.healthy("Connected to MongoDB at " + address); } catch (Exception ex) { return Result.unhealthy("Cannot connect to MongoDB at " + address); } }
/**
 * Runs {@code MongoRx#configure} for {@code mongodb://localhost} with the {@code instances(0)}
 * fixture and expects the client to be bound both without a name and under the name
 * {@code "db"}.
 */
@Test
public void configure() throws Exception {
  String uri = "mongodb://localhost";

  new MockUnit(Env.class, Binder.class, MongoClient.class)
      .expect(instances(0))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class)))
      .expect(bind(Key.get(MongoClient.class, Names.named("db"))))
      .expect(env)
      .run(unit -> {
        MongoRx module = new MongoRx();
        Env mockEnv = unit.get(Env.class);
        module.configure(mockEnv, conf(null, "db", uri), unit.get(Binder.class));
      });
}
/**
 * Runs {@code MongoRx#configure} for {@code mongodb://localhost} with the {@code instances(1)}
 * fixture and expects only the binding named {@code "db"} for the client.
 */
@Test
public void configure1() throws Exception {
  String uri = "mongodb://localhost";

  new MockUnit(Env.class, Binder.class, MongoClient.class)
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named("db"))))
      .expect(env)
      .run(unit -> {
        MongoRx module = new MongoRx();
        Env mockEnv = unit.get(Env.class);
        module.configure(mockEnv, conf(null, "db", uri), unit.get(Binder.class));
      });
}
/**
 * Configures the module, then runs the first captured {@code Throwing.Runnable} and expects it
 * to close the mocked {@link MongoClient}.
 */
@Test
public void onStop() throws Exception {
  String uri = "mongodb://localhost";

  new MockUnit(Env.class, Binder.class, MongoClient.class)
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named("db"))))
      .expect(env)
      // the stop hook must close the client
      .expect(unit -> unit.get(MongoClient.class).close())
      .run(unit -> {
        MongoRx module = new MongoRx();
        Env mockEnv = unit.get(Env.class);
        module.configure(mockEnv, conf(null, "db", uri), unit.get(Binder.class));
      }, unit -> {
        Throwing.Runnable stopHook = unit.captured(Throwing.Runnable.class).iterator().next();
        stopHook.run();
      });
}
/**
 * Runs {@code MongoRx#configure} for a connection string that names the {@code pets} database
 * and expects a {@link MongoDatabase} binding named {@code "pets"} in addition to the client
 * binding named {@code "db"}.
 */
@Test
public void withDatabase() throws Exception {
  String uri = "mongodb://localhost/pets";

  new MockUnit(Env.class, Binder.class, MongoClient.class, MongoDatabase.class)
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named("db"))))
      .expect(database)
      .expect(bind(Key.get(MongoDatabase.class, Names.named("pets"))))
      .expect(env)
      .run(unit -> {
        MongoRx module = new MongoRx();
        Env mockEnv = unit.get(Env.class);
        module.configure(mockEnv, conf(null, "db", uri), unit.get(Binder.class));
      });
}
/**
 * Runs {@code MongoRx#configure} for a connection string that names both a database and a
 * collection ({@code pets.Pets}) and expects client, database and collection bindings named
 * {@code "db"}, {@code "pets"} and {@code "Pets"} respectively.
 */
@Test
public void withCollection() throws Exception {
  String uri = "mongodb://localhost/pets.Pets";

  new MockUnit(Env.class, Binder.class, MongoClient.class, MongoDatabase.class,
      MongoCollection.class)
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named("db"))))
      .expect(database)
      .expect(bind(Key.get(MongoDatabase.class, Names.named("pets"))))
      .expect(collection)
      .expect(bind(Key.get(MongoCollection.class, Names.named("Pets"))))
      .expect(env)
      .run(unit -> {
        MongoRx module = new MongoRx();
        Env mockEnv = unit.get(Env.class);
        module.configure(mockEnv, conf(null, "db", uri), unit.get(Binder.class));
      });
}
/**
 * Passes the connection string straight to the {@code MongoRx} constructor and expects the
 * client binding to be named after that full string, with database and collection bindings named
 * {@code "pets"} and {@code "Pets"}.
 */
@Test
public void withDirectDb() throws Exception {
  String uri = "mongodb://localhost/pets.Pets";

  new MockUnit(Env.class, Binder.class, MongoClient.class, MongoDatabase.class,
      MongoCollection.class)
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named(uri))))
      .expect(database)
      .expect(bind(Key.get(MongoDatabase.class, Names.named("pets"))))
      .expect(collection)
      .expect(bind(Key.get(MongoCollection.class, Names.named("Pets"))))
      .expect(env)
      .run(unit -> {
        MongoRx module = new MongoRx(uri);
        Env mockEnv = unit.get(Env.class);
        module.configure(mockEnv, conf(null), unit.get(Binder.class));
      });
}
/**
 * Returns the cached {@link MongoClient}, building it through {@code createMongoClient()} the
 * first time it is requested.
 *
 * <p>NOTE(review): no synchronization is visible here - confirm callers are single-threaded or
 * that creating more than one client is acceptable.
 *
 * @return the cached client, never replaced once it has been created by this method
 */
@Override
public MongoClient provide() {
  if (mongoClient != null) {
    return mongoClient;
  }
  mongoClient = createMongoClient();
  return mongoClient;
}
/**
 * Inserts {@code simulationCount} sample documents (a {@code name} and an {@code updatedAt}
 * timestamp each) into the {@code sample} collection of a MongoDB instance on
 * {@code localhost:27017}.
 *
 * <p>The client is closed when the test finishes, whether or not an insert failed, so the test
 * does not leave the client open behind it.
 */
@Test
public void populate() {
  int simulationCount = 1;

  MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017");
  try {
    MongoDatabase database = mongoClient.getDatabase(databaseName);
    MongoCollection<Document> collection = database.getCollection(sample);

    for (int i = 0; i < simulationCount; i++) {
      Document d = new Document("name", "Person " + i).append("updatedAt", new Date());
      // block until the insert has completed before moving on
      collection.insertOne(d).toList().toBlocking().single();
    }
  } finally {
    mongoClient.close();
  }
}
/**
 * Reads every document of {@code collectionName} in database {@code dbName} and emits each one
 * as a {@link RawJsonDocument}.
 *
 * <p>Before conversion the source document gets the entry {@code typeField -> type}. The emitted
 * document uses the hex form of the source {@code _id} as its id and the source's JSON rendering
 * as its content.
 *
 * <p>NOTE(review): the {@link MongoClient} created here is never closed in this method - confirm
 * whether that is intentional.
 *
 * @return an observable of the converted documents
 */
@Override
public Observable<Document> startImport() {
  MongoClient client = MongoClients.create(connectionString);
  MongoDatabase db = client.getDatabase(dbName);

  Func1<org.bson.Document, Document> toRawJson = new Func1<org.bson.Document, Document>() {
    public Document call(org.bson.Document source) {
      // tag the document first so the type field is part of the exported JSON
      source.put(typeField, type);
      String id = source.getObjectId("_id").toHexString();
      return RawJsonDocument.create(id, source.toJson());
    }
  };

  return db.getCollection(collectionName).find().toObservable().map(toRawJson);
}
/**
 * Builds a new {@link MongoClient} from the application configuration.
 *
 * <p>Host and port form the connection string. Timeouts and pool sizes are read from
 * {@code applicationConfiguration}, and a logging listener is attached to the server, cluster
 * and connection pool settings.
 *
 * @return a newly created client for the configured host and port
 */
private MongoClient createMongoClient() {
  String host = applicationConfiguration.getMongoHost();
  int port = applicationConfiguration.getMongoPort();
  ConnectionString connectionString = new ConnectionString("mongodb://" + host + ":" + port);

  logger.info("Creating Mongo client for: {}:{}", host, port);

  ServerSettings serverSettings =
      ServerSettings.builder()
          .applyConnectionString(connectionString)
          .addServerMonitorListener(new LoggingServerMonitorListener())
          .addServerListener(new LoggingServerListener())
          .build();

  ClusterSettings clusterSettings =
      ClusterSettings.builder()
          .applyConnectionString(connectionString)
          .serverSelectionTimeout(
              applicationConfiguration.getMongoServerSelectionTimeout(), MILLISECONDS)
          .addClusterListener(new LoggingClusterListener())
          .build();

  ConnectionPoolSettings poolSettings =
      ConnectionPoolSettings.builder()
          .applyConnectionString(connectionString)
          .maxWaitTime(applicationConfiguration.getConnectionPoolMaxWaitTime(), MILLISECONDS)
          .minSize(applicationConfiguration.getConnectionPoolMinSize())
          .maxSize(applicationConfiguration.getConnectionPoolMaxSize())
          .addConnectionPoolListener(new LoggingConnectionPoolListener())
          .build();

  SocketSettings socketSettings =
      SocketSettings.builder()
          .applyConnectionString(connectionString)
          .connectTimeout(
              applicationConfiguration.getMongoSocketConnectionTimeout(), MILLISECONDS)
          .readTimeout(applicationConfiguration.getMongoReadTimeout(), MILLISECONDS)
          .build();

  MongoClientSettings mongoClientSettings =
      MongoClientSettings.builder()
          .applicationName("dragoman")
          .serverSettings(serverSettings)
          .clusterSettings(clusterSettings)
          .connectionPoolSettings(poolSettings)
          .socketSettings(socketSettings)
          .build();

  return MongoClients.create(mongoClientSettings);
}
/**
 * Returns the shared {@link MongoClient}, creating one for {@code mongodb://localhost:<port>} on
 * the first call.
 *
 * @return the cached client
 */
protected MongoClient getMongoClient() {
  if (mongoClient != null) {
    return mongoClient;
  }
  mongoClient = MongoClients.create("mongodb://localhost:" + port);
  return mongoClient;
}
@SuppressWarnings({"rawtypes", "unchecked"}) @Override public void configure(final Env env, final Config conf, final Binder binder) { /** connection string */ ConnectionString cstr = Try.apply(() -> new ConnectionString(db)) .orElseGet(() -> new ConnectionString(conf.getString(db))); log.debug("Starting {}", cstr); boolean first = instances.getAndIncrement() == 0; Throwing.Function3<Class, String, Object, Void> bind = (type, name, value) -> { binder.bind(Key.get(type, Names.named(name))).toInstance(value); if (first) { binder.bind(Key.get(type)).toInstance(value); } return null; }; /** settings */ MongoClientSettings.Builder settings = settings(cstr, dbconf(db, conf)); if (configurer != null) { configurer.accept(settings, conf); } MongoClient client = MongoClients.create(settings.build()); bind.apply(MongoClient.class, db, client); /** bind database */ Optional.ofNullable(cstr.getDatabase()).ifPresent(dbname -> { // observable adapter MongoDatabase predb = adapter .map(a -> client.getDatabase(dbname).withObservableAdapter(a)) .orElseGet(() -> client.getDatabase(dbname)); // codec registry MongoDatabase database = codecRegistry .map(predb::withCodecRegistry) .orElse(predb); bind.apply(MongoDatabase.class, dbname, database); /** bind collection */ Optional.ofNullable(cstr.getCollection()).ifPresent(cname -> { MongoCollection<Document> collection = database.getCollection(cname); bind.apply(MongoCollection.class, cname, collection); }); }); /** mapper */ env.router() .map(mapper()); log.info("Started {}", cstr); env.onStop(() -> { log.debug("Stopping {}", cstr); client.close(); log.info("Stopped {}", cstr); }); }
/**
 * Configures the module, then feeds the captured {@code Route.Mapper} one mock of each driver
 * observable type and checks that every one is mapped to an {@link Observable}, while a plain
 * value ({@code "x"}) is returned unchanged.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void mongoRxMapper() throws Exception {
  String uri = "mongodb://localhost";

  new MockUnit(Env.class, Binder.class, MongoClient.class, FindObservable.class,
      ListCollectionsObservable.class, ListDatabasesObservable.class,
      AggregateObservable.class, DistinctObservable.class, MapReduceObservable.class,
      MongoObservable.class)
      .expect(instances(1))
      .expect(cluster(uri))
      .expect(pool(uri))
      .expect(socket)
      .expect(socket(uri))
      .expect(server)
      .expect(ssl(uri))
      .expect(settings)
      .expect(mongo)
      .expect(bind(Key.get(MongoClient.class, Names.named("db"))))
      .expect(env)
      .expect(unit -> {
        // six of the seven types share one observable whose toList() is expected six times
        Observable shared = unit.powerMock(Observable.class);
        expect(shared.toList()).andReturn(unit.powerMock(Observable.class)).times(6);
        Observable single = unit.powerMock(Observable.class);

        expect(unit.get(FindObservable.class).toObservable()).andReturn(shared);
        expect(unit.get(ListCollectionsObservable.class).toObservable()).andReturn(shared);
        expect(unit.get(ListDatabasesObservable.class).toObservable()).andReturn(shared);
        expect(unit.get(AggregateObservable.class).toObservable()).andReturn(shared);
        expect(unit.get(DistinctObservable.class).toObservable()).andReturn(shared);
        expect(unit.get(MapReduceObservable.class).toObservable()).andReturn(shared);
        expect(unit.get(MongoObservable.class).toObservable()).andReturn(single);
      })
      .run(unit -> {
        MongoRx module = new MongoRx();
        Env mockEnv = unit.get(Env.class);
        module.configure(mockEnv, conf(null, "db", uri), unit.get(Binder.class));
      }, unit -> {
        Route.Mapper mapper = unit.captured(Route.Mapper.class).iterator().next();

        Class<?>[] observableTypes = {
            FindObservable.class,
            ListCollectionsObservable.class,
            ListDatabasesObservable.class,
            AggregateObservable.class,
            DistinctObservable.class,
            MapReduceObservable.class,
            MongoObservable.class
        };
        for (Class<?> observableType : observableTypes) {
          assertTrue(mapper.map(unit.get(observableType)) instanceof Observable);
        }

        // anything else passes through untouched
        assertEquals("x", mapper.map("x"));
      });
}
/**
 * Provide (with create-on-first-usage semantics) this application's {@link MongoClient} instance.
 *
 * @return this application's {@link MongoClient} instance
 */
MongoClient provide();
/** * The internal MongoClientImpl constructor. * * <p>This should not be considered a part of the public API.</p> * @param wrapped the underlying MongoClient * @param observableAdapter the ObservableAdapter */ public MongoClientImpl(final com.mongodb.async.client.MongoClient wrapped, final ObservableAdapter observableAdapter) { this.wrapped = notNull("wrapped", wrapped); this.observableAdapter = notNull("observableAdapter", observableAdapter); }