Java Class com.mongodb.client.model.WriteModel Example Source Code
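com.mongodb.client.model.WriteModel is the abstract base of the operations accepted by MongoCollection.bulkWrite: InsertOneModel, UpdateOneModel, UpdateManyModel, ReplaceOneModel, DeleteOneModel, and DeleteManyModel. Before the project snippets below, here is a minimal, self-contained sketch of the shared pattern, assuming a 3.7+ synchronous Java driver; the connection string, database, and collection names are illustrative and not taken from any of the projects.

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.*;
import org.bson.Document;

import java.util.Arrays;
import java.util.List;

public class WriteModelExample {
    public static void main(String[] args) {
        // Illustrative connection details; adjust for a real deployment.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll = client.getDatabase("test").getCollection("items");

            // Each WriteModel subclass describes one operation of the bulk write.
            List<WriteModel<Document>> ops = Arrays.asList(
                    new InsertOneModel<>(new Document("_id", 1).append("name", "a")),
                    new UpdateOneModel<>(Filters.eq("_id", 2),
                            new Document("$set", new Document("name", "b")),
                            new UpdateOptions().upsert(true)),
                    new ReplaceOneModel<>(Filters.eq("_id", 3), new Document("name", "c")),
                    new DeleteManyModel<>(Filters.eq("obsolete", true)));

            // ordered(false) lets the server continue past individual failures.
            BulkWriteResult result = coll.bulkWrite(ops, new BulkWriteOptions().ordered(false));
            System.out.println("inserted=" + result.getInsertedCount()
                    + " modified=" + result.getModifiedCount()
                    + " deleted=" + result.getDeletedCount());
        }
    }
}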
Project: Prism
File: MongoRecords.java
@Override
public StorageWriteResult write(List<DataContainer> containers) throws Exception {
    MongoCollection<Document> collection = MongoStorageAdapter.getCollection(MongoStorageAdapter.collectionEventRecordsName);
    // Build an array of documents
    List<WriteModel<Document>> documents = new ArrayList<>();
    for (DataContainer container : containers) {
        Document document = documentFromView(container);
        //Prism.getLogger().debug(DataUtil.jsonFromDataView(container).toString());
        // TTL
        document.append("Expires", DateUtil.parseTimeStringToDate(expiration, true));
        // Insert
        documents.add(new InsertOneModel<>(document));
    }
    // Write
    collection.bulkWrite(documents, bulkWriteOptions);
    // @todo implement real results, BulkWriteResult
    return new StorageWriteResult();
}
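The "Expires" field appended above only causes documents to be removed if a TTL index exists on that field. As a hedged aside (this setup code is not part of the Prism snippet), such an index could be created with the same driver roughly like this:

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import org.bson.Document;

import java.util.concurrent.TimeUnit;

class TtlIndexSetup {
    // expireAfter(0, SECONDS): MongoDB removes each document once the date
    // stored in its "Expires" field has passed.
    static void ensureTtlIndex(MongoCollection<Document> collection) {
        collection.createIndex(Indexes.ascending("Expires"),
                new IndexOptions().expireAfter(0L, TimeUnit.SECONDS));
    }
}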
Project: AbacusUtil
File: AsyncMongoDBExecutor.java
public CompletableFuture<BulkWriteResult> bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests) {
    return asyncExecutor.execute(new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return dbExecutor.bulkWrite(collectionName, requests);
        }
    });
}
Project: para-dao-mongodb
File: MongoDBDAO.java
@Override
public <P extends ParaObject> void updateAll(String appid, List<P> objects) {
    if (StringUtils.isBlank(appid) || objects == null) {
        return;
    }
    try {
        ArrayList<WriteModel<Document>> updates = new ArrayList<WriteModel<Document>>();
        List<String> ids = new ArrayList<String>(objects.size());
        for (P object : objects) {
            if (object != null) {
                object.setUpdated(Utils.timestamp());
                Document id = new Document(ID, object.getId());
                Document data = new Document("$set", toRow(object, Locked.class, true));
                UpdateOneModel<Document> um = new UpdateOneModel<Document>(id, data);
                updates.add(um);
                ids.add(object.getId());
            }
        }
        BulkWriteResult res = getTable(appid).bulkWrite(updates, new BulkWriteOptions().ordered(true));
        logger.debug("Updated: " + res.getModifiedCount() + ", keys: " + ids);
    } catch (Exception e) {
        logger.error(null, e);
    }
    logger.debug("DAO.updateAll() {}", objects.size());
}
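The catch block above logs and moves on. When finer-grained reporting is wanted, a failed bulk write surfaces as MongoBulkWriteException, which identifies the offending operations by index. A small sketch (class and method names are illustrative):

import com.mongodb.MongoBulkWriteException;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.Document;

import java.util.List;

class BulkErrorReporting {
    static void write(MongoCollection<Document> coll, List<WriteModel<Document>> updates) {
        try {
            // ordered(true): processing stops at the first failing operation;
            // ordered(false) would attempt the remaining operations as well.
            coll.bulkWrite(updates, new BulkWriteOptions().ordered(true));
        } catch (MongoBulkWriteException e) {
            for (BulkWriteError error : e.getWriteErrors()) {
                System.err.println("operation " + error.getIndex() + " failed: " + error.getMessage());
            }
        }
    }
}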
Project: oap
File: MongoStorage.java
@Override
public void fsync() {
    super.fsync();
    val count = new MutableInt();
    Stream.of( data.values()
        .stream()
        .filter( m -> m.modified >= lastFsync ) )
        .grouped( bulkSize )
        .forEach( list -> {
            count.add( list.size() );
            final List<? extends WriteModel<Metadata<T>>> bulk = Lists.map( list,
                metadata -> {
                    val id = identifier.get( metadata.object );
                    return new ReplaceOneModel<>( eq( "_id", new ObjectId( id ) ), metadata, UPDATE_OPTIONS_UPSERT );
                } );
            collection.bulkWrite( bulk );
        } );
    log.info( "[{}] fsync total: {}, modified: {}", collection.getNamespace(), size(), count.intValue() );
    lastFsync = System.currentTimeMillis();
}
Project: restheart
File: DocumentDAO.java
@Override
public BulkOperationResult bulkDeleteDocuments(
        String dbName,
        String collName,
        BsonDocument filter,
        BsonDocument shardedKeys) {
    MongoDatabase mdb = client.getDatabase(dbName);
    MongoCollection<BsonDocument> mcoll
            = mdb.getCollection(collName, BsonDocument.class);
    List<WriteModel<BsonDocument>> deletes = new ArrayList<>();
    Bson _filter;
    if (shardedKeys != null) {
        _filter = and(filter, shardedKeys);
    } else {
        _filter = filter;
    }
    deletes.add(new DeleteManyModel<>(_filter));
    BulkWriteResult result = mcoll.bulkWrite(deletes);
    return new BulkOperationResult(HttpStatus.SC_OK, null, result);
}
Project: restheart
File: DAOUtils.java
public static BulkOperationResult bulkUpsertDocuments(
        final MongoCollection<BsonDocument> coll,
        final BsonArray documents,
        final BsonDocument filter,
        final BsonDocument shardKeys) {
    Objects.requireNonNull(coll);
    Objects.requireNonNull(documents);
    ObjectId newEtag = new ObjectId();
    List<WriteModel<BsonDocument>> wm = getBulkWriteModel(
            coll,
            documents,
            filter,
            shardKeys,
            newEtag);
    BulkWriteResult result = coll.bulkWrite(wm);
    return new BulkOperationResult(HttpStatus.SC_OK, newEtag, result);
}
Project: MongoSyphon
File: MongoBulkWriter.java
public MongoBulkWriter(String URI, String namespace)
{
    logger = LoggerFactory.getLogger(MongoBulkWriter.class);
    logger.info("Connecting to " + URI );
    mongoClient = new MongoClient(new MongoClientURI(URI));
    String[] parts = namespace.split("\\.");
    db = mongoClient.getDatabase(parts[0]);
    collection = db.getCollection(parts[1]);
    ops = new ArrayList<WriteModel<Document>>();
}
Project: ibm-performance-monitor
File: ProfiledMongoClientTest.java
@Test
public void bulkWrite()
{
    List<WriteModel<Document>> list = insertOneWithBulk();
    coll.deleteOne(Filters.eq("name", "DELETEME"));
    coll.bulkWrite(list, new BulkWriteOptions());
    coll.deleteMany(Filters.eq("name", "DELETEME"));
}
Project: ibm-performance-monitor
File: ProfiledMongoClientTest.java
private List<WriteModel<Document>> insertOneWithBulk()
{
    List<WriteModel<Document>> list = new ArrayList<WriteModel<Document>>();
    list.add(new InsertOneModel<Document>(createDeleteDocument()));
    coll.bulkWrite(list);
    return list;
}
Project: kafka-connect-mongodb
File: MongodbSinkTask.java
/**
 * Put the records in the sink.
 *
 * @param collection the set of records to send.
 */
@Override
public void put(Collection<SinkRecord> collection) {
    List<SinkRecord> records = new ArrayList<>(collection);
    for (int i = 0; i < records.size(); i++) {
        Map<String, List<WriteModel<Document>>> bulks = new HashMap<>();
        for (int j = 0; j < bulkSize && i < records.size(); j++, i++) {
            SinkRecord record = records.get(i);
            Map<String, Object> jsonMap = SchemaUtils.toJsonMap((Struct) record.value());
            String topic = record.topic();
            if (bulks.get(topic) == null) {
                bulks.put(topic, new ArrayList<WriteModel<Document>>());
            }
            Document newDocument = new Document(jsonMap)
                    .append("_id", record.kafkaOffset());
            log.trace("Adding to bulk: {}", newDocument.toString());
            bulks.get(topic).add(new UpdateOneModel<Document>(
                    Filters.eq("_id", record.kafkaOffset()),
                    new Document("$set", newDocument),
                    new UpdateOptions().upsert(true)));
        }
        i--;
        log.trace("Executing bulk");
        for (String key : bulks.keySet()) {
            try {
                com.mongodb.bulk.BulkWriteResult result = mapping.get(key).bulkWrite(bulks.get(key));
            } catch (Exception e) {
                log.error(e.getMessage());
            }
        }
    }
}
Project: AbacusUtil
File: AsyncMongoDBExecutor.java
public CompletableFuture<BulkWriteResult> bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests,
        final BulkWriteOptions options) {
    return asyncExecutor.execute(new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return dbExecutor.bulkWrite(collectionName, requests, options);
        }
    });
}
Project: AbacusUtil
File: MongoCollectionExecutor.java
public BulkWriteResult bulkWrite(final List<? extends WriteModel<? extends Document>> requests, final BulkWriteOptions options) {
    if (options == null) {
        return coll.bulkWrite(requests);
    } else {
        return coll.bulkWrite(requests, options);
    }
}
Project: AbacusUtil
File: AsyncMongoCollectionExecutor.java
public CompletableFuture<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends Document>> requests) {
    return asyncExecutor.execute(new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return collExecutor.bulkWrite(requests);
        }
    });
}
Project: AbacusUtil
File: AsyncMongoCollectionExecutor.java
public CompletableFuture<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends Document>> requests, final BulkWriteOptions options) {
    return asyncExecutor.execute(new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return collExecutor.bulkWrite(requests, options);
        }
    });
}
Project: mongo-java-driver-rx
File: MongoCollectionImpl.java
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                             final BulkWriteOptions options) {
    return RxObservables.create(Observables.observe(new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(requests, options, callback);
        }
    }), observableAdapter);
}
Project: POCDriver
File: MongoWorker.java
private void updateSingleRecord(List<WriteModel<Document>> bulkWriter,
                                Document key) {
    // Key Query
    rotateCollection();
    Document query = new Document();
    Document change;
    if (key == null) {
        int range = sequence * testOpts.workingset / 100;
        int rest = sequence - range;
        int recordno = rest + getNextVal(range);
        query.append("_id",
                new Document("w", workerID).append("i", recordno));
    } else {
        query.append("_id", key);
    }
    int updateFields = (testOpts.updateFields <= testOpts.numFields) ? testOpts.updateFields : testOpts.numFields;
    if (updateFields == 1) {
        long changedfield = (long) getNextVal((int) testOpts.NUMBER_SIZE);
        Document fields = new Document("fld0", changedfield);
        change = new Document("$set", fields);
    } else {
        TestRecord tr = createNewRecord();
        tr.internalDoc.remove("_id");
        change = new Document("$set", tr.internalDoc);
    }
    if (testOpts.findandmodify == false) {
        bulkWriter.add(new UpdateManyModel<Document>(query, change));
    } else {
        this.coll.findOneAndUpdate(query, change); //These are immediate not batches
    }
    testResults.RecordOpsDone("updates", 1);
}
Project: POCDriver
File: MongoWorker.java
private TestRecord insertNewRecord(List<WriteModel<Document>> bulkWriter) {
    int[] arr = new int[2];
    arr[0] = testOpts.arraytop;
    arr[1] = testOpts.arraynext;
    TestRecord tr = createNewRecord();
    bulkWriter.add(new InsertOneModel<Document>(tr.internalDoc));
    return tr;
}
Project: mongo-java-driver-reactivestreams
File: MongoCollectionImpl.java
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return new ObservableToPublisher<BulkWriteResult>(observe(new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(requests, options, callback);
        }
    }));
}
Project: mongo-java-driver-reactivestreams
File: MongoCollectionImpl.java
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    return new ObservableToPublisher<BulkWriteResult>(observe(new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(clientSession, requests, options, callback);
        }
    }));
}
Project: mandrel
File: MongoMetricsRepository.java
public void checkFilled() {
    LocalDateTime now = LocalDateTime.now();
    LocalDateTime keytime = now.withMinute(0).withSecond(0).withNano(0);
    if (TIMESERIES_ALLOWED_KEYS.stream().anyMatch(key -> {
        Document serie = timeseries.find(Filters.and(Filters.eq("type", key), Filters.eq("timestamp_hour", keytime))).limit(1).first();
        if (serie != null) {
            Map<String, Long> values = (Map<String, Long>) serie.get("values");
            if (values.size() != 60) {
                log.warn("Wrong values size for timeserie collection {}", key);
                return true;
            }
            return false;
        }
        return false;
    })) {
        log.warn("Dropping the timeseries collection");
        timeseries.drop();
    }
    List<? extends WriteModel<Document>> requests = TIMESERIES_ALLOWED_KEYS
            .stream()
            .map(key -> Pair.of(key, timeseries.find(Filters.and(Filters.eq("type", key), Filters.eq("timestamp_hour", keytime))).limit(1).first()))
            .filter(doc -> doc.getRight() == null)
            .map(pair -> pair.getLeft())
            .map(key -> {
                Document document = new Document();
                document.append("type", key).append("timestamp_hour", keytime);
                document.append("values",
                        IntStream.range(0, 60).collect(Document::new, (doc, val) -> doc.put(Integer.toString(val), Long.valueOf(0)), Document::putAll));
                return document;
            })
            .map(doc -> new UpdateOneModel<Document>(Filters.and(Filters.eq("type", doc.getString("type")), Filters.eq("timestamp_hour", keytime)),
                    new Document("$set", doc), new UpdateOptions().upsert(true))).collect(Collectors.toList());
    if (CollectionUtils.isNotEmpty(requests)) {
        timeseries.bulkWrite(requests);
    }
}
Project: mandrel
File: MongoMetricsRepository.java
public void prepareMinutes(LocalDateTime keytime) {
    List<? extends WriteModel<Document>> requests = TIMESERIES_ALLOWED_KEYS
            .stream()
            .map(el -> {
                Document document = new Document();
                document.append("type", el).append("timestamp_hour", keytime);
                document.append("values",
                        IntStream.range(0, 60).collect(Document::new, (doc, val) -> doc.put(Integer.toString(val), Long.valueOf(0)), Document::putAll));
                return document;
            })
            .map(doc -> new UpdateOneModel<Document>(Filters.and(Filters.eq("type", doc.getString("type")), Filters.eq("timestamp_hour", keytime)),
                    new Document("$set", doc), new UpdateOptions().upsert(true))).collect(Collectors.toList());
    timeseries.bulkWrite(requests);
}
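The documents prepared above pre-allocate sixty zeroed minute buckets under "values". The mandrel write path is not shown here; purely as a hypothetical illustration of how such pre-allocated buckets are normally filled, a single minute can be incremented in place with $inc (field names follow the documents built above):

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import org.bson.Document;

import java.time.LocalDateTime;

class TimeseriesIncrementSketch {
    static void add(MongoCollection<Document> timeseries, String type,
                    LocalDateTime timestamp, long delta) {
        // The hour document is keyed by the truncated timestamp, as in prepareMinutes().
        LocalDateTime keytime = timestamp.withMinute(0).withSecond(0).withNano(0);
        String bucket = "values." + timestamp.getMinute();
        timeseries.updateOne(
                Filters.and(Filters.eq("type", type), Filters.eq("timestamp_hour", keytime)),
                new Document("$inc", new Document(bucket, delta)),
                new UpdateOptions().upsert(true));
    }
}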
Project: render
File: RenderDao.java
/**
 * Saves the specified tile spec to the database.
 *
 * @param stackId            stack identifier.
 * @param resolvedTileSpecs  collection of resolved tile specs (with referenced transforms).
 *
 * @throws IllegalArgumentException
 *   if any required parameters or transform spec references are missing.
 */
public void saveResolvedTiles(final StackId stackId,
                              final ResolvedTileSpecCollection resolvedTileSpecs)
        throws IllegalArgumentException {
    MongoUtil.validateRequiredParameter("stackId", stackId);
    MongoUtil.validateRequiredParameter("resolvedTileSpecs", resolvedTileSpecs);
    final Collection<TransformSpec> transformSpecs = resolvedTileSpecs.getTransformSpecs();
    final Collection<TileSpec> tileSpecs = resolvedTileSpecs.getTileSpecs();
    if (transformSpecs.size() > 0) {
        saveResolvedTransforms(stackId, transformSpecs);
    }
    if (tileSpecs.size() > 0) {
        final MongoCollection<Document> tileCollection = getTileCollection(stackId);
        final List<WriteModel<Document>> modelList = new ArrayList<>(tileSpecs.size());
        Document query = new Document();
        Document tileSpecObject;
        for (final TileSpec tileSpec : tileSpecs) {
            query = new Document("tileId", tileSpec.getTileId());
            tileSpecObject = Document.parse(tileSpec.toJson());
            modelList.add(new ReplaceOneModel<>(query, tileSpecObject, MongoUtil.UPSERT_OPTION));
        }
        final BulkWriteResult result = tileCollection.bulkWrite(modelList, MongoUtil.UNORDERED_OPTION);
        if (LOG.isDebugEnabled()) {
            final String bulkResultMessage = MongoUtil.toMessage("tile specs", result, tileSpecs.size());
            LOG.debug("saveResolvedTiles: {} using {}.initializeUnorderedBulkOp()",
                      bulkResultMessage, MongoUtil.fullName(tileCollection), query.toJson());
        }
    }
}
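MongoUtil.UPSERT_OPTION and MongoUtil.UNORDERED_OPTION are render-project constants whose definitions are not part of this snippet; with the 3.x driver API used above they presumably amount to something like the following (a hypothetical reconstruction, not the actual MongoUtil source):

import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.UpdateOptions;

final class MongoUtilOptionsSketch {
    // Upsert so that ReplaceOneModel inserts the tile spec when no match exists.
    static final UpdateOptions UPSERT_OPTION = new UpdateOptions().upsert(true);
    // Unordered so that one failed replacement does not abort the whole batch.
    static final BulkWriteOptions UNORDERED_OPTION = new BulkWriteOptions().ordered(false);
}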
Project: vertx-mongo-client
File: MongoClientImpl.java
@Override
public MongoClient bulkWriteWithOptions(String collection, List<BulkOperation> operations, BulkWriteOptions bulkWriteOptions, Handler<AsyncResult<MongoClientBulkWriteResult>> resultHandler) {
    requireNonNull(collection, "collection cannot be null");
    requireNonNull(operations, "operations cannot be null");
    requireNonNull(bulkWriteOptions, "bulkWriteOptions cannot be null");
    requireNonNull(resultHandler, "resultHandler cannot be null");
    MongoCollection<JsonObject> coll = getCollection(collection, bulkWriteOptions.getWriteOption());
    List<WriteModel<JsonObject>> bulkOperations = convertBulkOperations(operations);
    coll.bulkWrite(bulkOperations, mongoBulkWriteOptions(bulkWriteOptions),
            toMongoClientBulkWriteResult(resultHandler));
    return this;
}
Project: vertx-mongo-client
File: MongoClientImpl.java
private List<WriteModel<JsonObject>> convertBulkOperations(List<BulkOperation> operations) {
    List<WriteModel<JsonObject>> result = new ArrayList<>(operations.size());
    for (BulkOperation bulkOperation : operations) {
        switch (bulkOperation.getType()) {
            case DELETE:
                Bson bsonFilter = toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter()));
                if (bulkOperation.isMulti()) {
                    result.add(new DeleteManyModel<>(bsonFilter));
                } else {
                    result.add(new DeleteOneModel<>(bsonFilter));
                }
                break;
            case INSERT:
                result.add(new InsertOneModel<>(encodeKeyWhenUseObjectId(bulkOperation.getDocument())));
                break;
            case REPLACE:
                result.add(new ReplaceOneModel<>(toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter())), bulkOperation.getDocument(),
                        new com.mongodb.client.model.UpdateOptions().upsert(bulkOperation.isUpsert())));
                break;
            case UPDATE:
                Bson filter = toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter()));
                Bson document = toBson(encodeKeyWhenUseObjectId(bulkOperation.getDocument()));
                com.mongodb.client.model.UpdateOptions updateOptions = new com.mongodb.client.model.UpdateOptions()
                        .upsert(bulkOperation.isUpsert());
                if (bulkOperation.isMulti()) {
                    result.add(new UpdateManyModel<>(filter, document, updateOptions));
                } else {
                    result.add(new UpdateOneModel<>(filter, document, updateOptions));
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown bulk operation type: " + bulkOperation.getClass());
        }
    }
    return result;
}
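For context, callers of the Vert.x client describe their writes with io.vertx.ext.mongo.BulkOperation rather than WriteModel; the method above performs the conversion. A hedged usage sketch (collection name and documents are illustrative):

import io.vertx.core.json.JsonObject;
import io.vertx.ext.mongo.BulkOperation;
import io.vertx.ext.mongo.BulkWriteOptions;
import io.vertx.ext.mongo.MongoClient;

import java.util.Arrays;
import java.util.List;

class VertxBulkWriteSketch {
    static void run(MongoClient mongoClient) {
        List<BulkOperation> operations = Arrays.asList(
                BulkOperation.createInsert(new JsonObject().put("_id", "a").put("qty", 1)),
                BulkOperation.createUpdate(new JsonObject().put("_id", "b"),
                        new JsonObject().put("$set", new JsonObject().put("qty", 2))),
                BulkOperation.createDelete(new JsonObject().put("obsolete", true)));

        mongoClient.bulkWriteWithOptions("inventory", operations, new BulkWriteOptions(), ar -> {
            if (ar.succeeded()) {
                System.out.println("modified: " + ar.result().getModifiedCount());
            } else {
                ar.cause().printStackTrace();
            }
        });
    }
}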
Project: restheart
File: DocumentDAO.java
@Override
public BulkOperationResult bulkPatchDocuments(
        String dbName,
        String collName,
        BsonDocument filter,
        BsonDocument shardedKeys,
        BsonDocument data) {
    MongoDatabase mdb = client.getDatabase(dbName);
    MongoCollection<BsonDocument> mcoll
            = mdb.getCollection(collName, BsonDocument.class);
    List<WriteModel<BsonDocument>> patches = new ArrayList<>();
    Bson _filter;
    if (shardedKeys != null) {
        _filter = and(filter, shardedKeys);
    } else {
        _filter = filter;
    }
    patches.add(new UpdateManyModel<>(
            _filter,
            DAOUtils.getUpdateDocument(data),
            DAOUtils.U_NOT_UPSERT_OPS));
    BulkWriteResult result = mcoll.bulkWrite(patches);
    return new BulkOperationResult(HttpStatus.SC_OK, null, result);
}
Project: AbacusUtil
File: MongoDBExecutor.java
public BulkWriteResult bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests) {
    return collExecutor(collectionName).bulkWrite(requests);
}
Project: AbacusUtil
File: MongoDBExecutor.java
public BulkWriteResult bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests,
        final BulkWriteOptions options) {
    return collExecutor(collectionName).bulkWrite(requests, options);
}
Project: AbacusUtil
File: MongoCollectionExecutor.java
public BulkWriteResult bulkWrite(final List<? extends WriteModel<? extends Document>> requests) {
    return bulkWrite(requests, null);
}
Project: mongo-java-driver-rx
File: MongoCollectionImpl.java
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(requests, new BulkWriteOptions());
}
Project: POCDriver
File: MongoWorker.java
private void updateSingleRecord(List<WriteModel<Document>> bulkWriter) {
    updateSingleRecord(bulkWriter, null);
}
Project: mongo-java-driver-reactivestreams
File: MongoCollectionImpl.java
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(requests, new BulkWriteOptions());
}
Project: mongo-java-driver-reactivestreams
File: MongoCollectionImpl.java
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests) {
    return bulkWrite(clientSession, requests, new BulkWriteOptions());
}
Project: render
File: MatchDao.java
public void saveMatches(final MatchCollectionId collectionId,
                        final List<CanvasMatches> matchesList)
        throws IllegalArgumentException {
    MongoUtil.validateRequiredParameter("collectionId", collectionId);
    MongoUtil.validateRequiredParameter("matchesList", matchesList);
    LOG.debug("saveMatches: entry, collectionId={}, matchesList.size()={}",
              collectionId, matchesList.size());
    if (matchesList.size() > 0) {
        final MongoCollection<Document> collection =
                matchDatabase.getCollection(collectionId.getDbCollectionName());
        ensureMatchIndexes(collection);
        final List<WriteModel<Document>> modelList = new ArrayList<>(matchesList.size());
        final UpdateOptions upsertOption = new UpdateOptions().upsert(true);
        Document filter;
        Document matchesObject;
        for (final CanvasMatches canvasMatches : matchesList) {
            canvasMatches.normalize();
            filter = new Document(
                    "pGroupId", canvasMatches.getpGroupId()).append(
                    "pId", canvasMatches.getpId()).append(
                    "qGroupId", canvasMatches.getqGroupId()).append(
                    "qId", canvasMatches.getqId());
            matchesObject = Document.parse(canvasMatches.toJson());
            modelList.add(new ReplaceOneModel<>(filter, matchesObject, upsertOption));
        }
        final BulkWriteResult result = collection.bulkWrite(modelList, MongoUtil.UNORDERED_OPTION);
        if (LOG.isDebugEnabled()) {
            final String bulkResultMessage = MongoUtil.toMessage("matches", result, matchesList.size());
            LOG.debug("saveMatches: {} using {}.initializeUnorderedBulkOp()",
                      bulkResultMessage, MongoUtil.fullName(collection));
        }
    }
}
Project: mongo-java-driver-rx
File: MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @return an Observable with a single element the BulkWriteResult
 */
Observable<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests);
Project: mongo-java-driver-rx
File: MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return an Observable with a single element the BulkWriteResult
 */
Observable<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
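A hedged usage sketch for the RxJava-based variant (not taken from the driver sources, assuming the com.mongodb.rx.client package of this project); the Observable is cold, so the bulk write only runs once subscribe() is called:

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.rx.client.MongoCollection;
import org.bson.Document;
import rx.Observable;

import java.util.List;

class RxBulkWriteSketch {
    static void run(MongoCollection<Document> collection, List<WriteModel<Document>> requests) {
        Observable<BulkWriteResult> observable =
                collection.bulkWrite(requests, new BulkWriteOptions().ordered(false));
        // Nothing is sent to the server until a subscriber arrives.
        observable.subscribe(
                result -> System.out.println("modified: " + result.getModifiedCount()),
                error -> error.printStackTrace());
    }
}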
Project: mongo-java-driver-reactivestreams
File: MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @return a publisher with a single element the BulkWriteResult
 */
Publisher<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests);
Project: mongo-java-driver-reactivestreams
File: MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return a publisher with a single element the BulkWriteResult
 */
Publisher<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
Project: mongo-java-driver-reactivestreams
File: MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests the writes to execute
 * @return a publisher with a single element the BulkWriteResult
 * @mongodb.server.release 3.6
 * @since 1.7
 */
Publisher<BulkWriteResult> bulkWrite(ClientSession clientSession, List<? extends WriteModel<? extends TDocument>> requests);
Project: mongo-java-driver-reactivestreams
File: MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests the writes to execute
 * @param options the options to apply to the bulk write operation
 * @return a publisher with a single element the BulkWriteResult
 * @mongodb.server.release 3.6
 * @since 1.7
 */
Publisher<BulkWriteResult> bulkWrite(ClientSession clientSession, List<? extends WriteModel<? extends TDocument>> requests,
        BulkWriteOptions options);
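For completeness, a minimal, self-contained sketch (not taken from the driver sources) of consuming the Publisher returned by the Reactive Streams variants; as with the Observable, nothing executes until demand is signalled through Subscription.request:

import com.mongodb.bulk.BulkWriteResult;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class BulkWritePublisherConsumer {
    static void execute(Publisher<BulkWriteResult> publisher) {
        publisher.subscribe(new Subscriber<BulkWriteResult>() {
            @Override
            public void onSubscribe(Subscription s) {
                // Request the single BulkWriteResult element; this triggers the write.
                s.request(1);
            }

            @Override
            public void onNext(BulkWriteResult result) {
                System.out.println("matched=" + result.getMatchedCount()
                        + " modified=" + result.getModifiedCount());
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("bulk write finished");
            }
        });
    }
}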