Java class com.mongodb.BulkWriteOperation example source code
Project: LODVader
File: DBSuperClass2.java
/**
 * MongoDB bulk save: inserts the given documents with a single unordered bulk write.
 *
 * @param objects the documents to insert
 * @return true if the bulk write was acknowledged, false if there was nothing to save
 */
protected boolean bulkSave2(List<DBObject> objects) {
    if (objects.size() == 0)
        return false;
    BulkWriteOperation builder = getCollection().initializeUnorderedBulkOperation();
    for (DBObject doc : objects) {
        builder.insert(doc);
    }
    BulkWriteResult result = builder.execute();
    return result.isAcknowledged();
}
Project: hvdf
File: RollupStorageInterceptor.java
private void executeBatchUpdate(BulkWriteOperation batchOp, BasicDBList fullBatch) {
    if(batchOp != null){
        BulkWriteResult result = batchOp.execute();
        logger.debug("Wrote sample batch - sent {} : updated {}",
                fullBatch.size(), result.getModifiedCount());
    }
}
Project: hvdf
File: RawStorageInterceptor.java
private void executeBatchWrite(BulkWriteOperation batchOp, BasicDBList fullBatch) {
    if(batchOp != null){
        BulkWriteResult result = batchOp.execute();
        logger.debug("Wrote sample batch - sent {} : inserted {}",
                fullBatch.size(), result.getInsertedCount());
    }
}
Project: lumen-kb
File: YagoLabelCacheStore.java
@Override
public void putAll(GridCacheTx tx,
        Map<? extends String, ? extends YagoLabel> map)
        throws GridException {
    BulkWriteOperation bulk = labelColl.initializeUnorderedBulkOperation();
    for (Entry<? extends String, ? extends YagoLabel> entry : map.entrySet()) {
        BasicDBObject dbo = toDBObject(entry.getKey(), entry.getValue());
        bulk.find(new BasicDBObject("_id", entry.getKey())).upsert().replaceOne(dbo);
    }
    BulkWriteResult writeResult = bulk.execute();
    log.debug("Put {} documents: inserted={}, modified={}, upserted={}",
            map.size(), writeResult.getInsertedCount(), writeResult.getModifiedCount(), writeResult.getUpserts().size());
}
Project: geeCommerce-Java-Shop-Software-and-PIM
File: Mongo.java
public static final <T extends Model> BulkWriteOperation fetchBulkWriteOperation(Class<T> modelClass) {
    return App.get().registryGet(bulkWriteOperationKey(modelClass));
}
Project: geeCommerce-Java-Shop-Software-and-PIM
File: Mongo.java
public static final <T extends Model> void stashBulkWriteOperation(Class<T> modelClass,
        BulkWriteOperation bulkWriteOp) {
    App.get().registryPut(bulkWriteOperationKey(modelClass), bulkWriteOp);
}
Project: storm-mongodb
File: Insert.java
@Override
public void addToBulkOperation(BulkWriteOperation bulk) {
    bulk.insert(dbObject);
}
Project: storm-mongodb
File: Update.java
@Override
public void addToBulkOperation(BulkWriteOperation bulk) {
    bulk.find(query).update(statement);
}
Project: storm-mongodb
File: Query.java
@Override
public void addToBulkOperation(BulkWriteOperation bulk) {}
Project: storm-mongodb
File: Upsert.java
@Override
public void addToBulkOperation(BulkWriteOperation bulk) {
    bulk.find(query).upsert().update(statement);
}
Project: lightblue-mongo
File: MongoSafeUpdateProtocol.java
private List<Integer> retryFailedDocs(List<Integer> failedDocs,CommitInfo ci) {
    List<Integer> newFailedDocs=new ArrayList<>(failedDocs.size());
    for(Integer index:failedDocs) {
        BatchDoc doc=batch.get(index);
        // Read the doc
        DBObject findQuery=new BasicDBObject("_id",doc.id);
        if(cfg.isReevaluateQueryForRetry()) {
            if(query!=null) {
                List<DBObject> list=new ArrayList<>(2);
                list.add(findQuery);
                list.add(query);
                findQuery=new BasicDBObject("$and",list);
            }
        }
        DBObject updatedDoc=collection.findOne(findQuery);
        if(updatedDoc!=null) {
            // if updatedDoc is null, the doc is lost and the error remains
            DBObject newDoc=reapplyChanges(index,updatedDoc);
            // Make sure reapplyChanges does not insert references to
            // objects from the old document into the updatedDoc; that
            // would update both copies of the document. Use deepCopy.
            if(newDoc!=null) {
                DBObject replaceQuery=writeReplaceQuery(updatedDoc);
                // Update the doc ver to our doc ver. This doc is here
                // because its docVer is not set to our docVer, so
                // this is ok
                DocVerUtil.setDocVer(newDoc,docVer);
                // Using a bulk write here with one doc to get the
                // find-and-replace semantics that DBCollection lacks
                BulkWriteOperation nestedBwo=collection.initializeUnorderedBulkOperation();
                nestedBwo.find(replaceQuery).replaceOne(newDoc);
                try {
                    if(nestedBwo.execute().getMatchedCount()==1) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Successfully retried to update a doc: replaceQuery={} newDoc={}", replaceQuery, newDoc);
                        }
                        // Successful update
                        ci.errors.remove(index);
                    }
                } catch(Exception e) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Failed retrying to update a doc: replaceQuery={} newDoc={} error={}", replaceQuery, newDoc, e.toString());
                    }
                    newFailedDocs.add(index);
                }
            } else {
                // reapplyChanges removed the doc from the result set
                ci.errors.remove(index);
            }
        } else {
            // Doc no longer exists
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Removing doc id={} from retry queue, because it does not exist or match anymore", index);
            }
            ci.errors.remove(index);
            ci.lostDocs.add(index);
        }
    }
    return newFailedDocs;
}
Project: lightblue-mongo
File: BatchUpdate.java
/**
 * Runs a batch update using bwo
 *
 * @param bwo The bulk write operation
 * @param writeConcern The write concern to execute with, or null to use the collection default
 * @param batchSize The number of update requests in the batch (the expected matched count)
 * @param results The results are populated during this call with an error for each failed doc
 * @param logger The logger
 *
 * @return true if all docs are updated; otherwise some docs failed,
 * and concurrent update error detection should be called
 */
public static boolean batchUpdate(BulkWriteOperation bwo,
                                  WriteConcern writeConcern,
                                  int batchSize,
                                  Map<Integer,Error> results,
                                  Logger logger) {
    boolean ret=true;
    BulkWriteResult writeResult;
    logger.debug("attemptToUpdate={}",batchSize);
    try {
        if(writeConcern==null) {
            writeResult=bwo.execute();
        } else {
            writeResult=bwo.execute(writeConcern);
        }
        logger.debug("writeResult={}",writeResult);
        if(batchSize==writeResult.getMatchedCount()) {
            logger.debug("Successful update");
        } else {
            logger.warn("notUpdated={}",batchSize-writeResult.getMatchedCount());
            ret=false;
        }
    } catch (BulkWriteException e) {
        List<BulkWriteError> writeErrors=e.getWriteErrors();
        if(writeErrors!=null) {
            for(BulkWriteError we:writeErrors) {
                if (MongoCrudConstants.isDuplicate(we.getCode())) {
                    results.put(we.getIndex(),
                            Error.get("update", MongoCrudConstants.ERR_DUPLICATE, we.getMessage()));
                } else {
                    results.put(we.getIndex(),
                            Error.get("update", MongoCrudConstants.ERR_SAVE_ERROR, we.getMessage()));
                }
            }
        }
        ret=false;
    }
    return ret;
}
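For context, a hedged caller sketch for batchUpdate; the collection, document list, logger, and error handling here are illustrative assumptions, not lightblue-mongo code:

// Hypothetical caller: queue one replaceOne per document, then run the batch.
BulkWriteOperation bwo = collection.initializeUnorderedBulkOperation();
for (DBObject doc : docs) {
    bwo.find(new BasicDBObject("_id", doc.get("_id"))).replaceOne(doc);
}
Map<Integer, Error> errors = new HashMap<>();
if (!batchUpdate(bwo, WriteConcern.ACKNOWLEDGED, docs.size(), errors, logger)) {
    // Some updates did not match or failed; per the javadoc, run
    // concurrent-update error detection (e.g. retry logic like the method above) here.
}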
Project: interop
File: Json2Mongo.java
@Override
public void exportDocuments(final Iterable<JSONObject> it,
                            final int tell) {
    if (it == null) {
        throw new NullPointerException("it");
    }
    if (tell <= 0) {
        throw new IllegalArgumentException("tell <= 0");
    }
    BulkWriteOperation builder = coll.initializeOrderedBulkOperation();
    //coll.initializeUnorderedBulkOperation();
    int tot = 0;
    int bulkNum = 0;
    for (JSONObject obj : it) {
        if (++tot % tell == 0) {
            System.out.println("+++" + tot);
        }
        try {
            coll.insert(convertToDBObj(obj), WriteConcern.SAFE);
        } catch(IllegalArgumentException iae) {
            Logger.getLogger(this.getClass().getName())
                    .severe(iae.getMessage());
        }
    }
    /*for (JSONObject obj : it) {
        if (++tot % tell == 0) {
            System.out.println("+++" + tot);
        }
        bulkNum++;
        builder.insert(convertToDBObj(obj));
        if (bulkNum >= MAX_BULK_DOCS) {
            System.out.print("Sending documents to MongoDB - ");
            final BulkWriteResult result = builder.execute();
            final int inserted = result.getInsertedCount();
            System.out.println("OK");
            if (inserted < bulkNum) {
                final String msg = "Insertion error: inserted[" + inserted
                        + "] expected[" + bulkNum + "]";
                Logger.getLogger(Json2Mongo.class.getName())
                        .log(Level.SEVERE, msg);
            }
            bulkNum = 0;
            builder = coll.initializeOrderedBulkOperation();
            //builder = coll.initializeUnorderedBulkOperation();
        }
    }
    if (bulkNum > 0) {
        final BulkWriteResult result = builder.execute();
        final int inserted = result.getInsertedCount();
        if (inserted < bulkNum) {
            final String msg = "Insertion error: inserted[" + inserted
                    + "] expected[" + bulkNum + "]";
            Logger.getLogger(Json2Mongo.class.getName())
                    .log(Level.SEVERE, msg);
        }
    }*/
}
Project: hvdf
File: RawStorageInterceptor.java
private void storeBatch(BasicDBList sample, BasicDBList resultList) {
    // The batch may span collection splits, so maintain
    // a current collection and batch operation
    BulkWriteOperation currentOp = null;
    int currentOpOffset = 0;
    int sampleIdx = 0;
    DBCollection currentColl = null;
    logger.debug("Received batch of size : {}", sample.size());
    try{
        for(; sampleIdx < sample.size(); ++sampleIdx){
            // prepare the sample to batch
            BasicDBObject doc = (BasicDBObject) (sample.get(sampleIdx));
            SampleId _id = this.idFactory.createId(doc);
            doc.put(Sample.ID_KEY, _id.toObject());
            resultList.add(_id.toObject());
            long timestamp = doc.getLong(Sample.TS_KEY);
            DBCollection collection = collectionAllocator.getCollection(timestamp);
            // if the collection has changed, commit the current
            // batch to the collection and start new
            if(collection.equals(currentColl) == false){
                executeBatchWrite(currentOp, sample);
                currentColl = collection;
                currentOp = collection.initializeUnorderedBulkOperation();
                currentOpOffset = sampleIdx;
            }
            // put the doc insert into the batch
            currentOp.insert(doc);
        }
        // Finalize the last batch
        executeBatchWrite(currentOp, sample);
    } catch(Exception ex){
        // One of the bulk writes has failed
        BasicDBList failedDocs = new BasicDBList();
        if(ex instanceof BulkWriteException){
            // We need to figure out the failures and remove the writes
            // that worked from the batch
            int batchSize = sampleIdx - currentOpOffset;
            BulkWriteException bwex = (BulkWriteException)ex;
            int errorCount = bwex.getWriteErrors().size();
            if(errorCount < batchSize){
                for(BulkWriteError we : bwex.getWriteErrors()){
                    failedDocs.add(sample.get(currentOpOffset + we.getIndex()));
                }
                // since we have accounted for the failures in the current
                // batch, move the offset forward to the last sample
                currentOpOffset = sampleIdx;
            }
        }
        // If this happened part way through the batch, send remaining
        // docs to failed list and update sample to contain only failed docs
        if(currentOpOffset > 0){
            for(; currentOpOffset < sample.size(); ++currentOpOffset)
                failedDocs.add(sample.get(currentOpOffset));
            sample.clear();
            sample.addAll(failedDocs);
        }
        // TODO : we also need to handle the result Ids here as well,
        // the failed doc Ids must be pulled from the resultList
        throw ex;
    }
}
Project: Decision
File: SaveToMongoActionExecutionFunction.java
@Override
public void process(Iterable<StratioStreamingMessage> messages) throws Exception {
    Integer partitionSize = maxBatchSize;
    if (partitionSize == null || partitionSize <= 0){
        partitionSize = Iterables.size(messages);
    }
    Iterable<List<StratioStreamingMessage>> partitionIterables = Iterables.partition(messages, partitionSize);
    try {
        for (List<StratioStreamingMessage> messageList : partitionIterables) {
            Map<String, BulkWriteOperation> elementsToInsert = new HashMap<String, BulkWriteOperation>();
            for (StratioStreamingMessage event : messageList) {
                BasicDBObject object = new BasicDBObject(TIMESTAMP_FIELD, event.getTimestamp());
                for (ColumnNameTypeValue columnNameTypeValue : event.getColumns()) {
                    object.append(columnNameTypeValue.getColumn(), columnNameTypeValue.getValue());
                }
                BulkWriteOperation bulkInsertOperation = elementsToInsert.get(event.getStreamName());
                if (bulkInsertOperation == null) {
                    bulkInsertOperation = getDB().getCollection(event.getStreamName())
                            .initializeUnorderedBulkOperation();
                    elementsToInsert.put(event.getStreamName(), bulkInsertOperation);
                    getDB().getCollection(event.getStreamName())
                            .createIndex(new BasicDBObject(TIMESTAMP_FIELD, -1));
                }
                bulkInsertOperation.insert(object);
            }
            for (Entry<String, BulkWriteOperation> stratioStreamingMessage : elementsToInsert.entrySet()) {
                stratioStreamingMessage.getValue().execute();
            }
        }
    } catch (Exception e) {
        log.error("Error saving in Mongo: " + e.getMessage());
    }
}
Project: storm-mongodb
File: CRUDOperation.java
/**
* Add this operation to a bulk operation
* @param bulk Bulk operation
*/
public abstract void addToBulkOperation(BulkWriteOperation bulk);
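A hedged sketch of how a batch of these operations might be applied; the operations list, the collection, and the empty-batch guard are assumptions for illustration, not storm-mongodb code:

// Hypothetical caller: if anything is queued, let each CRUDOperation
// contribute its request, then send the whole batch in one round trip.
if (!operations.isEmpty()) {
    BulkWriteOperation bulk = collection.initializeUnorderedBulkOperation();
    for (CRUDOperation op : operations) {
        op.addToBulkOperation(bulk);
    }
    BulkWriteResult result = bulk.execute();
    // result.getInsertedCount(), getMatchedCount(), and getRemovedCount() report what the batch did
}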