Java 类com.mongodb.client.model.BulkWriteOptions 实例源码
项目:para-dao-mongodb
文件:MongoDBDAO.java
/**
 * Updates a batch of objects in the table of the given app using one ordered bulk write.
 *
 * <p>Each non-null object gets a fresh "updated" timestamp and is turned into a
 * {@code $set} update matched on its id. Null entries in the list are skipped.
 * Failures are logged and not propagated to the caller.
 *
 * @param appid   the app identifier; nothing happens when it is blank
 * @param objects the objects to update; nothing happens when it is {@code null}
 * @param <P>     the concrete object type
 */
@Override
public <P extends ParaObject> void updateAll(String appid, List<P> objects) {
    if (StringUtils.isBlank(appid) || objects == null) {
        return;
    }
    try {
        List<WriteModel<Document>> updates = new ArrayList<WriteModel<Document>>(objects.size());
        List<String> ids = new ArrayList<String>(objects.size());
        for (P object : objects) {
            if (object != null) {
                object.setUpdated(Utils.timestamp());
                Document id = new Document(ID, object.getId());
                Document data = new Document("$set", toRow(object, Locked.class, true));
                UpdateOneModel<Document> um = new UpdateOneModel<Document>(id, data);
                updates.add(um);
                ids.add(object.getId());
            }
        }
        // The driver rejects an empty request list with an IllegalArgumentException,
        // so an empty (or all-null) input must not reach bulkWrite().
        if (!updates.isEmpty()) {
            BulkWriteResult res = getTable(appid).bulkWrite(updates, new BulkWriteOptions().ordered(true));
            logger.debug("Updated: " + res.getModifiedCount() + ", keys: " + ids);
        }
    } catch (Exception e) {
        logger.error("DAO.updateAll() failed for appid " + appid, e);
    }
    logger.debug("DAO.updateAll() {}", objects.size());
}
项目:core-ng-project
文件:MongoCollectionImpl.java
/**
 * Deletes the documents whose {@code _id} matches one of the given ids, using a single
 * unordered bulk write.
 *
 * <p>Whether or not the write succeeds, the elapsed time is tracked under "mongoDB",
 * logged at debug level and passed to the slow-operation check.
 *
 * @param ids the ids of the documents to delete
 * @return the number of deleted documents as reported by the bulk write result
 */
@Override
public long bulkDelete(List<?> ids) {
    StopWatch watch = new StopWatch();
    int deletedRows = 0;
    try {
        List<DeleteOneModel<T>> deleteRequests = new ArrayList<>(ids.size());
        for (Object documentId : ids) {
            DeleteOneModel<T> request = new DeleteOneModel<>(Filters.eq("_id", documentId));
            deleteRequests.add(request);
        }
        deletedRows = collection()
            .bulkWrite(deleteRequests, new BulkWriteOptions().ordered(false))
            .getDeletedCount();
        return deletedRows;
    } finally {
        // deletedRows stays 0 here when the bulk write threw
        long elapsedTime = watch.elapsedTime();
        ActionLogContext.track("mongoDB", elapsedTime, 0, deletedRows);
        logger.debug("bulkDelete, collection={}, size={}, elapsedTime={}", collectionName, ids.size(), elapsedTime);
        checkSlowOperation(elapsedTime);
    }
}
项目:testcontainers-hazelcast
文件:MongoMapStore.java
/**
 * Writes every entry of the map to the collection as one unordered bulk insert.
 *
 * <p>Each entry becomes a document with the fields {@code name}, {@code price} and
 * {@code _id}, where {@code _id} is the map key. An empty map is a no-op.
 *
 * @param map the entries to store, keyed by the value used as document {@code _id}
 */
@Override
public void storeAll(Map<String, Supplement> map) {
    log.info("storeAll");
    List<InsertOneModel<Document>> batch = new LinkedList<InsertOneModel<Document>>();
    for (Map.Entry<String, Supplement> entry : map.entrySet()) {
        String key = entry.getKey();
        Supplement value = entry.getValue();
        batch.add(new InsertOneModel<Document>(
            new Document("name", value.getName()).append("price", value.getPrice())
                .append("_id", key)));
    }
    // The driver rejects an empty request list with an IllegalArgumentException.
    if (batch.isEmpty()) {
        return;
    }
    // NOTE(review): plain inserts presumably fail for keys that already exist in the
    // collection; confirm whether an upsert is wanted here.
    this.collection.bulkWrite(batch, new BulkWriteOptions().ordered(false));
}
项目:nifi-nars
文件:StoreInMongo.java
/**
 * Reads the batch size and the ordered flag from the process context, builds the bulk
 * write options from the flag, then creates the Mongo connection and ensures indexes.
 *
 * @param context the process context supplying the configured property values
 */
@OnScheduled
public void onScheduled(final ProcessContext context) {
    batchSize = context.getProperty(MongoProps.BATCH_SIZE).asInteger();
    writeOptions = new BulkWriteOptions()
        .ordered(context.getProperty(MongoProps.ORDERED).asBoolean());

    createMongoConnection(context);
    ensureIndexes(context, collection);
}
项目:ibm-performance-monitor
文件:ProfiledMongoClientTest.java
/**
 * Runs a bulk write with default options against the collection, removing documents
 * named "DELETEME" before and after the write.
 */
@Test
public void bulkWrite()
{
    final List<WriteModel<Document>> requests = insertOneWithBulk();
    final BulkWriteOptions defaultOptions = new BulkWriteOptions();

    coll.deleteOne(Filters.eq("name", "DELETEME"));
    coll.bulkWrite(requests, defaultOptions);
    coll.deleteMany(Filters.eq("name", "DELETEME"));
}
项目:AbacusUtil
文件:AsyncMongoDBExecutor.java
/**
 * Wraps {@code dbExecutor.bulkInsert(collectionName, entities, options)} in a
 * {@link Callable} and hands it to {@code asyncExecutor}.
 *
 * @param collectionName the name of the target collection
 * @param entities       the entities to insert
 * @param options        the bulk write options forwarded to the underlying call
 * @return the future returned by {@code asyncExecutor} for the insert call
 */
public CompletableFuture<Integer> bulkInsert(final String collectionName, final Collection<?> entities, final BulkWriteOptions options) {
    final Callable<Integer> insertTask = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return dbExecutor.bulkInsert(collectionName, entities, options);
        }
    };

    return asyncExecutor.execute(insertTask);
}
项目:AbacusUtil
文件:AsyncMongoDBExecutor.java
/**
 * Wraps {@code dbExecutor.bulkWrite(collectionName, requests, options)} in a
 * {@link Callable} and hands it to {@code asyncExecutor}.
 *
 * @param collectionName the name of the target collection
 * @param requests       the write models to execute
 * @param options        the bulk write options forwarded to the underlying call
 * @return the future returned by {@code asyncExecutor} for the bulk write call
 */
public CompletableFuture<BulkWriteResult> bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests,
        final BulkWriteOptions options) {
    final Callable<BulkWriteResult> writeTask = new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return dbExecutor.bulkWrite(collectionName, requests, options);
        }
    };

    return asyncExecutor.execute(writeTask);
}
项目:AbacusUtil
文件:MongoCollectionExecutor.java
/**
 * Inserts the given entities through {@code bulkWrite}, one insert request per entity.
 *
 * <p>Entities that already are {@code Document}s are inserted as-is; every other entity
 * is first converted with {@code MongoDBExecutor.toDocument}.
 *
 * @param entities the entities to insert
 * @param options  the bulk write options forwarded to {@code bulkWrite}
 * @return the inserted count reported by the bulk write result
 */
public int bulkInsert(final Collection<?> entities, final BulkWriteOptions options) {
    final List<InsertOneModel<Document>> insertRequests = new ArrayList<>(entities.size());

    for (Object entity : entities) {
        final Document doc = entity instanceof Document
                ? (Document) entity
                : MongoDBExecutor.toDocument(entity);
        insertRequests.add(new InsertOneModel<Document>(doc));
    }

    return bulkWrite(insertRequests, options).getInsertedCount();
}
项目:AbacusUtil
文件:MongoCollectionExecutor.java
/**
 * Executes the given write requests against {@code coll}.
 *
 * @param requests the write models to execute
 * @param options  the bulk write options; when {@code null} the single-argument
 *                 {@code coll.bulkWrite(requests)} overload is used
 * @return the result returned by the collection's bulk write
 */
public BulkWriteResult bulkWrite(final List<? extends WriteModel<? extends Document>> requests, final BulkWriteOptions options) {
    return options == null
            ? coll.bulkWrite(requests)
            : coll.bulkWrite(requests, options);
}
项目:AbacusUtil
文件:AsyncMongoCollectionExecutor.java
/**
 * Wraps {@code collExecutor.bulkInsert(entities, options)} in a {@link Callable} and
 * hands it to {@code asyncExecutor}.
 *
 * @param entities the entities to insert
 * @param options  the bulk write options forwarded to the underlying call
 * @return the future returned by {@code asyncExecutor} for the insert call
 */
public CompletableFuture<Integer> bulkInsert(final Collection<?> entities, final BulkWriteOptions options) {
    final Callable<Integer> insertTask = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return collExecutor.bulkInsert(entities, options);
        }
    };

    return asyncExecutor.execute(insertTask);
}
项目:AbacusUtil
文件:AsyncMongoCollectionExecutor.java
/**
 * Wraps {@code collExecutor.bulkWrite(requests, options)} in a {@link Callable} and
 * hands it to {@code asyncExecutor}.
 *
 * @param requests the write models to execute
 * @param options  the bulk write options forwarded to the underlying call
 * @return the future returned by {@code asyncExecutor} for the bulk write call
 */
public CompletableFuture<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends Document>> requests, final BulkWriteOptions options) {
    final Callable<BulkWriteResult> writeTask = new Callable<BulkWriteResult>() {
        @Override
        public BulkWriteResult call() throws Exception {
            return collExecutor.bulkWrite(requests, options);
        }
    };

    return asyncExecutor.execute(writeTask);
}
项目:df_data_service
文件:MongoAdminClient.java
/**
 * Imports a file containing one JSON document per line into the collection.
 *
 * <p>Lines are parsed with {@code Document.parse} and inserted with unordered bulk
 * writes in batches of 100. Blank lines are skipped. The file is decoded as UTF-8.
 * An {@link IOException} while reading is printed and ends the read, but documents
 * already parsed are still written.
 *
 * @param fileNamePath path of the file to import
 * @return this client, for call chaining
 */
public MongoAdminClient importJsonFile(String fileNamePath) {
    final int batch = 100;
    List<InsertOneModel<Document>> docs = new ArrayList<>(batch);
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    try (BufferedReader br = new BufferedReader(new java.io.InputStreamReader(
            new java.io.FileInputStream(fileNamePath), java.nio.charset.StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
            // A blank line (e.g. a trailing newline) is not a JSON document.
            if (line.trim().isEmpty()) {
                continue;
            }
            docs.add(new InsertOneModel<>(Document.parse(line)));
            if (docs.size() == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    // Flush the last, partially filled batch; bulkWrite must not get an empty list.
    if (!docs.isEmpty()) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }
    return this;
}
项目:df_data_service
文件:MongoAdminClient.java
/**
 * Imports a stream containing one JSON document per line into the collection.
 *
 * <p>Lines are parsed with {@code Document.parse} and inserted with unordered bulk
 * writes in batches of 100. Blank lines are skipped. The stream is decoded as UTF-8
 * and is closed when reading ends. An {@link IOException} while reading is printed and
 * ends the read, but documents already parsed are still written.
 *
 * @param fileInputStream the stream to read from; closed by this method
 * @return this client, for call chaining
 */
public MongoAdminClient importJsonInputStream(InputStream fileInputStream) {
    final int batch = 100;
    List<InsertOneModel<Document>> docs = new ArrayList<>(batch);
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    try (BufferedReader br = new BufferedReader(
            new InputStreamReader(fileInputStream, java.nio.charset.StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
            // A blank line (e.g. a trailing newline) is not a JSON document.
            if (line.trim().isEmpty()) {
                continue;
            }
            docs.add(new InsertOneModel<>(Document.parse(line)));
            if (docs.size() == batch) {
                this.collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
                docs.clear();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    // Flush the last, partially filled batch; bulkWrite must not get an empty list.
    if (!docs.isEmpty()) {
        collection.bulkWrite(docs, new BulkWriteOptions().ordered(false));
    }
    return this;
}
项目:mongo-java-driver-rx
文件:MongoCollectionImpl.java
/**
 * Adapts the callback-based {@code wrapped.bulkWrite(requests, options, callback)} call
 * into an {@code Observable} built with {@code RxObservables.create}.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return an Observable for the BulkWriteResult
 */
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                             final BulkWriteOptions options) {
    final Block<SingleResultCallback<BulkWriteResult>> operation = new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(requests, options, callback);
        }
    };

    return RxObservables.create(Observables.observe(operation), observableAdapter);
}
项目:mongo-java-driver-reactivestreams
文件:MongoCollectionImpl.java
/**
 * Adapts the callback-based {@code wrapped.bulkWrite(requests, options, callback)} call
 * into a {@code Publisher} via {@code ObservableToPublisher}.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return a publisher for the BulkWriteResult
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    final Block<SingleResultCallback<BulkWriteResult>> operation = new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(requests, options, callback);
        }
    };

    return new ObservableToPublisher<BulkWriteResult>(observe(operation));
}
项目:mongo-java-driver-reactivestreams
文件:MongoCollectionImpl.java
/**
 * Adapts the callback-based
 * {@code wrapped.bulkWrite(clientSession, requests, options, callback)} call into a
 * {@code Publisher} via {@code ObservableToPublisher}.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests      the writes to execute
 * @param options       the options to apply to the bulk write operation
 * @return a publisher for the BulkWriteResult
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests,
                                            final BulkWriteOptions options) {
    final Block<SingleResultCallback<BulkWriteResult>> operation = new Block<SingleResultCallback<BulkWriteResult>>() {
        @Override
        public void apply(final SingleResultCallback<BulkWriteResult> callback) {
            wrapped.bulkWrite(clientSession, requests, options, callback);
        }
    };

    return new ObservableToPublisher<BulkWriteResult>(observe(operation));
}
项目:AbacusUtil
文件:MongoDBExecutor.java
/**
 * Delegates to the {@code bulkInsert} of the executor obtained from
 * {@code collExecutor(collectionName)}.
 *
 * @param collectionName the name of the target collection
 * @param entities       the entities to insert
 * @param options        the bulk write options forwarded to the delegate
 * @return the value returned by the delegate's {@code bulkInsert}
 */
public int bulkInsert(final String collectionName, final Collection<?> entities, final BulkWriteOptions options) {
    final int insertedCount = collExecutor(collectionName).bulkInsert(entities, options);

    return insertedCount;
}
项目:AbacusUtil
文件:MongoDBExecutor.java
/**
 * Delegates to the {@code bulkWrite} of the executor obtained from
 * {@code collExecutor(collectionName)}.
 *
 * @param collectionName the name of the target collection
 * @param requests       the write models to execute
 * @param options        the bulk write options forwarded to the delegate
 * @return the result returned by the delegate's {@code bulkWrite}
 */
public BulkWriteResult bulkWrite(final String collectionName, final List<? extends WriteModel<? extends Document>> requests,
        final BulkWriteOptions options) {
    final BulkWriteResult result = collExecutor(collectionName).bulkWrite(requests, options);

    return result;
}
项目:mongo-java-driver-rx
文件:MongoCollectionImpl.java
/**
 * Executes the given writes with a freshly constructed {@code BulkWriteOptions}.
 *
 * @param requests the writes to execute
 * @return the Observable returned by the two-argument {@code bulkWrite} overload
 */
@Override
public Observable<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    final BulkWriteOptions defaultOptions = new BulkWriteOptions();

    return bulkWrite(requests, defaultOptions);
}
项目:mongo-java-driver-reactivestreams
文件:MongoCollectionImpl.java
/**
 * Executes the given writes with a freshly constructed {@code BulkWriteOptions}.
 *
 * @param requests the writes to execute
 * @return the publisher returned by the two-argument {@code bulkWrite} overload
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final List<? extends WriteModel<? extends TDocument>> requests) {
    final BulkWriteOptions defaultOptions = new BulkWriteOptions();

    return bulkWrite(requests, defaultOptions);
}
项目:mongo-java-driver-reactivestreams
文件:MongoCollectionImpl.java
/**
 * Executes the given writes in the given session with a freshly constructed
 * {@code BulkWriteOptions}.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests      the writes to execute
 * @return the publisher returned by the three-argument {@code bulkWrite} overload
 */
@Override
public Publisher<BulkWriteResult> bulkWrite(final ClientSession clientSession,
                                            final List<? extends WriteModel<? extends TDocument>> requests) {
    final BulkWriteOptions defaultOptions = new BulkWriteOptions();

    return bulkWrite(clientSession, requests, defaultOptions);
}
项目:mongo-java-driver-rx
文件:MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return an Observable with a single element, the BulkWriteResult
 */
Observable<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
项目:mongo-java-driver-reactivestreams
文件:MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes.
 *
 * @param requests the writes to execute
 * @param options  the options to apply to the bulk write operation
 * @return a publisher with a single element, the BulkWriteResult
 */
Publisher<BulkWriteResult> bulkWrite(List<? extends WriteModel<? extends TDocument>> requests, BulkWriteOptions options);
项目:mongo-java-driver-reactivestreams
文件:MongoCollection.java
/**
 * Executes a mix of inserts, updates, replaces, and deletes, associated with the given
 * client session.
 *
 * @param clientSession the client session with which to associate this operation
 * @param requests      the writes to execute
 * @param options       the options to apply to the bulk write operation
 * @return a publisher with a single element, the BulkWriteResult
 * @mongodb.server.release 3.6
 * @since 1.7
 */
Publisher<BulkWriteResult> bulkWrite(ClientSession clientSession, List<? extends WriteModel<? extends TDocument>> requests,
BulkWriteOptions options);