public void insertData(String schemaName, String tableName, DBObject naive, DBObject complete) { int i = 0; DBObject logObj = (DBObject) ObjectUtils.clone(complete); //保存原始数据 try { String path = "/" + schemaName + "/" + tableName + "/" + CanalEntry.EventType.INSERT.getNumber(); i++; naiveMongoTemplate.getCollection(tableName).insert(naive); i++; SpringUtil.doEvent(path, complete); i++; } catch (MongoClientException | MongoSocketException clientException) { //客户端连接异常抛出,阻塞同步,防止mongodb宕机 throw clientException; } catch (Exception e) { logError(schemaName, tableName, 1, i, logObj, e); } }
public void updateData(String schemaName, String tableName, DBObject query, DBObject obj) { String path = "/" + schemaName + "/" + tableName + "/" + CanalEntry.EventType.UPDATE.getNumber(); int i = 0; DBObject newObj = (DBObject) ObjectUtils.clone(obj); DBObject logObj = (DBObject) ObjectUtils.clone(obj); //保存原始数据 try { obj.removeField("id"); i++; naiveMongoTemplate.getCollection(tableName).update(query, obj); i++; SpringUtil.doEvent(path, newObj); i++; } catch (MongoClientException | MongoSocketException clientException) { //客户端连接异常抛出,阻塞同步,防止mongodb宕机 throw clientException; } catch (Exception e) { logError(schemaName, tableName, 2, i, logObj, e); } }
public void deleteData(String schemaName, String tableName, DBObject obj) { int i = 0; String path = "/" + schemaName + "/" + tableName + "/" + CanalEntry.EventType.DELETE.getNumber(); DBObject newObj = (DBObject) ObjectUtils.clone(obj); DBObject logObj = (DBObject) ObjectUtils.clone(obj); //保存原始数据 try { i++; if (obj.containsField("id")) { naiveMongoTemplate.getCollection(tableName).remove(new BasicDBObject("_id", obj.get("id"))); } i++; SpringUtil.doEvent(path, newObj); } catch (MongoClientException | MongoSocketException clientException) { //客户端连接异常抛出,阻塞同步,防止mongodb宕机 throw clientException; } catch (Exception e) { logError(schemaName, tableName, 3, i, logObj, e); } }
private MongoDatabase createMongoDatabase( Stage.Context context, List<Stage.ConfigIssue> issues, ReadPreference readPreference, WriteConcern writeConcern ) { MongoDatabase mongoDatabase = null; try { if (readPreference != null) { mongoDatabase = mongoClient.getDatabase(database).withReadPreference(readPreference); } else if (writeConcern != null) { mongoDatabase = mongoClient.getDatabase(database).withWriteConcern(writeConcern); } } catch (MongoClientException e) { issues.add(context.createConfigIssue( Groups.MONGODB.name(), MONGO_CONFIG_PREFIX + "database", Errors.MONGODB_02, database, e.toString() )); } return mongoDatabase; }
private MongoCollection createMongoCollection( Stage.Context context, List<Stage.ConfigIssue> issues, ReadPreference readPreference, WriteConcern writeConcern ) { MongoCollection mongoCollection = null; try { if (readPreference != null) { mongoCollection = mongoDatabase.getCollection(collection).withReadPreference(readPreference); } else if (writeConcern != null) { mongoCollection = mongoDatabase.getCollection(collection).withWriteConcern(writeConcern); } } catch (MongoClientException e) { issues.add(context.createConfigIssue( Groups.MONGODB.name(), MONGO_CONFIG_PREFIX + "collection", Errors.MONGODB_03, collection, e.toString() )); } return mongoCollection; }
/** * Checks if the system database, available in all MongoDB instances can be * reached. */ @Override protected Result check() throws Exception { try { final Document result = mongoDatabase.runCommand(new Document("dbStats", 1)); return Result.healthy(result.toJson()); } catch (MongoClientException exc) { logger.warn("Unhealthy database", exc); return Result.unhealthy(exc); } }
private void deleteArtifact(final GridFSDBFile dbFile) { if (dbFile != null) { try { gridFs.delete(new Query().addCriteria(Criteria.where(ID).is(dbFile.getId()))); } catch (final MongoClientException e) { throw new ArtifactStoreException(e.getMessage(), e); } } }
@Override public void deleteByTenant(final String tenant) { try { gridFs.delete(new Query().addCriteria(Criteria.where(TENANT_QUERY).is(sanitizeTenant(tenant)))); } catch (final MongoClientException e) { throw new ArtifactStoreException(e.getMessage(), e); } }
/** * Checks if the system database, which exists in all MongoDB instances can be reached. * @return A Result object * @throws Exception */ @Override protected Result check() throws Exception { try { mongoClient.getDB("system").getStats(); }catch(MongoClientException ex) { return Result.unhealthy(ex.getMessage()); } return Result.healthy(); }