public Set<VertexEvent> getOutVertexEventSet(final AC tt) { while (true) { try { // db.tEdgeEvents.aggregate([{$match:{"_o":"1","_t":{ $lt : ISODate(0) // }}},{$project:{"_i":1,"_t":1,"_id":0}},{$group:{"_id":"$_i", "_mt": {$min: // "$_t"}}}]) BsonDocument match = new BsonDocument("$match", new BsonDocument(Tokens.OUT_VERTEX, new BsonString(vertex.toString())).append(Tokens.TIMESTAMP, new BsonDocument("$gt", new BsonDateTime(timestamp)))); BsonDocument project = new BsonDocument("$project", new BsonDocument(Tokens.IN_VERTEX, new BsonBoolean(true)) .append(Tokens.TIMESTAMP, new BsonBoolean(true)) .append(Tokens.ID, new BsonBoolean(false))); BsonDocument group = new BsonDocument("$group", new BsonDocument(Tokens.ID, new BsonString("$" + Tokens.IN_VERTEX)).append(Tokens.TIMESTAMP, new BsonDocument("$min", new BsonString("$" + Tokens.TIMESTAMP)))); ArrayList<BsonDocument> aggregateQuery = new ArrayList<BsonDocument>(); aggregateQuery.add(match); aggregateQuery.add(project); aggregateQuery.add(group); HashSet<VertexEvent> ret = new HashSet<VertexEvent>(); Function<BsonDocument, VertexEvent> mapper = new Function<BsonDocument, VertexEvent>() { @Override public VertexEvent apply(BsonDocument d) { String inV = d.getString(Tokens.ID).getValue(); Long t = d.getDateTime(Tokens.TIMESTAMP).getValue(); return new VertexEvent(graph, new ChronoVertex(inV, graph), t); } }; vertex.graph.getEdgeEvents().aggregate(aggregateQuery).map(mapper).into(ret); return ret; } catch (MongoCursorNotFoundException e1) { System.out.println(e1.getErrorMessage()); } } }
private Stream<ChronoEdge> getOutChronoEdgeStream(final BsonArray labels, final int branchFactor, final boolean setParallel) { while (true) { try { HashSet<ChronoEdge> edgeSet = new HashSet<ChronoEdge>(); BsonDocument filter = new BsonDocument(); filter.append(Tokens.OUT_VERTEX, new BsonString(this.toString())); Iterator<BsonDocument> it = graph.getEdgeCollection() .find(new BsonDocument(Tokens.OUT_VERTEX, new BsonString(this.toString()))) .projection(new BsonDocument(Tokens.LABEL, new BsonBoolean(true)) .append(Tokens.IN_VERTEX, new BsonBoolean(true)) .append(Tokens.ID, new BsonBoolean(false))) .iterator(); while (it.hasNext()) { BsonDocument doc = it.next(); String inV = doc.getString(Tokens.IN_VERTEX).getValue(); String label = doc.getString(Tokens.LABEL).getValue(); edgeSet.add(new ChronoEdge(this.toString() + "|" + label + "|" + inV, this.toString(), inV, label, graph)); } if (setParallel) return edgeSet.parallelStream(); else return edgeSet.stream(); } catch (MongoCursorNotFoundException e1) { System.out.println(e1.getErrorMessage()); } } // HashSet<ChronoEdge> edgeSet = new HashSet<ChronoEdge>(); // BsonDocument filter = new BsonDocument(); // BsonDocument inner = new BsonDocument(); // filter.put(Tokens.OUT_VERTEX, new BsonString(this.toString())); // if (labels != null && labels.size() != 0) { // inner.put(Tokens.FC.$in.toString(), labels); // filter.put(Tokens.LABEL, inner); // } // // Iterator<BsonDocument> it = null; // if (branchFactor == Integer.MAX_VALUE) // it = // graph.getEdgeCollection().find(filter).projection(Tokens.PRJ_ONLY_ID).iterator(); // else // it = // graph.getEdgeCollection().find(filter).projection(Tokens.PRJ_ONLY_ID).limit(branchFactor).iterator(); // // while (it.hasNext()) { // BsonDocument d = it.next(); // edgeSet.add(new ChronoEdge(d.getString(Tokens.ID).getValue(), this.graph)); // } // if (setParallel) // return edgeSet.parallelStream(); // else // return edgeSet.stream(); }