private void addWriteErrors(
        final List<BulkWriteError> wes,
        final Representation rep) {
    wes.stream().forEach(error -> {
        Representation nrep = new Representation();

        nrep.addProperty("index",
                new BsonInt32(error.getIndex()));
        nrep.addProperty("mongodbErrorCode",
                new BsonInt32(error.getCode()));
        nrep.addProperty("httpStatus",
                new BsonInt32(
                        ResponseHelper.getHttpStatusFromErrorCode(
                                error.getCode())));
        nrep.addProperty("message",
                new BsonString(
                        ResponseHelper.getMessageFromErrorCode(
                                error.getCode())));

        rep.addRepresentation("rh:error", nrep);
    });
}
protected void transferFlowFiles(ProcessSession session,
        List<FlowFile> flowFilesAttemptedUpdate,
        Map<Integer, BulkWriteError> writeErrors) {
    ComponentLog logger = this.getLogger();

    if (!writeErrors.isEmpty()) {
        logger.debug("Encountered errors on write");
        /*
         * For each bulk-updated document, see if it encountered an error.
         * If it had an error (based on its index in the list), add the Mongo
         * error to the FlowFile's attributes and route it to failure.
         * Otherwise, route it to success.
         */
        for (int i = 0; i < flowFilesAttemptedUpdate.size(); i++) {
            FlowFile ff = flowFilesAttemptedUpdate.get(i);
            if (writeErrors.containsKey(i)) {
                logger.debug("Found error for FlowFile index {}", new Object[]{i});
                // Add the error information to the FlowFile attributes and
                // route to failure
                BulkWriteError bwe = writeErrors.get(i);
                logger.debug("FlowFile ID {} had Error Code {} and Message {}",
                        new Object[]{ff.getId(), bwe.getCode(), bwe.getMessage()});
                Map<String, String> failureAttributes = getAttributesForWriteFailure(bwe);
                ff = session.putAllAttributes(ff, failureAttributes);
                session.transfer(ff, REL_FAILURE);
            } else {
                logger.debug("Routing FlowFile ID {} with Index {} to Success",
                        new Object[]{ff.getId(), i});
                // FlowFile did not have an error, so route to success
                session.transfer(ff, REL_SUCCESS);
            }
        }
    } else {
        logger.debug("No errors encountered on bulk write, so routing all to success");
        // All succeeded, so transfer everything to success
        session.transfer(flowFilesAttemptedUpdate, REL_SUCCESS);
    }
}
/**
 * Ensures the current exception was generated by a duplicate (primary) key.
 * Differentiates between Fongo and Mongo exceptions, since the behaviour of
 * these databases differs.
 */
public static void assertDuplicateKeyException(Throwable exception) {
    Preconditions.checkNotNull(exception, "exception");

    // unwrap, if necessary
    exception = exception instanceof MongoException ? exception : exception.getCause();

    // Fongo throws DuplicateKeyException directly
    if (exception instanceof DuplicateKeyException) return;

    // MongoDB throws a custom exception
    if (exception instanceof MongoCommandException) {
        String codeName = ((MongoCommandException) exception).getResponse()
                .get("codeName").asString().getValue();
        int errorCode = ((MongoCommandException) exception).getErrorCode();

        check(codeName).is("DuplicateKey");
        check(errorCode).is(11000);

        // all good here (can return)
        return;
    }

    // the same check applies to bulk writes
    if (exception instanceof MongoBulkWriteException) {
        List<BulkWriteError> errors = ((MongoBulkWriteException) exception).getWriteErrors();
        check(errors).hasSize(1);
        check(errors.get(0).getCode()).is(11000);
        check(errors.get(0).getMessage()).contains("duplicate key");
        return;
    }

    // if we got here, there is a problem (no duplicate-key exception)
    fail("Should get duplicate key exception after " + exception);
}
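A minimal usage sketch for the helper above, assuming a MongoCollection&lt;Document&gt; named collection backed by either Fongo or a real MongoDB: inserting the same _id twice should surface a duplicate-key error in one of the three exception shapes the assertion handles.

// Hypothetical JUnit-style check; 'collection' is an assumed fixture.
@Test
public void duplicateKeyIsDetected() {
    collection.insertOne(new Document("_id", "dup"));
    try {
        collection.insertOne(new Document("_id", "dup")); // violates the _id unique index
        fail("expected a duplicate key exception");
    } catch (Exception e) {
        assertDuplicateKeyException(e); // accepts all three exception shapes above
    }
}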
protected Map<Integer, BulkWriteError> executeBulkUpdate(List<UpdateManyModel<Document>> documentsToUpdate) {
    // mapping of array indices for FlowFile errors
    Map<Integer, BulkWriteError> writeErrors = new HashMap<>();

    try {
        collection.bulkWrite(documentsToUpdate);
    } catch (MongoBulkWriteException e) {
        List<BulkWriteError> errors = e.getWriteErrors();
        for (BulkWriteError docError : errors) {
            writeErrors.put(docError.getIndex(), docError);
        }
        getLogger().warn("Error occurred during bulk write", e);
    }

    return writeErrors;
}
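The index-keyed map works because BulkWriteError.getIndex() refers to the position of the failed request in the list passed to bulkWrite, so it lines up with any parallel list built in the same order. A standalone sketch of that contract, with 'collection' an assumed MongoCollection&lt;Document&gt;:

static void demonstrateErrorIndexes(MongoCollection<Document> collection) {
    List<WriteModel<Document>> requests = Arrays.asList(
            new InsertOneModel<>(new Document("_id", 1)),
            new InsertOneModel<>(new Document("_id", 1))); // duplicate _id: request 1 fails
    try {
        collection.bulkWrite(requests, new BulkWriteOptions().ordered(false));
    } catch (MongoBulkWriteException e) {
        for (BulkWriteError err : e.getWriteErrors()) {
            // getIndex() is the position within 'requests', so it maps
            // one-to-one onto any parallel list the caller maintains
            System.out.println(err.getIndex() + " -> " + err.getMessage());
        }
    }
}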
protected Map<String, String> getAttributesForWriteFailure(BulkWriteError bwe) {
    Map<String, String> failureAttributes = new HashMap<>();
    failureAttributes.put("mongo.errorcode", String.valueOf(bwe.getCode()));
    failureAttributes.put("mongo.errormessage", bwe.getMessage());
    return failureAttributes;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final List<FlowFile> flowFiles = session.get(batchSize);
    if (flowFiles == null || flowFiles.isEmpty()) {
        // nothing queued; session.get may also return an empty list
        return;
    }

    ComponentLog logger = this.getLogger();

    final String source = context.getProperty(INSERT_COMMAND_SOURCE).getValue();
    List<InsertOneModel<Document>> documentsToInsert = new ArrayList<>(flowFiles.size());

    /*
     * Collect the FlowFiles that are marked for bulk insertion. Each entry
     * has the same index as its document in documentsToInsert.
     */
    List<FlowFile> flowFilesAttemptedInsert = new ArrayList<>();

    logger.debug("Attempting to batch insert {} FlowFiles", new Object[]{flowFiles.size()});

    for (FlowFile flowFile : flowFiles) {
        final String payload;
        try {
            switch (source) {
                case "content":
                    final String[] result = new String[1];
                    // note: IOUtils.toString(InputStream) reads with the platform default charset
                    session.read(flowFile, (in) -> result[0] = IOUtils.toString(in));
                    payload = result[0];
                    break;
                case "attribute":
                    String command = context.getProperty(INSERT_COMMAND_ATTRIBUTE)
                            .evaluateAttributeExpressions(flowFile).getValue();
                    payload = flowFile.getAttribute(command);
                    break;
                default:
                    throw new Exception("Invalid source choice: " + source);
            }

            BasicDBObject parse = (BasicDBObject) JSON.parse(payload);
            Document documentToInsert = new Document(parse.toMap());
            logger.debug("Creating InsertOneModel with Document {}", new Object[]{documentToInsert});
            InsertOneModel<Document> iom = new InsertOneModel<>(documentToInsert);
            documentsToInsert.add(iom);
        } catch (Exception e) {
            /*
             * If a FlowFile fails translation to a Mongo document, it was never
             * added to documentsToInsert, so route it to failure immediately.
             */
            logger.error("Encountered exception while processing FlowFile for Mongo storage. "
                    + "Routing to failure and continuing.", e);
            FlowFile failureFlowFile = session.putAttribute(flowFile, "mongo.exception", e.getMessage());
            session.transfer(failureFlowFile, REL_FAILURE);
            continue;
        }

        // Add to the ordered list so we can determine which entries fail on bulk write
        flowFilesAttemptedInsert.add(flowFile);
    }

    // Perform the bulk insert if there are any documents to insert
    if (!documentsToInsert.isEmpty()) {
        logger.debug("Attempting to bulk insert {} documents", new Object[]{documentsToInsert.size()});
        Map<Integer, BulkWriteError> writeErrors = executeBulkInsert(documentsToInsert);

        // Route the FlowFiles to the proper relationship based on the returned errors
        logger.debug("Evaluating FlowFile routing against {} Write Errors for {} FlowFiles",
                new Object[]{writeErrors.size(), flowFilesAttemptedInsert.size()});
        transferFlowFiles(session, flowFilesAttemptedInsert, writeErrors);
    }
}
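The JSON.parse/BasicDBObject round-trip above relies on the driver's long-deprecated com.mongodb.util.JSON helper. On current driver versions the same conversion can be done in one call; a sketch of the replacement, not part of the original processor:

// Parses the JSON payload straight into a Document; no BasicDBObject round-trip needed.
static Document toDocument(String payload) {
    return Document.parse(payload);
}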
protected Map<Integer, BulkWriteError> executeBulkInsert(List<InsertOneModel<Document>> documentsToInsert) {
    // mapping of array indices for FlowFile errors
    Map<Integer, BulkWriteError> writeErrors = new HashMap<>();

    try {
        collection.bulkWrite(documentsToInsert, writeOptions);
    } catch (MongoBulkWriteException e) {
        List<BulkWriteError> errors = e.getWriteErrors();
        for (BulkWriteError docError : errors) {
            writeErrors.put(docError.getIndex(), docError);
        }
        getLogger().warn("Unable to perform bulk inserts", e);
    }

    return writeErrors;
}
protected void transferFlowFiles(ProcessSession session,
        List<FlowFile> flowFilesAttemptedInsert,
        Map<Integer, BulkWriteError> writeErrors) {
    ComponentLog logger = this.getLogger();

    if (!writeErrors.isEmpty()) {
        logger.debug("Encountered errors on write");
        /*
         * For each bulk-inserted document, see if it encountered an error.
         * If it had an error (based on its index in the list), add the Mongo
         * error to the FlowFile's attributes and route it to failure.
         * Otherwise, route it to success.
         */
        int numFlowFiles = flowFilesAttemptedInsert.size();
        for (int i = 0; i < numFlowFiles; i++) {
            FlowFile ff = flowFilesAttemptedInsert.get(i);
            if (writeErrors.containsKey(i)) {
                logger.debug("Found error for FlowFile index {}", new Object[]{i});
                // Add the error information to the FlowFile attributes and route to failure
                BulkWriteError bwe = writeErrors.get(i);
                logger.debug("FlowFile ID {} had Error Code {} and Message {}",
                        new Object[]{ff.getId(), bwe.getCode(), bwe.getMessage()});
                Map<String, String> failureAttributes = getAttributesForWriteFailure(bwe);
                ff = session.putAllAttributes(ff, failureAttributes);
                session.transfer(ff, REL_FAILURE);

                // If ordered=true, Mongo stops processing insert attempts after
                // the first failure in a batch
                if (writeOptions.isOrdered()) {
                    logger.debug("Routing all FlowFiles after FlowFile ID {} with Index {} to Failure "
                            + "because an error occurred and ordered=true",
                            new Object[]{ff.getId(), i});
                    for (int j = i + 1; j < numFlowFiles; j++) {
                        ff = flowFilesAttemptedInsert.get(j);
                        ff = session.putAttribute(ff, "storeinmongo.error",
                                "Insert not attempted because there was a failure earlier in batch and ordered=true");
                        session.transfer(ff, REL_FAILURE);
                    }
                    break;
                }
            } else {
                logger.debug("Routing FlowFile ID {} with Index {} to Success",
                        new Object[]{ff.getId(), i});
                // FlowFile did not have an error, so route to success
                session.transfer(ff, REL_SUCCESS);
            }
        }
    } else {
        logger.debug("No errors encountered on bulk write, so routing all to success");
        // All succeeded, so transfer everything to success
        session.transfer(flowFilesAttemptedInsert, REL_SUCCESS);
    }
}
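The ordered=true branch exists because an ordered bulk write halts at the first failing request, leaving later requests unattempted, while an unordered write keeps going and can report an error at any index. A minimal sketch of configuring each mode (the writeOptions name matches the field assumed by the processor above):

// Ordered is the driver's default when no options are passed.
BulkWriteOptions stopAtFirstFailure = new BulkWriteOptions().ordered(true);
// Unordered: attempt every request and report an error for each failing index.
BulkWriteOptions writeOptions = new BulkWriteOptions().ordered(false);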
/**
 * MongoBulkWriteException contains error messages that identify which
 * documents were duplicates. This method extracts those note IDs from the
 * messages and logs them.
 * @param e the bulk write exception to inspect
 */
private void printDuplicatedException(MongoBulkWriteException e) {
    List<BulkWriteError> errors = e.getWriteErrors();

    for (BulkWriteError error : errors) {
        String msg = error.getMessage();
        Pattern pattern = Pattern.compile("[A-Z0-9]{9}"); // regex for a note ID
        Matcher matcher = pattern.matcher(msg);

        if (matcher.find()) { // a note ID was found in the message
            String noteId = matcher.group();
            LOG.warn("Note " + noteId + " not inserted since it already exists in MongoDB");
        }
    }
}
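A minimal sketch of where a helper like this would be called; 'collection' and 'notes' are assumed. insertMany throws MongoBulkWriteException when any document in the batch violates a unique index, so the catch block is the natural place to log the duplicates.

try {
    collection.insertMany(notes); // bulk insert of note documents
} catch (MongoBulkWriteException e) {
    printDuplicatedException(e); // logs the IDs of notes that already exist
}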