@Override public <P extends ParaObject> void updateAll(String appid, List<P> objects) { if (StringUtils.isBlank(appid) || objects == null) { return; } try { ArrayList<WriteModel<Document>> updates = new ArrayList<WriteModel<Document>>(); List<String> ids = new ArrayList<String>(objects.size()); for (P object : objects) { if (object != null) { object.setUpdated(Utils.timestamp()); Document id = new Document(ID, object.getId()); Document data = new Document("$set", toRow(object, Locked.class, true)); UpdateOneModel<Document> um = new UpdateOneModel<Document>(id, data); updates.add(um); ids.add(object.getId()); } } BulkWriteResult res = getTable(appid).bulkWrite(updates, new BulkWriteOptions().ordered(true)); logger.debug("Updated: " + res.getModifiedCount() + ", keys: " + ids); } catch (Exception e) { logger.error(null, e); } logger.debug("DAO.updateAll() {}", objects.size()); }
/** * Put the records in the sink. * * @param collection the set of records to send. */ @Override public void put(Collection<SinkRecord> collection) { List<SinkRecord> records = new ArrayList<>(collection); for (int i = 0; i < records.size(); i++) { Map<String, List<WriteModel<Document>>> bulks = new HashMap<>(); for (int j = 0; j < bulkSize && i < records.size(); j++, i++) { SinkRecord record = records.get(i); Map<String, Object> jsonMap = SchemaUtils.toJsonMap((Struct) record.value()); String topic = record.topic(); if (bulks.get(topic) == null) { bulks.put(topic, new ArrayList<WriteModel<Document>>()); } Document newDocument = new Document(jsonMap) .append("_id", record.kafkaOffset()); log.trace("Adding to bulk: {}", newDocument.toString()); bulks.get(topic).add(new UpdateOneModel<Document>( Filters.eq("_id", record.kafkaOffset()), new Document("$set", newDocument), new UpdateOptions().upsert(true))); } i--; log.trace("Executing bulk"); for (String key : bulks.keySet()) { try { com.mongodb.bulk.BulkWriteResult result = mapping.get(key).bulkWrite(bulks.get(key)); } catch (Exception e) { log.error(e.getMessage()); } } } }
public void checkFilled() { LocalDateTime now = LocalDateTime.now(); LocalDateTime keytime = now.withMinute(0).withSecond(0).withNano(0); if (TIMESERIES_ALLOWED_KEYS.stream().anyMatch(key -> { Document serie = timeseries.find(Filters.and(Filters.eq("type", key), Filters.eq("timestamp_hour", keytime))).limit(1).first(); if (serie != null) { Map<String, Long> values = (Map<String, Long>) serie.get("values"); if (values.size() != 60) { log.warn("Wrong values size for timeserie collection {}", key); return true; } return false; } return false; })) { log.warn("Dropping the timeseries collection"); timeseries.drop(); } List<? extends WriteModel<Document>> requests = TIMESERIES_ALLOWED_KEYS .stream() .map(key -> Pair.of(key, timeseries.find(Filters.and(Filters.eq("type", key), Filters.eq("timestamp_hour", keytime))).limit(1).first())) .filter(doc -> doc.getRight() == null) .map(pair -> pair.getLeft()) .map(key -> { Document document = new Document(); document.append("type", key).append("timestamp_hour", keytime); document.append("values", IntStream.range(0, 60).collect(Document::new, (doc, val) -> doc.put(Integer.toString(val), Long.valueOf(0)), Document::putAll)); return document; }) .map(doc -> new UpdateOneModel<Document>(Filters.and(Filters.eq("type", doc.getString("type")), Filters.eq("timestamp_hour", keytime)), new Document("$set", doc), new UpdateOptions().upsert(true))).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(requests)) { timeseries.bulkWrite(requests); } }
public void prepareMinutes(LocalDateTime keytime) { List<? extends WriteModel<Document>> requests = TIMESERIES_ALLOWED_KEYS .stream() .map(el -> { Document document = new Document(); document.append("type", el).append("timestamp_hour", keytime); document.append("values", IntStream.range(0, 60).collect(Document::new, (doc, val) -> doc.put(Integer.toString(val), Long.valueOf(0)), Document::putAll)); return document; }) .map(doc -> new UpdateOneModel<Document>(Filters.and(Filters.eq("type", doc.getString("type")), Filters.eq("timestamp_hour", keytime)), new Document("$set", doc), new UpdateOptions().upsert(true))).collect(Collectors.toList()); timeseries.bulkWrite(requests); }
private List<WriteModel<JsonObject>> convertBulkOperations(List<BulkOperation> operations) { List<WriteModel<JsonObject>> result = new ArrayList<>(operations.size()); for (BulkOperation bulkOperation : operations) { switch (bulkOperation.getType()) { case DELETE: Bson bsonFilter = toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter())); if (bulkOperation.isMulti()) { result.add(new DeleteManyModel<>(bsonFilter)); } else { result.add(new DeleteOneModel<>(bsonFilter)); } break; case INSERT: result.add(new InsertOneModel<>(encodeKeyWhenUseObjectId(bulkOperation.getDocument()))); break; case REPLACE: result.add(new ReplaceOneModel<>(toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter())), bulkOperation.getDocument(), new com.mongodb.client.model.UpdateOptions().upsert(bulkOperation.isUpsert()))); break; case UPDATE: Bson filter = toBson(encodeKeyWhenUseObjectId(bulkOperation.getFilter())); Bson document = toBson(encodeKeyWhenUseObjectId(bulkOperation.getDocument())); com.mongodb.client.model.UpdateOptions updateOptions = new com.mongodb.client.model.UpdateOptions() .upsert(bulkOperation.isUpsert()); if (bulkOperation.isMulti()) { result.add(new UpdateManyModel<>(filter, document, updateOptions)); } else { result.add(new UpdateOneModel<>(filter, document, updateOptions)); } break; default: throw new IllegalArgumentException("Unknown bulk operation type: " + bulkOperation.getClass()); } } return result; }