/**
 * Computes the maximum value of a field.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param maxField       field to take the maximum of
 * @return the maximum value, or null if no document matches
 */
public Object max(String collectionName, Document match, String maxField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group(null, Accumulators.max("_max", "$" + maxField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.get("_max");
    }
    return null;
}
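// A minimal usage sketch for the helper above. The collection name "orders",
// the filter field "status", and the numeric field "amount" are hypothetical.
// The equivalent shell pipeline is:
//   [{ $match: { status: "PAID" } }, { $group: { _id: null, _max: { $max: "$amount" } } }]
Document paidOnly = new Document("status", "PAID");
Object maxAmount = max("orders", paidOnly, "amount");
System.out.println("max amount: " + maxAmount);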
/**
 * Groups by a field, counts occurrences, keeps groups whose count is >= minCount,
 * and sorts the result by count in descending order.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param field          field to group by
 * @param minCount       minimum count a group must reach to be included
 * @return group keys mapped to their counts, in descending count order
 */
public LinkedHashMap<String, Integer> sortMap(String collectionName, Document match, String field, int minCount) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group("$" + field, Accumulators.sum("_count", 1)),
                    match(new Document("_count", new Document("$gte", minCount))),
                    sort(new Document("_count", -1))
            )
    );
    LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
    MongoCursor<Document> iterator = aggregate.iterator();
    while (iterator.hasNext()) {
        Document next = iterator.next();
        map.put(next.getString("_id"), next.getInteger("_count"));
    }
    return map;
}
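// A usage sketch: tag frequencies with at least 10 occurrences, most frequent
// first. The collection "articles" and the fields "published"/"tag" are hypothetical.
LinkedHashMap<String, Integer> tagCounts =
        sortMap("articles", new Document("published", true), "tag", 10);
tagCounts.forEach((tag, count) -> System.out.println(tag + " -> " + count));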
/**
 * Gets the MongoDB cursor, opening it on first use.
 */
private MongoCursor<Document> getCursor(int skip) {
    if (cursor == null && cursorId == 0) {
        Document query = Document.parse(config.getMongo().getQuery());
        List<Bson> pipes = new ArrayList<>(3);
        pipes.add(match(query));
        pipes.add(skip(skip));
        Optional.ofNullable(config.getMongo().getProject())
                .ifPresent(p -> pipes.add(project(Document.parse(p))));
        AggregateIterable<Document> aggregate = collection.aggregate(pipes)
                .allowDiskUse(true)
                .useCursor(true);
        cursor = aggregate.iterator();
        // TODO: Persist cursor ID somewhere to allow restarts.
        Optional.ofNullable(cursor.getServerCursor())
                .ifPresent(serverCursor -> cursorId = serverCursor.getId());
    } else if (cursor == null && cursorId != 0) {
        // TODO: Look up the cursor by ID so an existing cursor can be reopened after a restart.
    }
    return cursor;
}
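// A standalone sketch of the same match/skip/project pipeline, built with the
// driver's Aggregates helpers directly. The collection handle, filter, and field
// names are hypothetical; useCursor() is omitted here because it was deprecated
// and later removed in newer driver versions.
List<Bson> pipeline = Arrays.asList(
        Aggregates.match(Filters.eq("type", "event")),
        Aggregates.skip(1000),
        Aggregates.project(Projections.include("payload", "updated")));
try (MongoCursor<Document> c = collection.aggregate(pipeline).allowDiskUse(true).iterator()) {
    while (c.hasNext()) {
        process(c.next()); // process(...) is a placeholder
    }
}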
/**
 * Fetches the most recent version of a stream.
 *
 * @param eventStreamId the stream identifier
 * @return the most recent version, or 0 if the stream does not exist
 */
public long getMostRecentStreamVersion(String eventStreamId) {
    MongoCollection<Document> casualStreams = database.getCollection(CASUAL_STREAMS_COLLECTION);
    AggregateIterable<Document> aggregate = casualStreams.aggregate(
            Lists.newArrayList(
                    new Document("$match",
                            new Document("_id", new ObjectId(eventStreamId))),
                    new Document("$group",
                            new Document("_id", "$_id")
                                    .append("maxVersion", new Document("$max", "$version_")))
            )
    );
    return Optional
            .ofNullable(aggregate.first())
            .map(d -> d.getLong("maxVersion"))
            .orElse(0L);
}
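// A usage sketch; the ObjectId literal is hypothetical. Because $match filters
// on the unique _id, the $group stage aggregates over at most one document, so
// the pipeline effectively reads that single document's version_ field.
long version = getMostRecentStreamVersion("507f1f77bcf86cd799439011");
if (version == 0L) {
    // stream not found (or genuinely at version 0 -- the two cases are indistinguishable here)
}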
/**
 * Extracts aggregated statuses for environment resources over time.
 *
 * @param environmentName environment the resources belong to
 * @param resourceIds     resource IDs to fetch aggregated statuses for
 * @param from            start of the date range
 * @param to              end of the date range
 * @return aggregated statuses merged across the whole date range
 */
public synchronized List<AggregatedResourceStatus> getAggregatedStatuses(String environmentName, Set<String> resourceIds, Date from, Date to) {
    List<AggregatedResourceStatus> aggStatusesResulting = new ArrayList<>();
    // build start and end time frames for each month in the date range
    List<Date[]> dateFrames = DataUtils.splitDatesIntoMonths(from, to);
    // iterate over each time frame and extract aggregated results
    for (Date[] dates : dateFrames) {
        switchCollection(dates[0]);
        // the aggregation pipeline is built inline, without fancy stuff, to keep it readable
        Document searchQuery = new Document("environmentName", environmentName)
                .append("updated", new Document("$gte", dates[0]).append("$lte", dates[1]));
        if (resourceIds != null) {
            searchQuery.append("resource.resourceId", new Document("$in", resourceIds));
        }
        // aggregate statuses
        AggregateIterable<Document> documents = thisCollection.aggregate(Arrays.asList(
                match(searchQuery),
                Document.parse("{$group: {'_id': {'res': '$resource', 'status': '$statusOrdinal'}, 'count': {'$sum': 1}}}"),
                Document.parse("{$group: {'_id': {'resource': '$_id.res'}, 'statuses': {'$push': {'statusOrdinal': '$_id.status', 'count': '$count'}}, 'count': {'$sum': '$count'}}}")
        ));
        // map into the aggregated statuses POJO
        List<AggregatedResourceStatus> aggStatuses =
                documents.map(DocumentMapper::aggregatedResourceStatusFromDocument).into(new ArrayList<>());
        aggStatusesResulting.addAll(aggStatuses);
    }
    // now merge the aggregated statuses for each month
    return buildAggregatedResourceList(aggStatusesResulting);
}
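// A hedged sketch of the document shapes the two $group stages above produce;
// the field values are hypothetical.
//
// After the first $group (one document per resource/status pair):
//   { "_id": { "res": {...}, "status": 2 }, "count": 17 }
//
// After the second $group (one document per resource, statuses pushed into an array):
//   { "_id": { "resource": {...} },
//     "statuses": [ { "statusOrdinal": 2, "count": 17 }, { "statusOrdinal": 0, "count": 5 } ],
//     "count": 22 }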
public String getAverageAgeByCompany() {
    AggregateIterable<Document> documents = userCollection.aggregate(
            Arrays.asList(
                    Aggregates.group("$company", Accumulators.avg("averageAge", "$age")),
                    Aggregates.sort(Sorts.ascending("_id"))
            ));
    // Serialize once: each JSON.serialize call iterates the AggregateIterable,
    // which would re-run the aggregation on the server.
    String json = JSON.serialize(documents);
    System.out.println(json);
    return json;
}
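// A usage sketch; the collection contents are hypothetical. With users
//   { company: "Acme", age: 30 }, { company: "Acme", age: 40 }, { company: "Beta", age: 25 }
// the serialized result would look like:
//   [ { "_id" : "Acme" , "averageAge" : 35.0 } , { "_id" : "Beta" , "averageAge" : 25.0 } ]
String averages = getAverageAgeByCompany();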
/**
 * Counts the number of distinct values of a field.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param distinctField  field whose distinct values are counted
 * @return the number of distinct values, or 0 if no document matches
 */
public int distinctCount(String collectionName, Document match, String distinctField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group(null, addToSet("_array", "$" + distinctField)),
                    project(new Document("_num", new Document("$size", "$_array")))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.getInteger("_num");
    }
    return 0;
}
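// A usage sketch; the collection and field names are hypothetical. Note that
// $addToSet materializes every distinct value in a single group document, so
// this approach is bounded by the 16 MB BSON document limit on very
// high-cardinality fields.
int cityCount = distinctCount("users", new Document("active", true), "city");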
/**
 * Gets the distinct values of a field.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param distinctField  field whose distinct values are returned
 * @return the distinct values, or null if no document matches
 */
public List distinct(String collectionName, Document match, String distinctField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group(null, addToSet("_array", "$" + distinctField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return (List) first.get("_array");
    }
    return null;
}
/**
 * Computes the minimum value of a field.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param minField       field to take the minimum of
 * @return the minimum value, or null if no document matches
 */
public Object min(String collectionName, Document match, String minField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group(null, Accumulators.min("_min", "$" + minField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.get("_min");
    }
    return null;
}
/**
 * Computes the sum of a field.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param sumField       field to sum
 * @return the sum, or null if no document matches
 */
public Double sum(String collectionName, Document match, String sumField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group(null, Accumulators.sum("_sum", "$" + sumField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.getDouble("_sum");
    }
    return null;
}
/**
 * Computes the average value of a field.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param avgField       field to average
 * @return the average, or null if no document matches
 */
public Double avg(String collectionName, Document match, String avgField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group(null, Accumulators.avg("_avg", "$" + avgField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.getDouble("_avg");
    }
    return null;
}
/**
 * Gets the last value of a field in the given sort order.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param lastField      field to take the last value of
 * @param sort           sort order applied before taking the last value
 * @return the last value, or null if no document matches
 */
public Object last(String collectionName, Document match, String lastField, Document sort) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    sort(sort),
                    group(null, Accumulators.last("_last", "$" + lastField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.get("_last");
    }
    return null;
}
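// A usage sketch for last(); the collection and field names are hypothetical.
// Sorting ascending on "timestamp" makes $last pick the newest reading.
Object latestTemperature = last("readings",
        new Document("sensorId", "s-1"),
        "temperature",
        new Document("timestamp", 1));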
/**
 * Computes the population standard deviation of a field.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param stdDevField    field to compute the standard deviation of
 * @return the population standard deviation, or null if no document matches
 */
public Double stdDevPop(String collectionName, Document match, String stdDevField) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    group(null, Accumulators.stdDevPop("_stdDev", "$" + stdDevField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.getDouble("_stdDev");
    }
    return null;
}
/**
 * Computes the sample standard deviation of a field over a random sample.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param stdDevField    field to compute the standard deviation of
 * @param sampleSize     number of documents to randomly select with $sample
 * @return the sample standard deviation, or null if no document matches
 */
public Double stdDevSamp(String collectionName, Document match, String stdDevField, int sampleSize) {
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match),
                    sample(sampleSize),
                    group(null, Accumulators.stdDevSamp("_stdDev", "$" + stdDevField))
            )
    );
    Document first = aggregate.first();
    if (first != null) {
        return first.getDouble("_stdDev");
    }
    return null;
}
/**
 * Checks whether a value's group appears in the count results (count >= minCount),
 * i.e. whether the value occurs at least minCount times among the matched documents.
 *
 * @param collectionName collection name
 * @param match          match conditions
 * @param field          field to group by
 * @param value          value to check
 * @param minCount       minimum count the value's group must reach
 * @return true if the value occurs at least minCount times
 */
public boolean inSortMap(String collectionName, Document match, String field, Object value, int minCount) {
    // note: appends the field/value pair to the caller's match document in place
    AggregateIterable<Document> aggregate = getDB().getCollection(collectionName).aggregate(
            Arrays.asList(
                    match(match.append(field, value)),
                    group("$" + field, Accumulators.sum("_count", 1)),
                    match(new Document("_count", new Document("$gte", minCount)))
            )
    );
    return aggregate.first() != null;
}
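// A usage sketch; the collection and field names are hypothetical. Checks whether
// the tag "java" occurs at least 10 times among published articles.
boolean popular = inSortMap("articles",
        new Document("published", true), "tag", "java", 10);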
private void submitBatchQuery() {
    int count = 0;
    executedRangeMap.clear();
    final List<Document> pipeline = new ArrayList<>();
    final List<DBObject> match = new ArrayList<>();
    while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE) {
        count++;
        RyaStatement query = queryIterator.next();
        executedRangeMap.putAll(query, rangeMap.get(query));
        final DBObject currentQuery = strategy.getQuery(query);
        match.add(currentQuery);
    }
    if (match.size() > 1) {
        pipeline.add(new Document("$match", new Document("$or", match)));
    } else if (match.size() == 1) {
        pipeline.add(new Document("$match", match.get(0)));
    } else {
        batchQueryResultsIterator = Iterators.emptyIterator();
        return;
    }
    // Execute a redact aggregation so that only documents the user has access to are returned.
    pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
    log.info(pipeline);
    final AggregateIterable<Document> aggIter = coll.aggregate(pipeline);
    aggIter.batchSize(1000);
    batchQueryResultsIterator = aggIter.iterator();
}
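// A minimal sketch of the $match batching pattern used above: several
// per-statement filters are OR'ed into a single $match stage, so one
// aggregation serves the whole batch. The filter documents are hypothetical.
List<Document> filters = Arrays.asList(
        new Document("subject", "urn:s1"),
        new Document("subject", "urn:s2"));
Document matchStage = (filters.size() == 1)
        ? new Document("$match", filters.get(0))
        : new Document("$match", new Document("$or", filters));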
static AggregateIterable<Document> callAggregateCmd( MongoCollection<Document> mongoCollection,
        QueryProperties queryProps ) throws OdaException {
    if( ! queryProps.hasAggregateCommand() )
        return null;
    DBObject operationExprObj = queryProps.getOperationExprAsParsedObject( true );
    if( operationExprObj == null )
        return null;

    // convert the user-specified operation expression to an operation pipeline
    List<Document> operationList = QueryProperties.getObjectsAsDocumentList( operationExprObj );
    if( operationList == null )
        return null;    // no valid DBObject operation

    // The aggregation $limit and $skip operators apply to the number of documents
    // in the *input* pipeline, and thus cannot be used to apply the searchLimit
    // and numSkipDocuments properties defined for the data set.
    // The $match and $sort pipeline operators are built into an aggregate command.

    // execute the aggregate command
    try {
        return mongoCollection.aggregate( operationList );
    }
    catch( RuntimeException ex ) {
        OdaException odaEx = new OdaException( Messages.mDbOp_aggrCmdFailed );
        odaEx.initCause( ex );
        throw odaEx;
    }
}
/**
 * Constructor.
 * @param aggIter Iterator of documents in AggregationPipelineQueryNode's
 *      intermediate solution representation.
 * @param varToOriginalName A mapping from field names in the pipeline
 *      result documents to equivalent variable names in the original query.
 *      Where an entry does not exist for a field, the field name and variable
 *      name are assumed to be the same.
 * @param bindings A partial solution. May be empty.
 */
public PipelineResultIteration(AggregateIterable<Document> aggIter,
        Map<String, String> varToOriginalName,
        BindingSet bindings) {
    this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName);
    this.bindings = Preconditions.checkNotNull(bindings);
    Preconditions.checkNotNull(aggIter);
    aggIter.batchSize(BATCH_SIZE);
    this.cursor = aggIter.iterator();
}
/**
 * Runs an aggregation pipeline against the collection mapped to the given entity class
 * and decodes the results into the given result class.
 *
 * @param entity       entity class identifying the source collection
 * @param resultClass  class the aggregation results are decoded into
 * @param aggregations the aggregation stages to apply, in order
 * @return an iterable over the aggregation results
 */
<T extends IEntity, RESULT> AggregateIterable<RESULT> aggregate(Class<T> entity, Class<RESULT> resultClass, Aggregation... aggregations) throws Exception;
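// A hedged usage sketch for the interface method above. User, UserStats, the dao
// handle, and the Aggregation factory calls are all hypothetical, since the
// concrete Aggregation type is not shown in this snippet.
AggregateIterable<UserStats> stats = dao.aggregate(User.class, UserStats.class,
        Aggregation.match(new Document("active", true)),          // hypothetical factory
        Aggregation.group(new Document("_id", "$company")));      // hypothetical factory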