@Override
public StorageWriteResult write(List<DataContainer> containers) throws Exception {
    MongoCollection<Document> collection = MongoStorageAdapter.getCollection(MongoStorageAdapter.collectionEventRecordsName);

    // Build an array of documents
    List<WriteModel<Document>> documents = new ArrayList<>();
    for (DataContainer container : containers) {
        Document document = documentFromView(container);
        //Prism.getLogger().debug(DataUtil.jsonFromDataView(container).toString());

        // TTL
        document.append("Expires", DateUtil.parseTimeStringToDate(expiration, true));

        // Insert
        documents.add(new InsertOneModel<>(document));
    }

    // Write
    collection.bulkWrite(documents, bulkWriteOptions);

    // @todo implement real results, BulkWriteResult
    return new StorageWriteResult();
}
@Override
public void storeAll(Map<String, Supplement> map) {
    log.info("storeAll");
    List<InsertOneModel<Document>> batch = new LinkedList<>();
    for (Map.Entry<String, Supplement> entry : map.entrySet()) {
        String key = entry.getKey();
        Supplement value = entry.getValue();
        batch.add(new InsertOneModel<>(
                new Document("name", value.getName())
                        .append("price", value.getPrice())
                        .append("_id", key)));
    }
    this.collection.bulkWrite(batch, new BulkWriteOptions().ordered(false));
}
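The Supplement type is not shown above; a minimal POJO consistent with the getters the snippet calls (an assumption for illustration, not the original class) might look like:

// Hypothetical POJO assumed by storeAll(); only the fields the snippet reads are included.
public class Supplement {
    private final String name;
    private final double price;

    public Supplement(String name, double price) {
        this.name = name;
        this.price = price;
    }

    public String getName() { return name; }
    public double getPrice() { return price; }
}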
private List<WriteModel<Document>> insertOneWithBulk() {
    List<WriteModel<Document>> list = new ArrayList<>();
    list.add(new InsertOneModel<>(createDeleteDocument()));
    coll.bulkWrite(list);
    return list;
}
public int bulkInsert(final Collection<?> entities, final BulkWriteOptions options) {
    final List<InsertOneModel<Document>> list = new ArrayList<>(entities.size());
    for (Object entity : entities) {
        if (entity instanceof Document) {
            list.add(new InsertOneModel<>((Document) entity));
        } else {
            list.add(new InsertOneModel<>(MongoDBExecutor.toDocument(entity)));
        }
    }
    return bulkWrite(list, options).getInsertedCount();
}
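A hedged usage sketch for the method above; the executor instance name is an assumption and not part of the original snippet:

// Hypothetical caller: insert a small batch of Documents, unordered so a single
// duplicate-key error does not abort the remaining inserts.
List<Document> entities = Arrays.asList(
        new Document("_id", 1).append("name", "a"),
        new Document("_id", 2).append("name", "b"));
int inserted = executor.bulkInsert(entities, new BulkWriteOptions().ordered(false)); // 'executor' is assumed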
public MongoAdminClient importJsonFile(String fileNamePath) {
    int count = 0;
    int batch = 100;
    List<InsertOneModel<Document>> docs = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new FileReader(fileNamePath))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
    if (count > 0) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }
    return this;
}
public MongoAdminClient importJsonInputStream(InputStream fileInputStream) {
    int count = 0;
    int batch = 100;
    List<InsertOneModel<Document>> docs = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new InputStreamReader(fileInputStream))) {
        String line;
        while ((line = br.readLine()) != null) {
            docs.add(new InsertOneModel<>(Document.parse(line)));
            count++;
            if (count == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
                count = 0;
            }
        }
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
    if (count > 0) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }
    return this;
}
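Both import methods call Document.parse(line) per line, so they expect newline-delimited JSON (one complete document per line). A usage sketch, assuming a configured MongoAdminClient instance named client:

// Hypothetical usage of importJsonInputStream with NDJSON input.
String ndjson = "{\"_id\": 1, \"name\": \"a\"}\n{\"_id\": 2, \"name\": \"b\"}\n";
InputStream in = new ByteArrayInputStream(ndjson.getBytes(StandardCharsets.UTF_8));
client.importJsonInputStream(in); // 'client' is assumed, not shown in the snippet above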
private TestRecord insertNewRecord(List<WriteModel<Document>> bulkWriter) {
    int[] arr = new int[2];
    arr[0] = testOpts.arraytop;
    arr[1] = testOpts.arraynext;
    TestRecord tr = createNewRecord();
    bulkWriter.add(new InsertOneModel<>(tr.internalDoc));
    return tr;
}
private List<WriteModel<JsonObject>> convertBulkOperations(List<BulkOperation> operations) {
    List<WriteModel<JsonObject>> result = new ArrayList<>(operations.size());
    for (BulkOperation bulkOperation : operations) {
        switch (bulkOperation.getType()) {
            case DELETE:
                Bson bsonFilter = toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter()));
                if (bulkOperation.isMulti()) {
                    result.add(new DeleteManyModel<>(bsonFilter));
                } else {
                    result.add(new DeleteOneModel<>(bsonFilter));
                }
                break;
            case INSERT:
                result.add(new InsertOneModel<>(encodeKeyWhenUseObjectId(bulkOperation.getDocument())));
                break;
            case REPLACE:
                result.add(new ReplaceOneModel<>(
                        toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter())),
                        bulkOperation.getDocument(),
                        new com.mongodb.client.model.UpdateOptions().upsert(bulkOperation.isUpsert())));
                break;
            case UPDATE:
                Bson filter = toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter()));
                Bson document = toBson(encodeKeyWhenUseObjectId(bulkOperation.getDocument()));
                com.mongodb.client.model.UpdateOptions updateOptions = new com.mongodb.client.model.UpdateOptions()
                        .upsert(bulkOperation.isUpsert());
                if (bulkOperation.isMulti()) {
                    result.add(new UpdateManyModel<>(filter, document, updateOptions));
                } else {
                    result.add(new UpdateOneModel<>(filter, document, updateOptions));
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown bulk operation type: " + bulkOperation.getType());
        }
    }
    return result;
}
public void Create(Document doc) {
    ops.add(new InsertOneModel<>(doc));
    FlushOpsIfFull();
}
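FlushOpsIfFull() is not shown; a minimal sketch of what such a helper might do, assuming an ops list of write models, a batch-size threshold field, and a MongoCollection<Document> field (all assumptions for illustration):

// Hypothetical helper: flush the buffered write models once the batch is full.
private void FlushOpsIfFull() {
    if (ops.size() >= maxBatchSize) { // 'maxBatchSize' is an assumed field
        collection.bulkWrite(ops, new BulkWriteOptions().ordered(false));
        ops.clear();
    }
}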
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final List<FlowFile> flowFiles = session.get(batchSize);
    if (flowFiles == null) {
        return;
    }

    ComponentLog logger = this.getLogger();
    final String source = context.getProperty(INSERT_COMMAND_SOURCE).getValue();
    List<InsertOneModel<Document>> documentsToInsert = new ArrayList<>(flowFiles.size());

    /*
     * Collect FlowFiles that are marked for bulk insertion. Matches same
     * index as documentsToInsert
     */
    List<FlowFile> flowFilesAttemptedInsert = new ArrayList<>();

    logger.debug("Attempting to batch insert {} FlowFiles", new Object[]{flowFiles.size()});
    for (FlowFile flowFile : flowFiles) {
        final String payload;
        try {
            switch (source) {
                case "content":
                    final String[] result = new String[1];
                    session.read(flowFile, (in) -> result[0] = IOUtils.toString(in));
                    payload = result[0];
                    break;
                case "attribute":
                    String command = context.getProperty(INSERT_COMMAND_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
                    payload = flowFile.getAttribute(command);
                    break;
                default:
                    throw new Exception("Invalid source choice: " + source);
            }

            BasicDBObject parse = (BasicDBObject) JSON.parse(payload);
            Document documentToInsert = new Document(parse.toMap());
            logger.debug("Creating InsertOneModel with Document {}", new Object[]{documentToInsert});
            InsertOneModel<Document> iom = new InsertOneModel<>(documentToInsert);
            documentsToInsert.add(iom);
        } catch (Exception e) {
            /*
             * If any FlowFiles error on translation to a Mongo Object, they were not added to
             * the documentsToInsert, so route to failure immediately
             */
            logger.error("Encountered exception while processing FlowFile for Mongo Storage. Routing to failure and continuing.", e);
            FlowFile failureFlowFile = session.putAttribute(flowFile, "mongo.exception", e.getMessage());
            session.transfer(failureFlowFile, REL_FAILURE);
            continue;
        }

        // add to the ordered list so we can determine which fail on bulk write
        flowFilesAttemptedInsert.add(flowFile);
    }

    /*
     * Perform the bulk insert if any documents are there to insert
     */
    if (!documentsToInsert.isEmpty()) {
        logger.debug("Attempting to bulk insert {} documents", new Object[]{documentsToInsert.size()});
        Map<Integer, BulkWriteError> writeErrors = executeBulkInsert(documentsToInsert);

        /*
         * Route FlowFiles to the proper relationship based on the returned errors
         */
        logger.debug("Evaluating FlowFile routing against {} Write Errors for {} FlowFiles",
                new Object[]{writeErrors.size(), flowFilesAttemptedInsert.size()});
        transferFlowFiles(session, flowFilesAttemptedInsert, writeErrors);
    }
}
protected Map<Integer, BulkWriteError> executeBulkInsert(List<InsertOneModel<Document>> documentsToInsert) {
    // mapping of array indices for flow file errors
    Map<Integer, BulkWriteError> writeErrors = new HashMap<>();
    try {
        collection.bulkWrite(documentsToInsert, writeOptions);
    } catch (MongoBulkWriteException e) {
        List<BulkWriteError> errors = e.getWriteErrors();
        for (BulkWriteError docError : errors) {
            writeErrors.put(docError.getIndex(), docError);
        }
        getLogger().warn("Unable to perform bulk inserts", e);
    }
    return writeErrors;
}
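For the index-based error routing above to cover every document, writeOptions would typically be unordered: an ordered bulk write stops at the first failure, so documents after that index are never attempted. A minimal sketch of such a configuration (the field name mirrors the snippet, but its initialization is an assumption):

// Assumed configuration: with ordered(false), every document is attempted, so
// e.getWriteErrors() reports an index for each individual failure instead of
// stopping at the first one.
private final BulkWriteOptions writeOptions = new BulkWriteOptions().ordered(false);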