/**
 * Runs a map-reduce job on the collection and returns the inline results.
 * <p>
 * The JavaScript functions are read from the classpath folder {@code mongodb}. The system reads them from files named
 * {@code <name>.map.js}, {@code <name>.reduce.js} and, optionally, {@code <name>.finalize.js}. Each result document is
 * then converted using the given {@link MapReduceResultHandler}.
 *
 * @param <R> the type of the result class
 * @param name the base name of the map-reduce function files
 * @param query the query that filters the documents fed into the map-reduce
 * @param sort optional sort applied to the input documents before the map-reduce runs; ignored when {@code null}
 * @param scope optional global scope for the JavaScript run; ignored when {@code null}
 * @param conv the converter applied to each result entry
 * @return an {@link Iterable} over the converted result entries
 * @throws RuntimeException if resources cannot be read
 */
protected final <R> Iterable<R> mapReduce(String name, DBObject query, DBObject sort, Map<String, Object> scope,
        final MapReduceResultHandler<R> conv) {
    final String mapFunction = this.getMRFunction(name, "map");
    final String reduceFunction = this.getMRFunction(name, "reduce");
    // The output target is null because the results are returned inline rather than written to a collection.
    final MapReduceCommand command = new MapReduceCommand(this.collection.getDBCollection(), mapFunction,
            reduceFunction, null, OutputType.INLINE, query);

    // The finalize step is optional; it is only attached when a function was found.
    final String finalizeFunction = this.getMRFunction(name, "finalize");
    if (finalizeFunction != null) {
        command.setFinalize(finalizeFunction);
    }
    if (sort != null) {
        command.setSort(sort);
    }
    if (scope != null) {
        command.setScope(scope);
    }

    final MapReduceOutput output = this.collection.getDBCollection().mapReduce(command);
    return new ConverterIterable<R>(output.results().iterator(), conv);
}
/**
 * Executes the map-reduce job described by {@code options}.
 * <p>
 * For {@code INLINE} output the results are wired up for in-memory iteration. For every other output type a query
 * over the output collection is attached to the results.
 *
 * @param options the map-reduce configuration
 * @param <T> the result type
 * @return the results of the job
 */
@Override
public <T> MapreduceResults<T> mapReduce(final MapReduceOptions<T> options) {
    final DBCollection sourceCollection = options.getQuery().getCollection();
    final EntityCache entityCache = createCache();
    final MapreduceResults<T> mapreduceResults =
        new MapreduceResults<T>(sourceCollection.mapReduce(options.toCommand(getMapper())));
    mapreduceResults.setOutputType(options.getOutputType());

    final boolean inline = OutputType.INLINE.equals(options.getOutputType());
    if (!inline) {
        mapreduceResults.setQuery(newQuery(options.getResultType(),
            getDB().getCollection(mapreduceResults.getOutputCollectionName())));
    } else {
        mapreduceResults.setInlineRequiredOptions(this, options.getResultType(), getMapper(), entityCache);
    }
    return mapreduceResults;
}
/**
 * Prepares a map-reduce command that runs over every document having an {@code author} field.
 * The command replaces the {@code output} collection.
 *
 * @param host the MongoDB host (port 27017 is used)
 * @param dbname the database name
 * @param collectionName the input collection name
 * @param output the name of the output collection
 * @throws Exception wrapping an {@link UnknownHostException} if the host cannot be resolved
 */
public ContributorsCounter(String host, String dbname, String collectionName, String output) throws Exception {
    final MongoClient client;
    try {
        client = new MongoClient(host, 27017);
    } catch (UnknownHostException unknownHost) {
        throw new Exception(unknownHost);
    }
    // NOTE(review): the client is not kept in a field, so nothing can close it later.
    this.collection = client.getDB(dbname).getCollection(collectionName);
    this.outputType = MapReduceCommand.OutputType.REPLACE;

    final DBObject filter = new BasicDBObject("author", new BasicDBObject("$exists", Boolean.TRUE));
    this.mr_cmd = new MapReduceCommand(collection, map, reduce, output, outputType, filter);
}
/**
 * Prepares a map-reduce command that runs over every document having a {@code url} field.
 * The command replaces the {@code output} collection.
 *
 * @param host the MongoDB host (port 27017 is used)
 * @param dbname the database name
 * @param collectionName the input collection name
 * @param output the name of the output collection
 * @throws Exception wrapping an {@link UnknownHostException} if the host cannot be resolved
 */
public DomainsCounter(String host, String dbname, String collectionName, String output) throws Exception {
    final MongoClient client;
    try {
        client = new MongoClient(host, 27017);
    } catch (UnknownHostException unknownHost) {
        throw new Exception(unknownHost);
    }
    // NOTE(review): the client is not kept in a field, so nothing can close it later.
    this.collection = client.getDB(dbname).getCollection(collectionName);
    this.outputType = MapReduceCommand.OutputType.REPLACE;

    final DBObject filter = new BasicDBObject("url", new BasicDBObject("$exists", Boolean.TRUE));
    this.mr_cmd = new MapReduceCommand(collection, map, reduce, output, outputType, filter);
}
/**
 * Prepares a map-reduce command that runs over every document whose {@code tags} value is not an empty array.
 * The command replaces the {@code output} collection.
 *
 * @param host the MongoDB host (port 27017 is used)
 * @param dbname the database name
 * @param collectionName the input collection name
 * @param output the name of the output collection
 * @throws Exception wrapping an {@link UnknownHostException} if the host cannot be resolved
 */
public TagsCounter(String host, String dbname, String collectionName, String output) throws Exception {
    final MongoClient client;
    try {
        client = new MongoClient(host, 27017);
    } catch (UnknownHostException unknownHost) {
        throw new Exception(unknownHost);
    }
    // NOTE(review): the client is not kept in a field, so nothing can close it later.
    this.collection = client.getDB(dbname).getCollection(collectionName);
    this.outputType = MapReduceCommand.OutputType.REPLACE;

    // NOTE(review): $ne [] also matches documents with no tags field at all; confirm the map function tolerates that.
    final DBObject filter = new BasicDBObject("tags", new BasicDBObject("$ne", new String[0]));
    this.mr_cmd = new MapReduceCommand(collection, map, reduce, output, outputType, filter);
}
/**
 * Prepares a map-reduce command that runs over every document having a {@code uid} field.
 * The command replaces the {@code output} collection.
 *
 * @param host the MongoDB host (port 27017 is used)
 * @param dbname the database name
 * @param collectionName the input collection name
 * @param output the name of the output collection
 * @throws Exception wrapping an {@link UnknownHostException} if the host cannot be resolved
 */
public ContributorsCounter(String host, String dbname, String collectionName, String output) throws Exception {
    final MongoClient client;
    try {
        client = new MongoClient(host, 27017);
    } catch (UnknownHostException unknownHost) {
        throw new Exception(unknownHost);
    }
    // NOTE(review): the client is not kept in a field, so nothing can close it later.
    this.collection = client.getDB(dbname).getCollection(collectionName);
    this.outputType = MapReduceCommand.OutputType.REPLACE;

    final DBObject filter = new BasicDBObject("uid", new BasicDBObject("$exists", Boolean.TRUE));
    this.mr_cmd = new MapReduceCommand(collection, map, reduce, output, outputType, filter);
}
/**
 * Creates a fresh query over the collection that holds these results.
 *
 * @return a clone of the query to use against these results
 * @throws MappingException if the job used inline output, since no output collection exists then
 */
public Query<T> createQuery() {
    final boolean inline = outputType == OutputType.INLINE;
    if (inline) {
        throw new MappingException("No collection available for inline mapreduce jobs");
    }
    return query.cloneQuery();
}
/**
 * Translates the current {@link OutputType} into the legacy {@link MapreduceType}.
 *
 * @return the type of the operation; {@code REPLACE} for any output type other than REDUCE, MERGE or INLINE
 * @deprecated use {@link #getOutputType()} instead
 */
@Deprecated
public MapreduceType getType() {
    if (outputType == OutputType.INLINE) {
        return MapreduceType.INLINE;
    }
    if (outputType == OutputType.MERGE) {
        return MapreduceType.MERGE;
    }
    if (outputType == OutputType.REDUCE) {
        return MapreduceType.REDUCE;
    }
    // Everything else (REPLACE, or an unset value) falls back to REPLACE.
    return MapreduceType.REPLACE;
}
/**
 * The map function calls {@code doEmit} instead of {@code emit}.
 * The job is expected to fail with a {@link MongoException}.
 */
@Test(expected = MongoException.class)
public void testBadMR() throws Exception {
    final String brokenMap = "function () { if(this['radius']) { doEmit('circle', {count:1}); return; } emit('rect', {count:1}); }";
    final String countingReduce = "function (key, values) { var total = 0; for ( var i=0; i<values.length; i++ ) {total += values[i].count;} "
        + "return { count : total }; }";

    final MapReduceOptions<ResultEntity> options = new MapReduceOptions<ResultEntity>()
        .resultType(ResultEntity.class)
        .outputType(OutputType.REPLACE)
        .query(getAds().find(Shape.class))
        .map(brokenMap)
        .reduce(countingReduce);
    getDs().mapReduce(options);
}
@Test public void testMapReduce() throws Exception { final Random rnd = new Random(); //create 100 circles and rectangles for (int i = 0; i < 100; i++) { getAds().insert("shapes", new Circle(rnd.nextDouble())); getAds().insert("shapes", new Rectangle(rnd.nextDouble(), rnd.nextDouble())); } final String map = "function () { if(this['radius']) { emit('circle', {count:1}); return; } emit('rect', {count:1}); }"; final String reduce = "function (key, values) { var total = 0; for ( var i=0; i<values.length; i++ ) {total += values[i].count;} " + "return { count : total }; }"; final MapreduceResults<ResultEntity> mrRes = getDs().mapReduce(new MapReduceOptions<ResultEntity>() .outputType(OutputType.REPLACE) .query(getAds().find(Shape.class)) .map(map) .reduce(reduce) .resultType(ResultEntity.class)); Assert.assertEquals(2, mrRes.createQuery().count()); Assert.assertEquals(100, mrRes.createQuery().get().getValue().count, 0); final MapreduceResults<ResultEntity> inline = getDs().mapReduce(new MapReduceOptions<ResultEntity>() .outputType(OutputType.INLINE) .query(getAds().find(Shape.class)).map(map).reduce(reduce) .resultType(ResultEntity.class)); final Iterator<ResultEntity> iterator = inline.iterator(); Assert.assertEquals(2, count(iterator)); Assert.assertEquals(100, inline.iterator().next().getValue().count, 0); }
/**
 * Checks that a map-reduce query on "dante" finds nothing by default.
 * Once a secondary-strength English collation is applied, it matches the three "Dante" books.
 */
@Test
public void testCollation() {
    checkMinServerVersion(3.4);
    getDs().save(asList(new Book("The Banquet", "Dante", 2),
                        new Book("Divine Comedy", "Dante", 1),
                        new Book("Eclogues", "Dante", 2),
                        new Book("The Odyssey", "Homer", 10),
                        new Book("Iliad", "Homer", 10)));

    final String map = "function () { emit(this.author, 1); return; }";
    final String reduce = "function (key, values) { return values.length }";

    final Query<Book> lowerCaseDante = getAds().find(Book.class)
        .field("author").equal("dante");
    final MapReduceOptions<CountResult> options = new MapReduceOptions<CountResult>()
        .resultType(CountResult.class)
        .outputType(OutputType.INLINE)
        .query(lowerCaseDante)
        .map(map)
        .reduce(reduce);

    // Without a collation the lower-case filter matches none of the "Dante" documents.
    Assert.assertEquals(0, count(getDs().mapReduce(options).getInlineResults()));

    options
        .inputCollection(getMorphia().getMapper().getCollectionName(Book.class))
        .collation(Collation.builder()
                            .locale("en")
                            .collationStrength(CollationStrength.SECONDARY)
                            .build());
    final Iterator<CountResult> collated = getDs().mapReduce(options).getInlineResults();
    final CountResult dante = collated.next();
    Assert.assertEquals("Dante", dante.getAuthor());
    Assert.assertEquals(3D, dante.getCount(), 0);
}
@Test public void testBypassDocumentValidation() { checkMinServerVersion(3.4); getDs().save(asList(new Book("The Banquet", "Dante", 2), new Book("Divine Comedy", "Dante", 1), new Book("Eclogues", "Dante", 2), new Book("The Odyssey", "Homer", 10), new Book("Iliad", "Homer", 10))); Document validator = Document.parse("{ count : { $gt : '10' } }"); ValidationOptions validationOptions = new ValidationOptions() .validator(validator) .validationLevel(ValidationLevel.STRICT) .validationAction(ValidationAction.ERROR); MongoDatabase database = getMongoClient().getDatabase(TEST_DB_NAME); database.getCollection("counts").drop(); database.createCollection("counts", new CreateCollectionOptions().validationOptions(validationOptions)); final String map = "function () { emit(this.author, 1); return; }"; final String reduce = "function (key, values) { return values.length }"; MapReduceOptions<CountResult> options = new MapReduceOptions<CountResult>() .query(getDs().find(Book.class)) .resultType(CountResult.class) .outputType(OutputType.REPLACE) .map(map) .reduce(reduce); try { getDs().mapReduce(options); fail("Document validation should have complained."); } catch (MongoCommandException e) { // expected } getDs().mapReduce(options.bypassDocumentValidation(true)); Assert.assertEquals(2, count(getDs().find(CountResult.class).iterator())); }
/**
 * @return the output type currently configured
 */
OutputType getOutputType() {
    return this.outputType;
}
/**
 * Runs a map-reduce job using the legacy {@link MapreduceType} / {@link MapReduceCommand} style API.
 * <p>
 * Each part of the executed command comes from a different argument:
 * <ul>
 * <li>{@code baseCommand} supplies the map, reduce and finalize functions, the scope, the output target, the
 * output database and the read preference;</li>
 * <li>{@code query} supplies the collection, the filter, the limit and the sort;</li>
 * <li>{@code type} supplies the output type.</li>
 * </ul>
 *
 * @param type        the kind of output to produce
 * @param query       the query selecting the input documents and the collection to run against
 * @param outputType  the class the results are mapped to
 * @param baseCommand the command supplying the JavaScript functions, scope and output target
 * @param <T>         the result type
 * @return the results of the job
 * @throws QueryException if the query uses an offset or retrieved-fields projection, which mapReduce does not allow
 * @deprecated legacy API; prefer the {@code mapReduce(MapReduceOptions)} overload
 */
@Override
@Deprecated
public <T> MapreduceResults<T> mapReduce(final MapreduceType type, final Query query, final Class<T> outputType,
                                         final MapReduceCommand baseCommand) {
    Assert.parametersNotNull("map", baseCommand.getMap());
    Assert.parameterNotEmpty("map", baseCommand.getMap());
    Assert.parametersNotNull("reduce", baseCommand.getReduce());
    Assert.parameterNotEmpty("reduce", baseCommand.getReduce());

    if (query.getOffset() != 0 || query.getFieldsObject() != null) {
        throw new QueryException("mapReduce does not allow the offset/retrievedFields query options.");
    }

    final OutputType outType = type.toOutputType();
    final DBCollection dbColl = query.getCollection();

    final MapReduceCommand cmd = new MapReduceCommand(dbColl, baseCommand.getMap(), baseCommand.getReduce(),
                                                      baseCommand.getOutputTarget(), outType, query.getQueryObject());
    cmd.setFinalize(baseCommand.getFinalize());
    cmd.setScope(baseCommand.getScope());
    // Keep the output database and read preference the caller configured on baseCommand.
    cmd.setOutputDB(baseCommand.getOutputDB());
    cmd.setReadPreference(baseCommand.getReadPreference());

    if (query.getLimit() > 0) {
        cmd.setLimit(query.getLimit());
    }
    if (query.getSortObject() != null) {
        cmd.setSort(query.getSortObject());
    }

    if (LOG.isTraceEnabled()) {
        LOG.trace("Executing " + cmd.toString());
    }

    final EntityCache cache = createCache();
    // Execute the command assembled above, not baseCommand.
    // Otherwise the query's filter, limit and sort and the requested output type would be silently ignored.
    final MapreduceResults<T> results = new MapreduceResults<T>(dbColl.mapReduce(cmd));
    results.setType(type);
    if (MapreduceType.INLINE.equals(type)) {
        results.setInlineRequiredOptions(this, outputType, getMapper(), cache);
    } else {
        results.setQuery(newQuery(outputType, getDB().getCollection(results.getOutputCollectionName())));
    }
    return results;
}
/**
 * Verifies that {@link MapReduceOptions#toCommand} copies every configured option into the resulting
 * {@link MapReduceCommand}.
 */
@Test
@SuppressWarnings("deprecation")
public void mapReduceCommand() {
    Query<FacebookUser> query = getDs().find(FacebookUser.class);
    MapReduceOptions<FacebookUser> options = new MapReduceOptions<FacebookUser>()
        .bypassDocumentValidation(true)
        .collation(Collation.builder().locale("en").build())
        .finalize("i'm a finalize function")
        .jsMode(true)
        .limit(42)
        .map("i'm a map function")
        .maxTimeMS(42000)
        .outputCollection("output collection")
        .outputDB("output db")
        .outputType(OutputType.INLINE)
        .query(query)
        .readPreference(ReadPreference.primaryPreferred())
        .reduce("i'm a reduce function")
        .scope(new Document("key", "value").append("key2", "value2"))
        .verbose(true);

    MapReduceCommand command = options.toCommand(getMorphia().getMapper());

    assertTrue(command.getBypassDocumentValidation());
    assertEquals(Collation.builder().locale("en").build(), command.getCollation());
    assertTrue(command.getJsMode());
    assertEquals(42, command.getLimit());
    assertEquals("i'm a map function", command.getMap());
    assertEquals(42000, command.getMaxTime(TimeUnit.MILLISECONDS));
    assertEquals("output collection", command.getOutputTarget());
    assertEquals("output db", command.getOutputDB());
    assertEquals(query.getQueryObject(), command.getQuery());
    assertEquals(query.getSortObject(), command.getSort());
    assertEquals(ReadPreference.primaryPreferred(), command.getReadPreference());
    // The output type configured on the options must be carried into the command.
    assertEquals(OutputType.INLINE, command.getOutputType());
    assertEquals("i'm a reduce function", command.getReduce());
    assertEquals("i'm a finalize function", command.getFinalize());
    assertEquals(new Document("key", "value").append("key2", "value2"), command.getScope());
    assertTrue(command.isVerbose());
}
/**
 * Returns the output type of the map-reduce job that produced these results.
 *
 * @return the type of the operation
 * @since 1.3
 */
public OutputType getOutputType() {
    return this.outputType;
}
/**
 * Records the output type used by this map-reduce job.
 *
 * @param outputType the output type to store
 * @since 1.3
 */
public void setOutputType(final OutputType outputType) {
    this.outputType = outputType;
}
/**
 * Creates an Iterator over the results of the operation.
 * <p>
 * Inline jobs iterate the inline results directly. Other jobs fetch the documents from the output collection.
 *
 * @return the Iterator
 */
@Override
public Iterator<T> iterator() {
    if (outputType != OutputType.INLINE) {
        return createQuery().fetch().iterator();
    }
    return getInlineResults();
}