@SuppressWarnings("rawtypes") static Route.Mapper mapper() { return Route.Mapper.create("mongo-rx", v -> { if (v instanceof FindObservable) { return ((FindObservable) v).toObservable().toList(); } else if (v instanceof ListCollectionsObservable) { return ((ListCollectionsObservable) v).toObservable().toList(); } else if (v instanceof ListDatabasesObservable) { return ((ListDatabasesObservable) v).toObservable().toList(); } else if (v instanceof AggregateObservable) { return ((AggregateObservable) v).toObservable().toList(); } else if (v instanceof DistinctObservable) { return ((DistinctObservable) v).toObservable().toList(); } else if (v instanceof MapReduceObservable) { return ((MapReduceObservable) v).toObservable().toList(); } else if (v instanceof MongoObservable) { return ((MongoObservable) v).toObservable(); } return v; }); }
@Override public User getUser(String userName, String password) { FindObservable<Document> findObservable = getCollection().find(filter(userName, passwordUtil.toHash(password))).limit(1); return findObservable.first().map(toUser()).toBlocking().singleOrDefault(null); }
@Override public Observable<Document> find( Dataset dataset, String select, String where, String orderBy, int maxResults) { MongoStorageCoordinates storageCoordinates = new MongoStorageCoordinates(dataset.getSource()); StopWatch stopWatch = StopWatch.startForSplits(); Bson projections = selectClauseParser.get(Bson.class, select); long projectionElapsedTime = stopWatch.split(); Bson filter = whereClauseParser.get(Bson.class, where); long predicateElapsedTime = stopWatch.split(); Bson order = orderByClauseParser.get(Bson.class, orderBy); long orderByElapsedTime = stopWatch.split(); FindObservable<Document> findObservable = mongoProvider .provide() .getDatabase(storageCoordinates.getDatabaseName()) .getCollection(storageCoordinates.getCollectionName()) .find(filter) .projection(projections) .sort(order); if (maxResults > 0) { findObservable.limit(maxResults); } long findElapsedTime = stopWatch.split(); long totalElapsedTime = stopWatch.stop(); logger.info( "Total elapsed time for find call={}ms (projection={}ms, predicate={}ms, orderBy={}ms, find={}ms)", totalElapsedTime, projectionElapsedTime, predicateElapsedTime, orderByElapsedTime, findElapsedTime); return findObservable.toObservable(); }
private void assertDatasetIsWritten(CannedDataset cannedDataset) { FindObservable<Document> datasets = getCollection(configuration.getDatabaseName(), configuration.getDatasetStorageName()) .find(Filters.eq("name", cannedDataset.getDataset().getName())); assertThat( documentTransformer.transform(Dataset.class, datasets.first().toBlocking().single()), is(cannedDataset.getDataset())); }
@Override public Observable<Dataset> getAll(String userName) { FindObservable<Document> findObservable = getCollection().find(Filters.eq("owner", userName)); return findObservable.toObservable().map(toDataset()); }
@Override public Dataset get(String id) { FindObservable<Document> findObservable = getCollection().find(Filters.eq("id", id)).limit(1); return findObservable.first().map(toDataset()).toBlocking().singleOrDefault(null); }
@Override public FindObservable<TDocument> find() { return find(new BsonDocument(), getDocumentClass()); }
@Override public <TResult> FindObservable<TResult> find(final Class<TResult> clazz) { return find(new BsonDocument(), clazz); }
@Override public FindObservable<TDocument> find(final Bson filter) { return find(filter, getDocumentClass()); }
@Override public <TResult> FindObservable<TResult> find(final Bson filter, final Class<TResult> clazz) { return new FindObservableImpl<TResult>(wrapped.find(filter, clazz), observableAdapter); }
@SuppressWarnings({"unchecked", "rawtypes"}) @Test public void mongoRxMapper() throws Exception { String db = "mongodb://localhost"; new MockUnit(Env.class, Binder.class, MongoClient.class, FindObservable.class, ListCollectionsObservable.class, ListDatabasesObservable.class, AggregateObservable.class, DistinctObservable.class, MapReduceObservable.class, MongoObservable.class) .expect(instances(1)) .expect(cluster(db)) .expect(pool(db)) .expect(socket) .expect(socket(db)) .expect(server) .expect(ssl(db)) .expect(settings) .expect(mongo) .expect(bind(Key.get(MongoClient.class, Names.named("db")))) .expect(env) .expect(unit -> { Observable observable = unit.powerMock(Observable.class); expect(observable.toList()).andReturn(unit.powerMock(Observable.class)).times(6); Observable mobservable = unit.powerMock(Observable.class); FindObservable o1 = unit.get(FindObservable.class); expect(o1.toObservable()).andReturn(observable); ListCollectionsObservable o2 = unit.get(ListCollectionsObservable.class); expect(o2.toObservable()).andReturn(observable); ListDatabasesObservable o3 = unit.get(ListDatabasesObservable.class); expect(o3.toObservable()).andReturn(observable); AggregateObservable o4 = unit.get(AggregateObservable.class); expect(o4.toObservable()).andReturn(observable); DistinctObservable o5 = unit.get(DistinctObservable.class); expect(o5.toObservable()).andReturn(observable); MapReduceObservable o6 = unit.get(MapReduceObservable.class); expect(o6.toObservable()).andReturn(observable); MongoObservable o7 = unit.get(MongoObservable.class); expect(o7.toObservable()).andReturn(mobservable); }) .run(unit -> { new MongoRx() .configure(unit.get(Env.class), conf(null, "db", db), unit.get(Binder.class)); }, unit -> { Route.Mapper mongorx = unit.captured(Route.Mapper.class).iterator().next(); assertTrue(mongorx.map(unit.get(FindObservable.class)) instanceof Observable); assertTrue( mongorx.map(unit.get(ListCollectionsObservable.class)) instanceof Observable); assertTrue( mongorx.map(unit.get(ListDatabasesObservable.class)) instanceof Observable); assertTrue( mongorx.map(unit.get(AggregateObservable.class)) instanceof Observable); assertTrue( mongorx.map(unit.get(DistinctObservable.class)) instanceof Observable); assertTrue( mongorx.map(unit.get(MapReduceObservable.class)) instanceof Observable); assertTrue( mongorx.map(unit.get(MongoObservable.class)) instanceof Observable); assertEquals("x", mongorx.map("x")); }); }