Java 类com.mongodb.client.model.Aggregates 实例源码
项目:digital-display-garden-iteration-4-dorfner-v2
文件:PlantController.java
/**
* Takes `uploadID` and returns all bed names as a json format string
* @param uploadID - the year that the data was uploaded
* @return String representation of json with all bed names
*/
public String getGardenLocationsAsJson(String uploadID){
AggregateIterable<Document> documents
= plantCollection.aggregate(
Arrays.asList(
Aggregates.match(eq("uploadID", uploadID)), //!! Order is important here
Aggregates.group("$gardenLocation"),
Aggregates.sort(Sorts.ascending("_id"))
));
List<Document> listDoc = new ArrayList<>();
for (Document doc : documents) {
listDoc.add(doc);
}
listDoc.sort(new BedComparator());
return JSON.serialize(listDoc);
}
项目:digital-display-garden-iteration-4-revolverenguardia-1
文件:ExcelParser.java
/**
*
* @return a date-sorted List of all the distinct uploadIds in the DB
*/
public static List<String> listUploadIds(MongoDatabase database) {
MongoCollection<Document> plantCollection = database.getCollection("plants");
AggregateIterable<Document> documents
= plantCollection.aggregate(
Arrays.asList(
Aggregates.group("$uploadId"),
Aggregates.sort(Sorts.ascending("_id"))
));
List<String> lst = new LinkedList<>();
for(Document d: documents) {
lst.add(d.getString("_id"));
}
return lst;
}
项目:Backend
文件:MongoDBManager.java
public int returnValueToURL(String URL)
{
Block<Document> printBlock = new Block<Document>() {
@Override
public void apply(final Document document) {
System.out.println(document.toJson());
}
};
MongoCollection<Document> collection = db.getCollection("ratings");
collection.aggregate(
Arrays.asList(
Aggregates.group("URL", Accumulators.avg("rating", 1))))
.forEach(printBlock);
System.out.println(printBlock.toString());
return 0;
}
项目:digital-display-garden-iteration-3-sixguysburgers-fries
文件:MongoSpec.java
@Test
public void ageCounts() {
AggregateIterable<Document> documents
= userDocuments.aggregate(
Arrays.asList(
/*
* Groups data by the "age" field, and then counts
* the number of documents with each given age.
* This creates a new "constructed document" that
* has "age" as it's "_id", and the count as the
* "ageCount" field.
*/
Aggregates.group("$age",
Accumulators.sum("ageCount", 1)),
Aggregates.sort(Sorts.ascending("_id"))
)
);
List<Document> docs = intoList(documents);
assertEquals("Should be two distinct ages", 2, docs.size());
assertEquals(docs.get(0).get("_id"), 25);
assertEquals(docs.get(0).get("ageCount"), 1);
assertEquals(docs.get(1).get("_id"), 37);
assertEquals(docs.get(1).get("ageCount"), 2);
}
项目:digital-display-garden-iteration-3-sixguysburgers-fries
文件:MongoSpec.java
@Test
public void averageAge() {
AggregateIterable<Document> documents
= userDocuments.aggregate(
Arrays.asList(
Aggregates.group("$company",
Accumulators.avg("averageAge", "$age")),
Aggregates.sort(Sorts.ascending("_id"))
));
List<Document> docs = intoList(documents);
assertEquals("Should be three companies", 3, docs.size());
assertEquals("Frogs, Inc.", docs.get(0).get("_id"));
assertEquals(37.0, docs.get(0).get("averageAge"));
assertEquals("IBM", docs.get(1).get("_id"));
assertEquals(37.0, docs.get(1).get("averageAge"));
assertEquals("UMM", docs.get(2).get("_id"));
assertEquals(25.0, docs.get(2).get("averageAge"));
}
项目:digital-display-garden-iteration-4-dorfner-v2
文件:MongoSpec.java
@Test
public void ageCounts() {
AggregateIterable<Document> documents
= userDocuments.aggregate(
Arrays.asList(
/*
* Groups data by the "age" field, and then counts
* the number of documents with each given age.
* This creates a new "constructed document" that
* has "age" as it's "_id", and the count as the
* "ageCount" field.
*/
Aggregates.group("$age",
Accumulators.sum("ageCount", 1)),
Aggregates.sort(Sorts.ascending("_id"))
)
);
List<Document> docs = intoList(documents);
assertEquals("Should be two distinct ages", 2, docs.size());
assertEquals(docs.get(0).get("_id"), 25);
assertEquals(docs.get(0).get("ageCount"), 1);
assertEquals(docs.get(1).get("_id"), 37);
assertEquals(docs.get(1).get("ageCount"), 2);
}
项目:digital-display-garden-iteration-4-dorfner-v2
文件:MongoSpec.java
@Test
public void averageAge() {
AggregateIterable<Document> documents
= userDocuments.aggregate(
Arrays.asList(
Aggregates.group("$company",
Accumulators.avg("averageAge", "$age")),
Aggregates.sort(Sorts.ascending("_id"))
));
List<Document> docs = intoList(documents);
assertEquals("Should be three companies", 3, docs.size());
assertEquals("Frogs, Inc.", docs.get(0).get("_id"));
assertEquals(37.0, docs.get(0).get("averageAge"));
assertEquals("IBM", docs.get(1).get("_id"));
assertEquals(37.0, docs.get(1).get("averageAge"));
assertEquals("UMM", docs.get(2).get("_id"));
assertEquals(25.0, docs.get(2).get("averageAge"));
}
项目:digital-display-garden-iteration-2-spraguesanborn
文件:MongoSpec.java
@Test
public void ageCounts() {
AggregateIterable<Document> documents
= userDocuments.aggregate(
Arrays.asList(
/*
* Groups data by the "age" field, and then counts
* the number of documents with each given age.
* This creates a new "constructed document" that
* has "age" as it's "_id", and the count as the
* "ageCount" field.
*/
Aggregates.group("$age",
Accumulators.sum("ageCount", 1)),
Aggregates.sort(Sorts.ascending("_id"))
)
);
List<Document> docs = intoList(documents);
assertEquals("Should be two distinct ages", 2, docs.size());
assertEquals(docs.get(0).get("_id"), 25);
assertEquals(docs.get(0).get("ageCount"), 1);
assertEquals(docs.get(1).get("_id"), 37);
assertEquals(docs.get(1).get("ageCount"), 2);
}
项目:digital-display-garden-iteration-2-spraguesanborn
文件:MongoSpec.java
@Test
public void averageAge() {
AggregateIterable<Document> documents
= userDocuments.aggregate(
Arrays.asList(
Aggregates.group("$company",
Accumulators.avg("averageAge", "$age")),
Aggregates.sort(Sorts.ascending("_id"))
));
List<Document> docs = intoList(documents);
assertEquals("Should be three companies", 3, docs.size());
assertEquals("Frogs, Inc.", docs.get(0).get("_id"));
assertEquals(37.0, docs.get(0).get("averageAge"));
assertEquals("IBM", docs.get(1).get("_id"));
assertEquals(37.0, docs.get(1).get("averageAge"));
assertEquals("UMM", docs.get(2).get("_id"));
assertEquals(25.0, docs.get(2).get("averageAge"));
}
项目:sam
文件:ServerResource.java
private static List<Bson> getServerQuery(Bson filter) {
final List<Bson> pipeline = new ArrayList<>(6);
if (filter != ALL) {
pipeline.add(Aggregates.match(filter));
}
pipeline.add(Aggregates.unwind("$deployments", new UnwindOptions().preserveNullAndEmptyArrays(true)));
pipeline.add(Aggregates.lookup(Collections.APPLICATIONS, "deployments.applicationId", "id", "applications"));
pipeline.add(Aggregates.unwind("$applications", new UnwindOptions().preserveNullAndEmptyArrays(true)));
pipeline.add(Aggregates.group(
new Document().append("hostname", "$hostname").append("environment", "$environment"),
new BsonField("fqdn", new Document("$first", "$fqdn")),
new BsonField("description", new Document("$first", "$description")),
new BsonField("os", new Document("$first", "$os")),
new BsonField("network", new Document("$first", "$network")),
new BsonField("meta", new Document("$first", "$meta")),
new BsonField("attributes", new Document("$first", "$attributes")),
new BsonField("applications", new Document("$push", "$applications")),
new BsonField("deployments", new Document("$push", "$deployments"))));
pipeline.add(Aggregates.sort(Sorts.ascending("_id")));
return pipeline;
}
项目:digital-display-garden-iteration-4-revolverenguardia-1
文件:PlantController.java
/**
* Get a json containing a list of commonNames sorted by common name
* @param uploadID
* @return
*/
public String getCommonNamesJSON(String uploadID){
if (!ExcelParser.isValidUploadId(db, uploadID))
return "null";
AggregateIterable<Document> documents
= plantCollection.aggregate(
Arrays.asList(
Aggregates.match(eq("uploadId", uploadID)), //!! Order is important here
Aggregates.group("$commonName"),
Aggregates.sort(Sorts.ascending("commonName"))
));
return JSON.serialize(documents);
}
项目:digital-display-garden-iteration-3-sixguysburgers-fries
文件:PlantController.java
public String getGardenLocationsAsJson(String uploadID) {
AggregateIterable<Document> documents
= plantCollection.aggregate(
Arrays.asList(
Aggregates.match(eq("uploadId", uploadID)), //!! Order is important here
Aggregates.group("$gardenLocation"),
Aggregates.sort(Sorts.ascending("_id"))
));
return JSON.serialize(documents);
}
项目:digital-display-garden-iteration-3-sixguysburgers-fries
文件:PlantController.java
/**
*
* @return a sorted JSON array of all the distinct uploadIds in the DB
*/
public String listUploadIds() {
AggregateIterable<Document> documents
= plantCollection.aggregate(
Arrays.asList(
Aggregates.group("$uploadId"),
Aggregates.sort(Sorts.ascending("_id"))
));
List<String> lst = new LinkedList<>();
for(Document d: documents) {
lst.add(d.getString("_id"));
}
return JSON.serialize(lst);
// return JSON.serialize(plantCollection.distinct("uploadId","".getClass()));
}
项目:digital-display-garden-iteration-4-dorfner-v2
文件:PlantController.java
/**
*
* @return a sorted JSON array of all the distinct uploadIDs in plant collection of the DB
*/
public List<String> listUploadIDs() {
AggregateIterable<Document> documents
= plantCollection.aggregate(
Arrays.asList(
Aggregates.group("$uploadID"),
Aggregates.sort(Sorts.ascending("_id"))
));
List<String> lst = new LinkedList<>();
for(Document d: documents) {
lst.add(d.getString("_id"));
}
return lst;
// return JSON.serialize(plantCollection.distinct("uploadID","".getClass()));
}
项目:mongodb-rdbms-sync
文件:SyncNodeDetailsDao.java
public List<SyncNodeDetails> getNodeDetails(String lifeCycle) {
UnwindOptions options = new UnwindOptions();
options.preserveNullAndEmptyArrays(true);
Document group = new Document("$group",
new Document(SyncAttrs.ID, new Document("_id", "$_id").append("host","$host").append("node","$node").append("state","$state")
.append("concurrencyLevel","$concurrencyLevel").append("totalHeapSize", "$totalHeapSize")
.append("usedHeapSize", "$usedHeapSize").append("lifeCycle", "$lifeCycle"))
.append("eventArr", new Document("$addToSet", "$event_docs")));
return migrationNodeMapping.aggregate(Arrays.asList(Aggregates.match(Filters.eq(SyncAttrs.LIFE_CYCLE,lifeCycle)),
Aggregates.unwind("$activeEvents",options),
Aggregates.lookup("SyncEvents", "activeEvents", "_id", "event_docs"),
Aggregates.unwind("$event_docs", options),
group,Aggregates.project(new Document("node", "$_id").append("events","$eventArr").append("_id", false))), SyncNodeDetails.class)
.into(new ArrayList<SyncNodeDetails>());
}
项目:mongodb-rdbms-sync
文件:SyncEventDao.java
public SyncMarker getEventStats(ObjectId eventId) {
Document group = new Document("$group",
new Document(SyncAttrs.ID, null).append(SyncAttrs.TOTAL_ROWS, new Document("$sum", "$marker.totalRows"))
.append(SyncAttrs.ROWS_READ, new Document("$sum", "$marker.rowsRead"))
.append(SyncAttrs.ROWS_DUMPED, new Document("$sum", "$marker.rowsDumped"))
.append(SyncAttrs.START_TIME, new Document("$min", "$marker.startTime"))
.append(SyncAttrs.END_TIME, new Document("$max", "$marker.endTime")));
return syncEvents.aggregate(Arrays.asList(Aggregates.match(Filters.eq(SyncAttrs.PARENT_EVENT_ID, eventId)),
Aggregates.project(Projections.include(SyncAttrs.MARKER)), group), SyncMarker.class).first();
}
项目:mongodb-rdbms-sync
文件:SyncEventDao.java
public List<SyncError> getEventErrors(ObjectId eventId) {
Document group = new Document("$group",
new Document(SyncAttrs.ID, null).append(SyncAttrs.ERRORS, new Document("$addToSet", "$errors")));
return syncEvents.aggregate(
Arrays.asList(Aggregates.match(Filters.eq(SyncAttrs.PARENT_EVENT_ID, eventId)),
Aggregates.unwind("$errors"),
Aggregates
.project(Projections.include(SyncAttrs.ERRORS)),
group, Aggregates.unwind("$errors"),
Aggregates.project(new Document(SyncAttrs.ERROR_MESSAGE, "$errors.errorMessage")
.append(SyncAttrs.TRACE, "$errors.trace")
.append(SyncAttrs.THREAD_NAME, "$errors.threadName"))),
SyncError.class).allowDiskUse(true).into(new ArrayList<SyncError>());
}
项目:digital-display-garden-iteration-2-spraguesanborn
文件:UserController.java
public String getAverageAgeByCompany() {
AggregateIterable<Document> documents
= userCollection.aggregate(
Arrays.asList(
Aggregates.group("$company",
Accumulators.avg("averageAge", "$age")),
Aggregates.sort(Sorts.ascending("_id"))
));
System.err.println(JSON.serialize(documents));
return JSON.serialize(documents);
}
项目:ibm-performance-monitor
文件:ProfiledMongoClientTest.java
@Test
public void testAggregate()
{
List<Document> docList = new ArrayList<Document>();
coll.aggregate(Arrays.asList(Aggregates.match(Filters.eq("name", "Alto")),
Aggregates.group("color", Accumulators.sum("count", 1)))).into(docList);
assertEquals(1, docList.size());
docList.clear();
Document first = coll
.aggregate(Arrays.asList(Aggregates.match(Filters.eq("name", "Alto")),
Aggregates.group("color", Accumulators.sum("count", 1))), Document.class)
.allowDiskUse(true).batchSize(12).bypassDocumentValidation(true).collation(Collation.builder().build())
.first();
Assert.assertNotNull(first);
first = coll
.aggregate(Arrays.asList(Aggregates.match(Filters.eq("name", "Alto")),
Aggregates.group("color", Accumulators.sum("count", 1))), Document.class)
.allowDiskUse(true).batchSize(12).bypassDocumentValidation(true).collation(Collation.builder().build())
.map(new Function<Document, Document>()
{
@Override
public Document apply(Document t)
{
t.put("hello", "world");
return t;
}
}).first();
Assert.assertNotNull(first);
}
项目:ibm-performance-monitor
文件:MongoUtilitiesTest.java
@Test
public void testFilterParametersBsonDocumentMatchInAndEq()
{
Bson in = Filters.in("dateKey", "593898622313868b72a296ad", "593898622313868b72a296b4");
Bson eq = Filters.eq("eqfield","123");
Bson and = Filters.and(in, eq);
Bson match = Aggregates.match(and);
BsonDocument filterParameters = MongoUtilities.filterParameters(match);
assertEquals("{ \"$match\" : { \"dateKey\" : { \"$in\" : [\"*?\"] }, \"eqfield\" : \"?\" } }", filterParameters.toString());
}
项目:ibm-performance-monitor
文件:MongoUtilitiesTest.java
@Test
public void testFilterParametersBsonDocumentMatchInOrEq()
{
Bson in = Filters.in("dateKey", "593898622313868b72a296ad", "593898622313868b72a296b4");
Bson eq = Filters.eq("eqfield","123");
Bson or = Filters.or(in, eq);
Bson match = Aggregates.match(or);
BsonDocument filterParameters = MongoUtilities.filterParameters(match);
assertEquals("{ \"$match\" : { \"$or\" : [{ \"dateKey\" : { \"$in\" : [\"*?\"] } }, { \"eqfield\" : \"?\" }] } }", filterParameters.toString());
}
项目:sam
文件:ApplicationResource.java
private PaginatedCollection<Application> findApplications(Bson filter) {
final List<Bson> pipeline = new ArrayList<>(2);
if (filter != ALL) {
pipeline.add(Aggregates.match(filter));
}
pipeline.add(Aggregates.lookup(Collections.GROUPS, "group", "id", "group"));
return RestHelper.paginatedList(database
.getCollection(Collections.APPLICATIONS)
.aggregate(pipeline)
.map(Application::new)
);
}
项目:sam
文件:ApplicationResource.java
private Document findApplication(Bson filter) {
final Document bson = database.getCollection(Collections.APPLICATIONS)
.aggregate(Lists.newArrayList(
Aggregates.match(filter),
Aggregates.lookup(Collections.GROUPS, "group", "id", "group")
)).first();
if (bson == null) {
throw new WebApplicationException(Status.NOT_FOUND);
}
return bson;
}
项目:sam
文件:GroupResource.java
private Map<String,Group> getAllGroups() {
return Maps.uniqueIndex(database
.getCollection(Collections.GROUPS)
.aggregate(Lists.newArrayList(
Aggregates.lookup(Collections.APPLICATIONS, "id", "group", "applications"),
Aggregates.lookup(Collections.ASSETS, "id", "group", "assets")
)).map(Group::new),
t->t.id
);
}
项目:sam
文件:GroupResource.java
private PaginatedCollection<Tag> getTags() {
return RestHelper.paginatedList(database
.getCollection(Collections.GROUPS)
.aggregate(Lists.newArrayList(
Aggregates.unwind("$tags"),
Aggregates.group("$tags")
)).map(t->new Tag(t.getString("_id")))
);
}
项目:sam
文件:GroupResource.java
private List<String> getRootGroupIds(Optional<Function<String,Bson>> filterProvider) {
final List<Bson> pipeline = new ArrayList<>(5);
/*
* Optional filter, needs to be applied both before and after self join to include
* groups with inbound links from non tagged groups
*/
Bson inboundLinksFilter = Filters.size("inbound_links", 0);
if (filterProvider.isPresent()) {
final Bson tagFilter = filterProvider.get().apply("tags");
pipeline.add(Aggregates.match(tagFilter));
final Bson inboundLinksTagFilter = filterProvider.get().apply("inbound_links.tags");
inboundLinksFilter = Filters.or(inboundLinksFilter, Filters.not(inboundLinksTagFilter));
}
// Unwind groups field to be able to self-join
pipeline.add(Aggregates.unwind("$groups", new UnwindOptions().preserveNullAndEmptyArrays(true)));
// Self join on inbound references: group.groups -> group.id and filter no inbound references
pipeline.add(Aggregates.lookup(Collections.GROUPS, "id", "groups.id", "inbound_links"));
pipeline.add(Aggregates.match(inboundLinksFilter));
// Group on id to get distinct group names
pipeline.add(Aggregates.group("$id"));
return database
.getCollection(Collections.GROUPS)
.aggregate(pipeline)
.map(t->t.getString("_id"))
.into(Lists.newArrayList());
}
项目:sam
文件:ServerResource.java
private PaginatedCollection<String> getEnvironments() {
return RestHelper.paginatedList(database
.getCollection(Collections.SERVERS)
.aggregate(Lists.newArrayList(
Aggregates.group("$environment")
)).map(t->t.getString("_id"))
);
}
项目:sam
文件:AssetResource.java
private PaginatedCollection<Asset> findAssets(Bson filter) {
final List<Bson> pipeline = new ArrayList<>(2);
if (filter != ALL) {
pipeline.add(Aggregates.match(filter));
}
pipeline.add(Aggregates.lookup(Collections.GROUPS, "group", "id", "group"));
return RestHelper.paginatedList(database
.getCollection(Collections.ASSETS)
.aggregate(pipeline)
.map(Asset::new)
);
}
项目:sam
文件:AssetResource.java
private Document findAsset(Bson filter) {
final Document bson = database.getCollection(Collections.ASSETS)
.aggregate(Lists.newArrayList(
Aggregates.match(filter),
Aggregates.lookup(Collections.GROUPS, "group", "id", "group")
)).first();
if (bson == null) {
throw new WebApplicationException(Status.NOT_FOUND);
}
return bson;
}
项目:incubator-rya
文件:AggregationPipelineQueryNode.java
/**
* Create a pipeline query node based on a StatementPattern.
* @param collection The collection of triples to query.
* @param baseSP The leaf node in the query tree.
*/
public AggregationPipelineQueryNode(MongoCollection<Document> collection, StatementPattern baseSP) {
this.collection = Preconditions.checkNotNull(collection);
Preconditions.checkNotNull(baseSP);
this.varToOriginalName = HashBiMap.create();
StatementVarMapping mapping = new StatementVarMapping(baseSP, varToOriginalName);
this.assuredBindingNames = new HashSet<>(mapping.varNames());
this.bindingNames = new HashSet<>(mapping.varNames());
this.pipeline = new LinkedList<>();
this.pipeline.add(Aggregates.match(getMatchExpression(baseSP)));
this.pipeline.add(Aggregates.project(mapping.getProjectExpression()));
}
项目:incubator-rya
文件:AggregationPipelineQueryNode.java
/**
* Add a $group step to filter out redundant solutions.
* @return True if the distinct operation was successfully appended.
*/
public boolean distinct() {
List<String> key = new LinkedList<>();
for (String varName : bindingNames) {
key.add(hashFieldExpr(varName));
}
List<BsonField> reduceOps = new LinkedList<>();
for (String field : FIELDS) {
reduceOps.add(new BsonField(field, new Document("$first", "$" + field)));
}
pipeline.add(Aggregates.group(new Document("$concat", key), reduceOps));
return true;
}
项目:incubator-rya
文件:AggregationPipelineQueryNode.java
/**
* Add a join with an individual {@link StatementPattern} to the pipeline.
* @param sp The statement pattern to join with
* @return true if the join was successfully added to the pipeline.
*/
public boolean joinWith(StatementPattern sp) {
Preconditions.checkNotNull(sp);
// 1. Determine shared variables and new variables
StatementVarMapping spMap = new StatementVarMapping(sp, varToOriginalName);
NavigableSet<String> sharedVars = new ConcurrentSkipListSet<>(spMap.varNames());
sharedVars.retainAll(assuredBindingNames);
// 2. Join on one shared variable
String joinKey = sharedVars.pollFirst();
String collectionName = collection.getNamespace().getCollectionName();
Bson join;
if (joinKey == null) {
return false;
}
else {
join = Aggregates.lookup(collectionName,
HASHES + "." + joinKey,
spMap.hashField(joinKey),
JOINED_TRIPLE);
}
pipeline.add(join);
// 3. Unwind the joined triples so each document represents a binding
// set (solution) from the base branch and a triple that may match.
pipeline.add(Aggregates.unwind("$" + JOINED_TRIPLE));
// 4. (Optional) If there are any shared variables that weren't used as
// the join key, project all existing fields plus a new field that
// tests the equality of those shared variables.
BasicDBObject matchOpts = getMatchExpression(sp, JOINED_TRIPLE);
if (!sharedVars.isEmpty()) {
List<Bson> eqTests = new LinkedList<>();
for (String varName : sharedVars) {
String oldField = valueFieldExpr(varName);
String newField = joinFieldExpr(spMap.valueField(varName));
Bson eqTest = new Document("$eq", Arrays.asList(oldField, newField));
eqTests.add(eqTest);
}
Bson eqProjectOpts = Projections.fields(
Projections.computed(FIELDS_MATCH, Filters.and(eqTests)),
Projections.include(JOINED_TRIPLE, VALUES, HASHES, TYPES, LEVEL, TIMESTAMP));
pipeline.add(Aggregates.project(eqProjectOpts));
matchOpts.put(FIELDS_MATCH, true);
}
// 5. Filter for solutions whose triples match the joined statement
// pattern, and, if applicable, whose additional shared variables
// match the current solution.
pipeline.add(Aggregates.match(matchOpts));
// 6. Project the results to include variables from the new SP (with
// appropriate renaming) and variables referenced only in the base
// pipeline (with previous names).
Bson finalProjectOpts = new StatementVarMapping(sp, varToOriginalName)
.getProjectExpression(assuredBindingNames,
str -> joinFieldExpr(str));
assuredBindingNames.addAll(spMap.varNames());
bindingNames.addAll(spMap.varNames());
pipeline.add(Aggregates.project(finalProjectOpts));
return true;
}
项目:incubator-rya
文件:AggregationPipelineQueryNode.java
/**
* Add a SPARQL filter to the pipeline, if possible. A filter eliminates
* results that don't satisfy a given condition. Not all conditional
* expressions are supported. If unsupported expressions are used in the
* filter, the pipeline will remain unchanged and this method will return
* false. Currently only supports binary {@link Compare} conditions among
* variables and/or literals.
* @param condition The filter condition
* @return True if the filter was successfully converted into a pipeline
* step, false otherwise.
*/
public boolean filter(ValueExpr condition) {
if (condition instanceof Compare) {
Compare compare = (Compare) condition;
Compare.CompareOp operator = compare.getOperator();
Object leftArg = valueFieldExpr(compare.getLeftArg());
Object rightArg = valueFieldExpr(compare.getRightArg());
if (leftArg == null || rightArg == null) {
// unsupported value expression, can't convert filter
return false;
}
final String opFunc;
switch (operator) {
case EQ:
opFunc = "$eq";
break;
case NE:
opFunc = "$ne";
break;
case LT:
opFunc = "$lt";
break;
case LE:
opFunc = "$le";
break;
case GT:
opFunc = "$gt";
break;
case GE:
opFunc = "$ge";
break;
default:
// unrecognized comparison operator, can't convert filter
return false;
}
Document compareDoc = new Document(opFunc, Arrays.asList(leftArg, rightArg));
pipeline.add(Aggregates.project(Projections.fields(
Projections.computed("FILTER", compareDoc),
Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
pipeline.add(Aggregates.match(new Document("FILTER", true)));
pipeline.add(Aggregates.project(Projections.fields(
Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP))));
return true;
}
return false;
}
项目:incubator-rya
文件:AggregationPipelineQueryNode.java
/**
* Given that the current state of the pipeline produces data that can be
* interpreted as triples, add a project step to map each result from the
* intermediate result structure to a structure that can be stored in the
* triple store. Does not modify the internal pipeline, which will still
* produce intermediate results suitable for query evaluation.
* @param timestamp Attach this timestamp to the resulting triples.
* @param requireNew If true, add an additional step to check constructed
* triples against existing triples and only include new ones in the
* result. Adds a potentially expensive $lookup step.
* @throws IllegalStateException if the results produced by the current
* pipeline do not have variable names allowing them to be interpreted as
* triples (i.e. "subject", "predicate", and "object").
*/
public List<Bson> getTriplePipeline(long timestamp, boolean requireNew) {
if (!assuredBindingNames.contains(SUBJECT)
|| !assuredBindingNames.contains(PREDICATE)
|| !assuredBindingNames.contains(OBJECT)) {
throw new IllegalStateException("Current pipeline does not produce "
+ "records that can be converted into triples.\n"
+ "Required variable names: <" + SUBJECT + ", " + PREDICATE
+ ", " + OBJECT + ">\nCurrent variable names: "
+ assuredBindingNames);
}
List<Bson> triplePipeline = new LinkedList<>(pipeline);
List<Bson> fields = new LinkedList<>();
fields.add(Projections.computed(SUBJECT, valueFieldExpr(SUBJECT)));
fields.add(Projections.computed(SUBJECT_HASH, hashFieldExpr(SUBJECT)));
fields.add(Projections.computed(PREDICATE, valueFieldExpr(PREDICATE)));
fields.add(Projections.computed(PREDICATE_HASH, hashFieldExpr(PREDICATE)));
fields.add(Projections.computed(OBJECT, valueFieldExpr(OBJECT)));
fields.add(Projections.computed(OBJECT_HASH, hashFieldExpr(OBJECT)));
fields.add(Projections.computed(OBJECT_TYPE,
ConditionalOperators.ifNull(typeFieldExpr(OBJECT), DEFAULT_TYPE)));
fields.add(Projections.computed(CONTEXT, DEFAULT_CONTEXT));
fields.add(Projections.computed(STATEMENT_METADATA, DEFAULT_METADATA));
fields.add(DEFAULT_DV);
fields.add(Projections.computed(TIMESTAMP, new Document("$literal", timestamp)));
fields.add(Projections.computed(LEVEL, new Document("$add", Arrays.asList("$" + LEVEL, 1))));
triplePipeline.add(Aggregates.project(Projections.fields(fields)));
if (requireNew) {
// Prune any triples that already exist in the data store
String collectionName = collection.getNamespace().getCollectionName();
Bson includeAll = Projections.include(SUBJECT, SUBJECT_HASH,
PREDICATE, PREDICATE_HASH, OBJECT, OBJECT_HASH,
OBJECT_TYPE, CONTEXT, STATEMENT_METADATA,
DOCUMENT_VISIBILITY, TIMESTAMP, LEVEL);
List<Bson> eqTests = new LinkedList<>();
eqTests.add(new Document("$eq", Arrays.asList("$$this." + PREDICATE_HASH, "$" + PREDICATE_HASH)));
eqTests.add(new Document("$eq", Arrays.asList("$$this." + OBJECT_HASH, "$" + OBJECT_HASH)));
Bson redundantFilter = new Document("$filter", new Document("input", "$" + JOINED_TRIPLE)
.append("as", "this").append("cond", new Document("$and", eqTests)));
triplePipeline.add(Aggregates.lookup(collectionName, SUBJECT_HASH,
SUBJECT_HASH, JOINED_TRIPLE));
String numRedundant = "REDUNDANT";
triplePipeline.add(Aggregates.project(Projections.fields(includeAll,
Projections.computed(numRedundant, new Document("$size", redundantFilter)))));
triplePipeline.add(Aggregates.match(Filters.eq(numRedundant, 0)));
triplePipeline.add(Aggregates.project(Projections.fields(includeAll)));
}
return triplePipeline;
}
项目:ibm-performance-monitor
文件:MongoUtilitiesTest.java
@Test
public void testFilterParametersBsonDocumentMatchIn()
{
Bson match = Aggregates.match(Filters.in("dateKey", "593898622313868b72a296ad", "593898622313868b72a296b4"));
BsonDocument filterParameters = MongoUtilities.filterParameters(match);
assertEquals("{ \"$match\" : { \"dateKey\" : { \"$in\" : [\"*?\"] } } }", filterParameters.toString());
}
项目:ibm-performance-monitor
文件:MongoUtilitiesTest.java
@Test
public void testFilterParametersBsonDocumentGroup()
{
Bson group = Aggregates.group("_id", Accumulators.sum("totalQuantity", "$quantity"));
BsonDocument filterParameters = MongoUtilities.filterParameters(group);
assertEquals("{ \"$group\" : { \"_id\" : \"_id\", \"totalQuantity\" : { \"$sum\" : \"$quantity\" } } }", filterParameters.toString());
}
项目:ibm-performance-monitor
文件:MongoUtilitiesTest.java
@Test
public void testFilterParametersBsonDocumentLookup()
{
Bson lookup = Aggregates.lookup("fromField", "localField", "foreignField", "as");
BsonDocument filterParameters = MongoUtilities.filterParameters(lookup);
assertEquals("{ \"$lookup\" : { \"from\" : \"fromField\", \"localField\" : \"localField\", \"foreignField\" : \"foreignField\", \"as\" : \"as\" } }", filterParameters.toString());
}
项目:incubator-rya
文件:AggregationPipelineQueryNode.java
/**
* Add a step to the end of the current pipeline which prunes the results
* according to the recorded derivation level of their sources. At least one
* triple that was used to construct the result must have a derivation level
* at least as high as the parameter, indicating that it was derived via
* that many steps from the original data. (A value of zero is equivalent to
* input data that was not derived at all.) Use in conjunction with
* getTriplePipeline (which sets source level for generated triples) to
* avoid repeatedly deriving the same results.
* @param requiredLevel Required derivation depth. Reject a solution to the
* query if all of the triples involved in producing that solution have a
* lower derivation depth than this. If zero, does nothing.
*/
public void requireSourceDerivationDepth(int requiredLevel) {
if (requiredLevel > 0) {
pipeline.add(Aggregates.match(new Document(LEVEL,
new Document("$gte", requiredLevel))));
}
}
项目:incubator-rya
文件:AggregationPipelineQueryNode.java
/**
* Add a step to the end of the current pipeline which prunes the results
* according to the timestamps of their sources. At least one triple that
* was used to construct the result must have a timestamp at least as
* recent as the parameter. Use in iterative applications to avoid deriving
* solutions that would have been generated in an earlier iteration.
* @param t Minimum required timestamp. Reject a solution to the query if
* all of the triples involved in producing that solution have an earlier
* timestamp than this.
*/
public void requireSourceTimestamp(long t) {
pipeline.add(Aggregates.match(new Document(TIMESTAMP,
new Document("$gte", t))));
}