protected void doApply(List<Record> records) { Map<List<String>, List<Record>> buckets = MigrateMap.makeComputingMap(new Function<List<String>, List<Record>>() { public List<Record> apply(List<String> names) { return Lists.newArrayList(); } }); // 根据目标库的不同,划分为多个bucket for (Record record : records) { buckets.get(Arrays.asList(record.getSchemaName(), record.getTableName())).add(record); } JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getTargetDs()); for (final List<Record> batchRecords : buckets.values()) { TableSqlUnit sqlUnit = getSqlUnit(batchRecords.get(0)); if (context.isBatchApply()) { applierByBatch(jdbcTemplate, batchRecords, sqlUnit); } else { applyOneByOne(jdbcTemplate, batchRecords, sqlUnit); } } }
/**
 * Starts this component: runs the superclass start logic, stores the database type reported by
 * {@code YuGongUtils.judgeDbType} for the target data source, and sets up the table-meta cache
 * and the select-SQL cache.
 */
public void start() {
    super.start();
    dbType = YuGongUtils.judgeDbType(context.getTargetDs());

    // Loads table metadata for a two-element key: [schema name, table name].
    Function<List<String>, Table> metaLoader = new Function<List<String>, Table>() {

        public Table apply(List<String> names) {
            if (names.size() == 2) {
                // The schema is passed as null when the context is configured to ignore schemas.
                return TableMetaGenerator.getTableMeta(context.getTargetDs(),
                    context.isIgnoreSchema() ? null : names.get(0),
                    names.get(1));
            }

            throw new YuGongException("names[" + names.toString() + "] is not valid");
        }
    };
    tableCache = MigrateMap.makeComputingMap(metaLoader);
    selectSqlCache = MigrateMap.makeMap();
}
protected void doApply(List<Record> records) { Map<List<String>, List<Record>> buckets = MigrateMap.makeComputingMap(new Function<List<String>, List<Record>>() { public List<Record> apply(List<String> names) { return Lists.newArrayList(); } }); // 根据目标库的不同,划分为多个bucket for (Record record : records) { buckets.get(Arrays.asList(record.getSchemaName(), record.getTableName())).add(record); } JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getTargetDs()); for (final List<Record> batchRecords : buckets.values()) { List<Record> queryRecords = null; if (context.isBatchApply()) { queryRecords = queryByBatch(jdbcTemplate, batchRecords); } else { queryRecords = queryOneByOne(jdbcTemplate, batchRecords); } diff(batchRecords, queryRecords); } }
/**
 * Starts this component: runs the superclass start logic and then initialises the
 * {@code batches}, {@code cursors} and {@code destinations} maps.
 */
public void start() {
    super.start();

    // Builds the in-memory batch holder for a client identity.
    Function<ClientIdentity, MemoryClientIdentityBatch> batchFactory = new Function<ClientIdentity, MemoryClientIdentityBatch>() {

        public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
            return MemoryClientIdentityBatch.create(clientIdentity);
        }
    };
    batches = MigrateMap.makeComputingMap(batchFactory);

    cursors = new MapMaker().makeMap();

    // Supplies a new empty client list for a destination.
    Function<String, List<ClientIdentity>> clientListFactory = new Function<String, List<ClientIdentity>>() {

        public List<ClientIdentity> apply(String destination) {
            return Lists.newArrayList();
        }
    };
    destinations = MigrateMap.makeComputingMap(clientListFactory);
}
public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager){ if (dataDir == null) { throw new NullPointerException("null dataDir"); } if (period <= 0) { throw new IllegalArgumentException("period must be positive, given: " + period); } if (memoryLogPositionManager == null) { throw new NullPointerException("null memoryLogPositionManager"); } this.dataDir = dataDir; this.period = period; this.memoryLogPositionManager = memoryLogPositionManager; this.dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>() { public File apply(String destination) { return getDataFile(destination); } }); this.executorService = Executors.newScheduledThreadPool(1); this.persistTasks = Collections.synchronizedSet(new HashSet<String>()); }
public void start() { super.start(); masterSqlCache = MigrateMap.makeMap(); String schemaName = context.getTableMeta().getSchema(); String tableName = context.getTableMeta().getName(); // 后去mlog表名 String mlogTableName = TableMetaGenerator.getMLogTableName(context.getSourceDs(), schemaName, tableName); if (StringUtils.isEmpty(mlogTableName)) { throw new YuGongException("not found mlog table for [" + schemaName + "." + tableName + "]"); } // 获取mlog表结构 mlogMeta = TableMetaGenerator.getTableMeta(context.getSourceDs(), context.getTableMeta().getSchema(), mlogTableName); // 构造mlog sql String colstr = SqlTemplates.COMMON.makeColumn(mlogMeta.getColumns()); mlogExtractSql = new MessageFormat(MLOG_EXTRACT_FORMAT).format(new Object[] { colstr, schemaName, mlogTableName }); mlogCleanSql = new MessageFormat(MLOG_CLEAN_FORMAT).format(new Object[] { schemaName, mlogTableName }); executorName = this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName(); if (executor == null) { executor = new ThreadPoolExecutor(threadSize, threadSize, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(threadSize * 2), new NamedThreadFactory(executorName), new ThreadPoolExecutor.CallerRunsPolicy()); } tracer.update(context.getTableMeta().getFullName(), ProgressStatus.INCING); }
/**
 * Starts this component: runs the superclass start logic, stores the database type reported by
 * {@code YuGongUtils.judgeDbType} for the target data source, and assigns a new map from
 * {@code MigrateMap.makeMap()} to each of the insert, update and delete SQL caches.
 */
public void start() {
    super.start();

    this.dbType = YuGongUtils.judgeDbType(context.getTargetDs());

    // One cache per statement kind.
    this.insertSqlCache = MigrateMap.makeMap();
    this.updateSqlCache = MigrateMap.makeMap();
    this.deleteSqlCache = MigrateMap.makeMap();
}
/**
 * Starts this component: runs the superclass start logic and initialises {@code dataSources}
 * with a map whose function builds a {@code DataSource} from a {@code DataSourceConfig}.
 */
public void start() {
    super.start();

    // Delegates to createDataSource(...) with the url, username, password, type and properties
    // read from the config.
    Function<DataSourceConfig, DataSource> dataSourceFactory = new Function<DataSourceConfig, DataSource>() {

        public DataSource apply(DataSourceConfig config) {
            return createDataSource(config.getUrl(),
                config.getUsername(),
                config.getPassword(),
                config.getType(),
                config.getProperties());
        }
    };
    dataSources = MigrateMap.makeComputingMap(dataSourceFactory);
}
/**
 * Creates a throughput holder whose {@code counters} map builds a {@code LoadCounter} from a
 * pair id.
 *
 * @param identity not read by this constructor body
 */
public LoadThroughput(Identity identity){
    // Builds the counter for a pair id.
    Function<Long, LoadCounter> counterFactory = new Function<Long, LoadCounter>() {

        public LoadCounter apply(Long pairId) {
            return new LoadCounter(pairId);
        }
    };
    counters = MigrateMap.makeComputingMap(counterFactory);
}
public void start() { if (!isStart()) { super.start(); canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() { public CanalInstance apply(String destination) { return canalInstanceGenerator.generate(destination); } }); // lastRollbackPostions = new MapMaker().makeMap(); } }
/**
 * Starts this component: runs the superclass start logic, stores the database type reported by
 * {@code YuGongUtils.judgeDbType} for the target data source, and assigns a new map from
 * {@code MigrateMap.makeMap()} to the applier SQL cache.
 */
public void start() {
    super.start();

    this.dbType = YuGongUtils.judgeDbType(context.getTargetDs());
    this.applierSqlCache = MigrateMap.makeMap();
}