/** * Runs a batch update using bwo * * @param bwo The bulk write operation * @param writeConcern * @param batchSize * @param results The results are populated during this call with an error for each failed doc * @param logger The logger * * @return If returns true, all docs are updated. Otherwise, there * are some failed docs, and concurrent update error detection * should be called */ public static boolean batchUpdate(BulkWriteOperation bwo, WriteConcern writeConcern, int batchSize, Map<Integer,Error> results, Logger logger) { boolean ret=true; BulkWriteResult writeResult; logger.debug("attemptToUpdate={}",batchSize); try { if(writeConcern==null) { writeResult=bwo.execute(); } else { writeResult=bwo.execute(writeConcern); } logger.debug("writeResult={}",writeResult); if(batchSize==writeResult.getMatchedCount()) { logger.debug("Successful update"); } else { logger.warn("notUpdated={}",batchSize-writeResult.getMatchedCount()); ret=false; } } catch (BulkWriteException e) { List<BulkWriteError> writeErrors=e.getWriteErrors(); if(writeErrors!=null) { for(BulkWriteError we:writeErrors) { if (MongoCrudConstants.isDuplicate(we.getCode())) { results.put(we.getIndex(), Error.get("update", MongoCrudConstants.ERR_DUPLICATE, we.getMessage())); } else { results.put(we.getIndex(), Error.get("update", MongoCrudConstants.ERR_SAVE_ERROR, we.getMessage())); } } } ret=false; } return ret; }
private void storeBatch(BasicDBList sample, BasicDBList resultList) { // The batch may span collection splits, so maintain // a current collection and batch operation BulkWriteOperation currentOp = null; int currentOpOffset = 0; int sampleIdx = 0; DBCollection currentColl = null; logger.debug("Received batch of size : {}", sample.size()); try{ for(; sampleIdx < sample.size(); ++sampleIdx){ // prepare the sample to batch BasicDBObject doc = (BasicDBObject) (sample.get(sampleIdx)); SampleId _id = this.idFactory.createId(doc); doc.put(Sample.ID_KEY, _id.toObject()); resultList.add(_id.toObject()); long timestamp = doc.getLong(Sample.TS_KEY); DBCollection collection = collectionAllocator.getCollection(timestamp); // if the collection has changed, commit the current // batch to the collection and start new if(collection.equals(currentColl) == false){ executeBatchWrite(currentOp, sample); currentColl = collection; currentOp = collection.initializeUnorderedBulkOperation(); currentOpOffset = sampleIdx; } // put the doc insert into the batch currentOp.insert(doc); } // Finalize the last batch executeBatchWrite(currentOp, sample); } catch(Exception ex){ // One of the bulk writes has failed BasicDBList failedDocs = new BasicDBList(); if(ex instanceof BulkWriteException){ // We need to figure out the failures and remove the writes // that worked from the batch int batchSize = sampleIdx - currentOpOffset; BulkWriteException bwex = (BulkWriteException)ex; int errorCount = bwex.getWriteErrors().size(); if(errorCount < batchSize){ for(BulkWriteError we : bwex.getWriteErrors()){ failedDocs.add(sample.get(currentOpOffset + we.getIndex())); } // since we have accounted for the failures in the current // batch, move the offset forward to the last sample currentOpOffset = sampleIdx; } } // If this happened part way through the batch, send remaining // docs to failed list and update sample to contain only failed docs if(currentOpOffset > 0){ for(; currentOpOffset < sample.size(); ++currentOpOffset) failedDocs.add(sample.get(currentOpOffset)); sample.clear(); sample.addAll(failedDocs); } // TODO : we also need to handle the result Ids here as well, // the failed doc Ids must be pulled from the resultList throw ex; } }