/**
 * Sets the status of the processing-status documents that belong to the given submittables,
 * restricted to documents whose current status is one of {@code statusesToApplyTo}.
 *
 * <p>One update is queued per submittable and all of them are sent as a single unordered
 * bulk write against the {@code ProcessingStatus} collection. When the stream yields no
 * submittables, nothing is sent: executing a bulk operation that has no queued writes is
 * rejected by the MongoDB driver, so the method returns early instead.
 *
 * @param statusesToApplyTo    current status values a document must have to be updated
 * @param submittables         items whose status documents should be updated; consumed by this call
 * @param submission           submission the items belong to; its id is part of every query
 * @param processingStatusEnum status to set; written as the enum constant's name
 */
public void updateProcessingStatus(Collection<String> statusesToApplyTo, Stream<Submittable> submittables, Submission submission, ProcessingStatusEnum processingStatusEnum) {
    // Pull an iterator so emptiness can be detected before any bulk operation is built.
    java.util.Iterator<Submittable> remaining = submittables.iterator();
    if (!remaining.hasNext()) {
        logger.debug("No submittables to set to processing status {} in submission {}",
                processingStatusEnum,
                submission.getId()
        );
        return;
    }

    BulkOperations ops = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, ProcessingStatus.class);
    Update update = update("status", processingStatusEnum.name());

    remaining.forEachRemaining(submittable ->
            ops.updateOne(
                    query(
                            where("submissionId").is(submission.getId())
                                    .and("submittableId").is(submittable.getId())
                                    .and("status").in(statusesToApplyTo)
                    ),
                    update
            )
    );

    BulkWriteResult writeResult = ops.execute();

    logger.info("Setting processing status to {} for certs for {} items in submission {}",
            processingStatusEnum,
            writeResult.getModifiedCount(),
            submission.getId()
    );
}
/**
 * Writes the accessions carried by the envelope's processing certificates onto the matching
 * submittable documents.
 *
 * <p>For every certificate that has a non-null accession, one update setting the
 * {@code accession} field is queued against the document whose {@code _id} is the
 * certificate's submittable id and whose {@code submission.$id} is the envelope's submission
 * id. The queued updates are sent as one unordered bulk write. If no certificate carries an
 * accession, nothing is executed and nothing is logged.
 *
 * @param envelope         certificates to apply; the envelope, its submission id and its
 *                         certificate collection must all be non-null
 * @param submittableClass entity class whose collection receives the updates
 */
public void applyProcessingCertificates(ProcessingCertificateEnvelope envelope, Class submittableClass) {
    Assert.notNull(envelope);
    Assert.notNull(envelope.getSubmissionId());
    Assert.notNull(envelope.getProcessingCertificates());

    BulkOperations bulk = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, submittableClass);
    int queuedUpdates = 0;

    for (ProcessingCertificate certificate : envelope.getProcessingCertificates()) {
        // Only certificates that actually carry an accession produce an update.
        if (certificate.getAccession() == null) {
            continue;
        }

        Query target = query(
                where("_id").is(certificate.getSubmittableId())
                        .and("submission.$id").is(envelope.getSubmissionId())
        );
        Update accessionUpdate = new Update();
        accessionUpdate.set("accession", certificate.getAccession());

        bulk.updateOne(target, accessionUpdate);
        queuedUpdates++;
    }

    if (queuedUpdates == 0) {
        return;
    }

    BulkWriteResult outcome = bulk.execute();
    logger.info("Applying certs for {} in submission {}, {} certs, changed {}",
            submittableClass,
            envelope.getSubmissionId(),
            envelope.getProcessingCertificates().size(),
            outcome.getModifiedCount()
    );
}
/**
 * Inserts the given documents through a single unordered MongoDB bulk operation.
 *
 * @param objects documents to insert
 * @return {@code false} when {@code objects} is empty (no operation is sent); otherwise
 *         whether the bulk write was acknowledged
 */
protected boolean bulkSave2(List<DBObject> objects) {
    if (objects.isEmpty()) {
        return false;
    }

    BulkWriteOperation inserts = getCollection().initializeUnorderedBulkOperation();
    for (DBObject document : objects) {
        inserts.insert(document);
    }

    return inserts.execute().isAcknowledged();
}
/**
 * Executes a pending bulk operation and logs, at debug level, how many entries were sent
 * and how many documents the write reported as modified.
 * Does nothing when {@code batchOp} is {@code null}.
 *
 * @param batchOp   bulk operation to execute; may be {@code null}
 * @param fullBatch batch the operation was built from; only its size is read, for logging
 */
private void executeBatchUpdate(BulkWriteOperation batchOp, BasicDBList fullBatch) {
    if (batchOp == null) {
        return;
    }

    BulkWriteResult outcome = batchOp.execute();
    logger.debug("Wrote sample batch - sent {} : updated {}",
            fullBatch.size(),
            outcome.getModifiedCount());
}
/**
 * Executes a pending bulk operation and logs, at debug level, how many entries were sent
 * and how many documents the write reported as inserted.
 * Does nothing when {@code batchOp} is {@code null}.
 *
 * @param batchOp   bulk operation to execute; may be {@code null}
 * @param fullBatch batch the operation was built from; only its size is read, for logging
 */
private void executeBatchWrite(BulkWriteOperation batchOp, BasicDBList fullBatch) {
    if (batchOp == null) {
        return;
    }

    BulkWriteResult outcome = batchOp.execute();
    logger.debug("Wrote sample batch - sent {} : inserted {}",
            fullBatch.size(),
            outcome.getInsertedCount());
}
/**
 * Stores every entry of {@code map} in the label collection with one unordered bulk write.
 *
 * <p>Each entry is converted with {@code toDBObject} and queued as an upserting replace
 * keyed on {@code _id}, so existing documents are overwritten and missing ones are created.
 * An empty map is a no-op: a bulk operation with nothing queued cannot be executed, so the
 * method returns before building one.
 *
 * @param tx  cache transaction; not used by this method
 * @param map entries to store, keyed by document id
 * @throws GridException declared by the overridden method; not thrown directly here
 */
@Override
public void putAll(GridCacheTx tx, Map<? extends String, ? extends YagoLabel> map) throws GridException {
    if (map.isEmpty()) {
        // Executing a bulk operation with no queued writes is rejected by the driver.
        return;
    }

    BulkWriteOperation bulk = labelColl.initializeUnorderedBulkOperation();
    for (Entry<? extends String, ? extends YagoLabel> entry : map.entrySet()) {
        BasicDBObject dbo = toDBObject(entry.getKey(), entry.getValue());
        bulk.find(new BasicDBObject("_id", entry.getKey())).upsert().replaceOne(dbo);
    }

    BulkWriteResult writeResult = bulk.execute();
    log.debug("Put {} documents: inserted={}, modified={}, upserted={}",
            map.size(),
            writeResult.getInsertedCount(),
            writeResult.getModifiedCount(),
            writeResult.getUpserts().size());
}
/**
 * Executes a prepared bulk update and reports whether every document was matched.
 *
 * <p>The operation is executed with {@code writeConcern} when one is given, otherwise with
 * the operation's default. The matched count of the result is then compared with
 * {@code batchSize}. If the driver raises a {@code BulkWriteException}, each reported write
 * error is recorded in {@code results} under the index of the failing operation, classified
 * as either a duplicate error or a generic save error.
 *
 * @param bwo          the bulk write operation to execute
 * @param writeConcern write concern to execute with, or {@code null} to use the default
 * @param batchSize    number of documents the caller expects the operation to match
 * @param results      populated during this call with an error for each failed document
 * @param logger       logger receiving progress and diagnostic messages
 *
 * @return {@code true} if the matched count equals {@code batchSize}. Otherwise
 *         {@code false}: some documents were not updated or the write failed, and
 *         concurrent update error detection should be run by the caller
 */
public static boolean batchUpdate(BulkWriteOperation bwo,
                                  WriteConcern writeConcern,
                                  int batchSize,
                                  Map<Integer,Error> results,
                                  Logger logger) {
    logger.debug("attemptToUpdate={}",batchSize);
    try {
        BulkWriteResult outcome = (writeConcern == null) ? bwo.execute() : bwo.execute(writeConcern);
        logger.debug("writeResult={}",outcome);

        int unmatched = batchSize - outcome.getMatchedCount();
        if (unmatched == 0) {
            logger.debug("Successful update");
            return true;
        }
        logger.warn("notUpdated={}",unmatched);
        return false;
    } catch (BulkWriteException e) {
        List<BulkWriteError> failures = e.getWriteErrors();
        if (failures != null) {
            for (BulkWriteError failure : failures) {
                // Duplicate-key failures get their own error code; everything else is a save error.
                results.put(failure.getIndex(),
                        Error.get("update",
                                MongoCrudConstants.isDuplicate(failure.getCode())
                                        ? MongoCrudConstants.ERR_DUPLICATE
                                        : MongoCrudConstants.ERR_SAVE_ERROR,
                                failure.getMessage()));
            }
        }
        return false;
    }
}