/** * MongoDB bulk save * * @return */ protected boolean bulkSave2(List<DBObject> objects) { if (objects.size() == 0) return false; BulkWriteOperation builder = getCollection().initializeUnorderedBulkOperation(); for (DBObject doc : objects) { builder.insert(doc); } BulkWriteResult result = builder.execute(); return result.isAcknowledged(); }
private void executeBatchUpdate(BulkWriteOperation batchOp, BasicDBList fullBatch) { if(batchOp != null){ BulkWriteResult result = batchOp.execute(); logger.debug("Wrote sample batch - sent {} : updated {}", fullBatch.size(), result.getModifiedCount()); } }
private void executeBatchWrite(BulkWriteOperation batchOp, BasicDBList fullBatch) { if(batchOp != null){ BulkWriteResult result = batchOp.execute(); logger.debug("Wrote sample batch - sent {} : inserted {}", fullBatch.size(), result.getInsertedCount()); } }
@Override public void putAll(GridCacheTx tx, Map<? extends String, ? extends YagoLabel> map) throws GridException { BulkWriteOperation bulk = labelColl.initializeUnorderedBulkOperation(); for (Entry<? extends String, ? extends YagoLabel> entry : map.entrySet()) { BasicDBObject dbo = toDBObject(entry.getKey(), entry.getValue()); bulk.find(new BasicDBObject("_id", entry.getKey())).upsert().replaceOne(dbo); } BulkWriteResult writeResult = bulk.execute(); log.debug("Put {} documents: inserted={}, modified={}, upserted={}", map.size(), writeResult.getInsertedCount(), writeResult.getModifiedCount(), writeResult.getUpserts().size()); }
public static final <T extends Model> BulkWriteOperation fetchBulkWriteOperation(Class<T> modelClass) { return App.get().registryGet(bulkWriteOperationKey(modelClass)); }
public static final <T extends Model> void stashBulkWriteOperation(Class<T> modelClass, BulkWriteOperation bulkWriteOp) { App.get().registryPut(bulkWriteOperationKey(modelClass), bulkWriteOp); }
@Override public void addToBulkOperation(BulkWriteOperation bulk) { bulk.insert(dbObject); }
@Override public void addToBulkOperation(BulkWriteOperation bulk) { bulk.find(query).update(statement); }
@Override public void addToBulkOperation(BulkWriteOperation bulk) {}
@Override public void addToBulkOperation(BulkWriteOperation bulk) { bulk.find(query).upsert().update(statement); }
private List<Integer> retryFailedDocs(List<Integer> failedDocs,CommitInfo ci) { List<Integer> newFailedDocs=new ArrayList<>(failedDocs.size()); for(Integer index:failedDocs) { BatchDoc doc=batch.get(index); // Read the doc DBObject findQuery=new BasicDBObject("_id",doc.id); if(cfg.isReevaluateQueryForRetry()) { if(query!=null) { List<DBObject> list=new ArrayList<>(2); list.add(findQuery); list.add(query); findQuery=new BasicDBObject("$and",list); } } DBObject updatedDoc=collection.findOne(findQuery); if(updatedDoc!=null) { // if updatedDoc is null, doc is lost. Error remains DBObject newDoc=reapplyChanges(index,updatedDoc); // Make sure reapplyChanges does not insert references // of objects from the old document into the // updatedDoc. That updates both copies of // documents. Use deepCopy if(newDoc!=null) { DBObject replaceQuery=writeReplaceQuery(updatedDoc); // Update the doc ver to our doc ver. This doc is here // because its docVer is not set to our docver, so // this is ok DocVerUtil.setDocVer(newDoc,docVer); // Using bulkwrite here with one doc to use the // findAndReplace API, which is lacking in // DBCollection BulkWriteOperation nestedBwo=collection.initializeUnorderedBulkOperation(); nestedBwo.find(replaceQuery).replaceOne(newDoc); try { if(nestedBwo.execute().getMatchedCount()==1) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Successfully retried to update a doc: replaceQuery={} newDoc={}", replaceQuery, newDoc); } // Successful update ci.errors.remove(index); } } catch(Exception e) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Failed retrying to update a doc: replaceQuery={} newDoc={} error={}", replaceQuery, newDoc, e.toString()); } newFailedDocs.add(index); } } else { // reapllyChanges removed the doc from the resultset ci.errors.remove(index); } } else { // Doc no longer exists if (LOGGER.isDebugEnabled()) { LOGGER.debug("Removing doc id={} from retry queue, because it does not exist or match anymore", index); } ci.errors.remove(index); ci.lostDocs.add(index); } } return newFailedDocs; }
/** * Runs a batch update using bwo * * @param bwo The bulk write operation * @param writeConcern * @param batchSize * @param results The results are populated during this call with an error for each failed doc * @param logger The logger * * @return If returns true, all docs are updated. Otherwise, there * are some failed docs, and concurrent update error detection * should be called */ public static boolean batchUpdate(BulkWriteOperation bwo, WriteConcern writeConcern, int batchSize, Map<Integer,Error> results, Logger logger) { boolean ret=true; BulkWriteResult writeResult; logger.debug("attemptToUpdate={}",batchSize); try { if(writeConcern==null) { writeResult=bwo.execute(); } else { writeResult=bwo.execute(writeConcern); } logger.debug("writeResult={}",writeResult); if(batchSize==writeResult.getMatchedCount()) { logger.debug("Successful update"); } else { logger.warn("notUpdated={}",batchSize-writeResult.getMatchedCount()); ret=false; } } catch (BulkWriteException e) { List<BulkWriteError> writeErrors=e.getWriteErrors(); if(writeErrors!=null) { for(BulkWriteError we:writeErrors) { if (MongoCrudConstants.isDuplicate(we.getCode())) { results.put(we.getIndex(), Error.get("update", MongoCrudConstants.ERR_DUPLICATE, we.getMessage())); } else { results.put(we.getIndex(), Error.get("update", MongoCrudConstants.ERR_SAVE_ERROR, we.getMessage())); } } } ret=false; } return ret; }
@Override public void exportDocuments(final Iterable<JSONObject> it, final int tell) { if (it == null) { throw new NullPointerException("it"); } if (tell <= 0) { throw new IllegalArgumentException("tell <= 0"); } BulkWriteOperation builder = coll.initializeOrderedBulkOperation(); //coll.initializeUnorderedBulkOperation(); int tot = 0; int bulkNum = 0; for (JSONObject obj : it) { if (++tot % tell == 0) { System.out.println("+++" + tot); } try { coll.insert(convertToDBObj(obj), WriteConcern.SAFE); } catch(IllegalArgumentException iae) { Logger.getLogger(this.getClass().getName()) .severe(iae.getMessage()); } } /*for (JSONObject obj : it) { if (++tot % tell == 0) { System.out.println("+++" + tot); } bulkNum++; builder.insert(convertToDBObj(obj)); if (bulkNum >= MAX_BULK_DOCS) { System.out.print("Sending documents to MongoDB - "); final BulkWriteResult result = builder.execute(); final int inserted = result.getInsertedCount(); System.out.println("OK"); if (inserted < bulkNum) { final String msg = "Insertion error: inserted[" + inserted + "] expected[" + bulkNum + "]"; Logger.getLogger(Json2Mongo.class.getName()) .log(Level.SEVERE, msg); } bulkNum = 0; builder = coll.initializeOrderedBulkOperation(); //builder = coll.initializeUnorderedBulkOperation(); } } if (bulkNum > 0) { final BulkWriteResult result = builder.execute(); final int inserted = result.getInsertedCount(); if (inserted < bulkNum) { final String msg = "Insertion error: inserted[" + inserted + "] expected[" + bulkNum + "]"; Logger.getLogger(Json2Mongo.class.getName()) .log(Level.SEVERE, msg); } }*/ }
private void storeBatch(BasicDBList sample, BasicDBList resultList) { // The batch may span collection splits, so maintain // a current collection and batch operation BulkWriteOperation currentOp = null; int currentOpOffset = 0; int sampleIdx = 0; DBCollection currentColl = null; logger.debug("Received batch of size : {}", sample.size()); try{ for(; sampleIdx < sample.size(); ++sampleIdx){ // prepare the sample to batch BasicDBObject doc = (BasicDBObject) (sample.get(sampleIdx)); SampleId _id = this.idFactory.createId(doc); doc.put(Sample.ID_KEY, _id.toObject()); resultList.add(_id.toObject()); long timestamp = doc.getLong(Sample.TS_KEY); DBCollection collection = collectionAllocator.getCollection(timestamp); // if the collection has changed, commit the current // batch to the collection and start new if(collection.equals(currentColl) == false){ executeBatchWrite(currentOp, sample); currentColl = collection; currentOp = collection.initializeUnorderedBulkOperation(); currentOpOffset = sampleIdx; } // put the doc insert into the batch currentOp.insert(doc); } // Finalize the last batch executeBatchWrite(currentOp, sample); } catch(Exception ex){ // One of the bulk writes has failed BasicDBList failedDocs = new BasicDBList(); if(ex instanceof BulkWriteException){ // We need to figure out the failures and remove the writes // that worked from the batch int batchSize = sampleIdx - currentOpOffset; BulkWriteException bwex = (BulkWriteException)ex; int errorCount = bwex.getWriteErrors().size(); if(errorCount < batchSize){ for(BulkWriteError we : bwex.getWriteErrors()){ failedDocs.add(sample.get(currentOpOffset + we.getIndex())); } // since we have accounted for the failures in the current // batch, move the offset forward to the last sample currentOpOffset = sampleIdx; } } // If this happened part way through the batch, send remaining // docs to failed list and update sample to contain only failed docs if(currentOpOffset > 0){ for(; currentOpOffset < sample.size(); ++currentOpOffset) failedDocs.add(sample.get(currentOpOffset)); sample.clear(); sample.addAll(failedDocs); } // TODO : we also need to handle the result Ids here as well, // the failed doc Ids must be pulled from the resultList throw ex; } }
@Override public void process(Iterable<StratioStreamingMessage> messages) throws Exception { Integer partitionSize = maxBatchSize; if (partitionSize == null || partitionSize <= 0){ partitionSize = Iterables.size(messages); } Iterable<List<StratioStreamingMessage>> partitionIterables = Iterables.partition(messages, partitionSize); try { for (List<StratioStreamingMessage> messageList : partitionIterables) { Map<String, BulkWriteOperation> elementsToInsert = new HashMap<String, BulkWriteOperation>(); for (StratioStreamingMessage event : messageList) { BasicDBObject object = new BasicDBObject(TIMESTAMP_FIELD, event.getTimestamp()); for (ColumnNameTypeValue columnNameTypeValue : event.getColumns()) { object.append(columnNameTypeValue.getColumn(), columnNameTypeValue.getValue()); } BulkWriteOperation bulkInsertOperation = elementsToInsert.get(event.getStreamName()); if (bulkInsertOperation == null) { bulkInsertOperation = getDB().getCollection(event.getStreamName()) .initializeUnorderedBulkOperation(); elementsToInsert.put(event.getStreamName(), bulkInsertOperation); getDB().getCollection(event.getStreamName()) .createIndex(new BasicDBObject(TIMESTAMP_FIELD, -1)); } bulkInsertOperation.insert(object); } for (Entry<String, BulkWriteOperation> stratioStreamingMessage : elementsToInsert.entrySet()) { stratioStreamingMessage.getValue().execute(); } } } catch (Exception e) { log.error("Error saving in Mongo: " + e.getMessage()); } }
/** * Add this operation to a bulk operation * @param bulk Bulk operation */ public abstract void addToBulkOperation(BulkWriteOperation bulk);