/**
 * Load the list of disabled tables in ZK into local set.
 * @throws ZooKeeperConnectionException
 * @throws IOException
 */
private void loadDisabledTables()
    throws ZooKeeperConnectionException, IOException {
  HConnectionManager.execute(new HConnectable<Void>(getConf()) {
    @Override
    public Void connect(HConnection connection) throws IOException {
      ZooKeeperWatcher zkw = createZooKeeperWatcher();
      try {
        for (TableName tableName :
            ZKTableStateClientSideReader.getDisabledOrDisablingTables(zkw)) {
          disabledTables.add(tableName);
        }
      } catch (KeeperException ke) {
        throw new IOException(ke);
      } catch (InterruptedException e) {
        throw new InterruptedIOException();
      } finally {
        zkw.close();
      }
      return null;
    }
  });
}
/**
 * Load the list of disabled tables in ZK into local set.
 * @throws ZooKeeperConnectionException
 * @throws IOException
 */
private void loadDisabledTables()
    throws ZooKeeperConnectionException, IOException {
  HConnectionManager.execute(new HConnectable<Void>(getConf()) {
    @Override
    public Void connect(HConnection connection) throws IOException {
      ZooKeeperWatcher zkw = createZooKeeperWatcher();
      try {
        for (TableName tableName :
            ZKTableReadOnly.getDisabledOrDisablingTables(zkw)) {
          disabledTables.add(tableName);
        }
      } catch (KeeperException ke) {
        throw new IOException(ke);
      } finally {
        zkw.close();
      }
      return null;
    }
  });
}
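// Context sketch (an assumption, not part of the source): both variants above
// populate a "disabledTables" field that they never declare. A plausible
// declaration, plus a hypothetical membership check that is served from the
// local set instead of a ZooKeeper round-trip:
private final Set<TableName> disabledTables = new HashSet<TableName>();

private boolean isTableDisabled(TableName tableName) {
  return disabledTables.contains(tableName);
}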
/**
 * Scans the table and merges two adjacent regions if they are small. This
 * only happens when a lot of rows are deleted.
 *
 * When merging the hbase:meta region, the HBase instance must be offline.
 * When merging a normal table, the HBase instance must be online, but the
 * table must be disabled.
 *
 * @param conf - configuration object for HBase
 * @param fs - FileSystem where regions reside
 * @param tableName - Table to be compacted
 * @param testMasterRunning True if we are to verify master is down before
 *          running merge
 * @throws IOException
 */
public static void merge(Configuration conf, FileSystem fs,
    final TableName tableName, final boolean testMasterRunning)
    throws IOException {
  boolean masterIsRunning = false;
  if (testMasterRunning) {
    masterIsRunning = HConnectionManager
        .execute(new HConnectable<Boolean>(conf) {
          @Override
          public Boolean connect(HConnection connection) throws IOException {
            return connection.isMasterRunning();
          }
        });
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    if (masterIsRunning) {
      throw new IllegalStateException(
          "Can not compact hbase:meta table if instance is on-line");
    }
    // TODO reenable new OfflineMerger(conf, fs).process();
  } else {
    if (!masterIsRunning) {
      throw new IllegalStateException(
          "HBase instance must be running to merge a normal table");
    }
    Admin admin = new HBaseAdmin(conf);
    try {
      if (!admin.isTableDisabled(tableName)) {
        throw new TableNotDisabledException(tableName);
      }
    } finally {
      admin.close();
    }
    new OnlineMerger(conf, fs, tableName).process();
  }
}
/**
 * Scans the table and merges two adjacent regions if they are small. This
 * only happens when a lot of rows are deleted.
 *
 * When merging the hbase:meta region, the HBase instance must be offline.
 * When merging a normal table, the HBase instance must be online, but the
 * table must be disabled.
 *
 * @param conf - configuration object for HBase
 * @param fs - FileSystem where regions reside
 * @param tableName - Table to be compacted
 * @param testMasterRunning True if we are to verify master is down before
 *          running merge
 * @throws IOException
 */
public static void merge(Configuration conf, FileSystem fs,
    final TableName tableName, final boolean testMasterRunning)
    throws IOException {
  boolean masterIsRunning = false;
  if (testMasterRunning) {
    masterIsRunning = HConnectionManager
        .execute(new HConnectable<Boolean>(conf) {
          @Override
          public Boolean connect(HConnection connection) throws IOException {
            return connection.isMasterRunning();
          }
        });
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    if (masterIsRunning) {
      throw new IllegalStateException(
          "Can not compact hbase:meta table if instance is on-line");
    }
    // TODO reenable new OfflineMerger(conf, fs).process();
  } else {
    if (!masterIsRunning) {
      throw new IllegalStateException(
          "HBase instance must be running to merge a normal table");
    }
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.isTableDisabled(tableName)) {
      throw new TableNotDisabledException(tableName);
    }
    new OnlineMerger(conf, fs, tableName).process();
  }
}
/**
 * Scans the table and merges two adjacent regions if they are small. This
 * only happens when a lot of rows are deleted.
 *
 * When merging the hbase:meta region, the HBase instance must be offline.
 * When merging a normal table, the HBase instance must be online, but the
 * table must be disabled.
 *
 * @param conf - configuration object for HBase
 * @param fs - FileSystem where regions reside
 * @param tableName - Table to be compacted
 * @param testMasterRunning True if we are to verify master is down before
 *          running merge
 * @throws IOException
 */
public static void merge(Configuration conf, FileSystem fs,
    final TableName tableName, final boolean testMasterRunning)
    throws IOException {
  boolean masterIsRunning = false;
  if (testMasterRunning) {
    masterIsRunning = HConnectionManager
        .execute(new HConnectable<Boolean>(conf) {
          @Override
          public Boolean connect(HConnection connection) throws IOException {
            return connection.isMasterRunning();
          }
        });
  }
  if (tableName.equals(TableName.META_TABLE_NAME)) {
    if (masterIsRunning) {
      throw new IllegalStateException(
          "Can not compact hbase:meta table if instance is on-line");
    }
    // TODO reenable new OfflineMerger(conf, fs).process();
  } else {
    if (!masterIsRunning) {
      throw new IllegalStateException(
          "HBase instance must be running to merge a normal table");
    }
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      if (!admin.isTableDisabled(tableName)) {
        throw new TableNotDisabledException(tableName);
      }
    } finally {
      admin.close();
    }
    new OnlineMerger(conf, fs, tableName).process();
  }
}
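// Invocation sketch (an assumption, not from the source). The enclosing class
// name HMerge and the table name are hypothetical; the merge(...) signature is
// the one defined above. A user table must be disabled before it can be merged.
Configuration conf = HBaseConfiguration.create();
FileSystem fs = FileSystem.get(conf);
TableName tn = TableName.valueOf("usertable"); // hypothetical table name
// Disable the table first, e.g. with HBaseAdmin#disableTable, then:
HMerge.merge(conf, fs, tn, true); // true => check the master's state first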
/**
 * Map method that compares every scanned row with the equivalent from
 * a distant cluster.
 * @param row The current table row key.
 * @param value The columns.
 * @param context The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, final Result value,
    Context context) throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    final Scan scan = new Scan();
    scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
    long startTime = conf.getLong(NAME + ".startTime", 0);
    long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
    String families = conf.get(NAME + ".families", null);
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    scan.setTimeRange(startTime, endTime);
    int versions = conf.getInt(NAME + ".versions", -1);
    LOG.info("Setting number of versions inside map as: " + versions);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }
    final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
    HConnectionManager.execute(new HConnectable<Void>(conf) {
      @Override
      public Void connect(HConnection conn) throws IOException {
        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
            zkClusterKey, PEER_CONFIG_PREFIX);
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        replicatedTable = new HTable(peerConf, tableName);
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        return null;
      }
    });
    currentCompareRowInPeerTable = replicatedScanner.next();
  }
  while (true) {
    if (currentCompareRowInPeerTable == null) {
      // reached the end of the peer table region; row only in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS,
          value);
      break;
    }
    int rowCmpRet = Bytes.compareTo(value.getRow(),
        currentCompareRowInPeerTable.getRow());
    if (rowCmpRet == 0) {
      // rowkey is the same, need to compare the content of the row
      try {
        Result.compareResults(value, currentCompareRowInPeerTable);
        context.getCounter(Counters.GOODROWS).increment(1);
      } catch (Exception e) {
        logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS,
            value);
        LOG.error("Exception while comparing row: " + e);
      }
      currentCompareRowInPeerTable = replicatedScanner.next();
      break;
    } else if (rowCmpRet < 0) {
      // row only exists in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS,
          value);
      break;
    } else {
      // row only exists in peer table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
          currentCompareRowInPeerTable);
      currentCompareRowInPeerTable = replicatedScanner.next();
    }
  }
}
/**
 * Map method that compares every scanned row with the equivalent from
 * a distant cluster.
 * @param row The current table row key.
 * @param value The columns.
 * @param context The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, final Result value,
    Context context) throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    final Scan scan = new Scan();
    scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
    long startTime = conf.getLong(NAME + ".startTime", 0);
    long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
    String families = conf.get(NAME + ".families", null);
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    scan.setTimeRange(startTime, endTime);
    int versions = conf.getInt(NAME + ".versions", -1);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }
    final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
    HConnectionManager.execute(new HConnectable<Void>(conf) {
      @Override
      public Void connect(HConnection conn) throws IOException {
        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.create(conf);
        ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        // TODO: This HTable doesn't get closed. Fix!
        Table replicatedTable = new HTable(peerConf, tableName);
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        return null;
      }
    });
    currentCompareRowInPeerTable = replicatedScanner.next();
  }
  while (true) {
    if (currentCompareRowInPeerTable == null) {
      // reached the end of the peer table region; row only in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS,
          value);
      break;
    }
    int rowCmpRet = Bytes.compareTo(value.getRow(),
        currentCompareRowInPeerTable.getRow());
    if (rowCmpRet == 0) {
      // rowkey is the same, need to compare the content of the row
      try {
        Result.compareResults(value, currentCompareRowInPeerTable);
        context.getCounter(Counters.GOODROWS).increment(1);
      } catch (Exception e) {
        logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS,
            value);
      }
      currentCompareRowInPeerTable = replicatedScanner.next();
      break;
    } else if (rowCmpRet < 0) {
      // row only exists in source table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS,
          value);
      break;
    } else {
      // row only exists in peer table
      logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
          currentCompareRowInPeerTable);
      currentCompareRowInPeerTable = replicatedScanner.next();
    }
  }
}
/**
 * Map method that compares every scanned row with the equivalent from
 * a distant cluster.
 * @param row The current table row key.
 * @param value The columns.
 * @param context The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, final Result value,
    Context context) throws IOException {
  if (replicatedScanner == null) {
    Configuration conf = context.getConfiguration();
    final Scan scan = new Scan();
    scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
    long startTime = conf.getLong(NAME + ".startTime", 0);
    long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
    String families = conf.get(NAME + ".families", null);
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    scan.setTimeRange(startTime, endTime);
    HConnectionManager.execute(new HConnectable<Void>(conf) {
      @Override
      public Void connect(HConnection conn) throws IOException {
        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.create(conf);
        ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
        HTable replicatedTable =
            new HTable(peerConf, conf.get(NAME + ".tableName"));
        scan.setStartRow(value.getRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        return null;
      }
    });
  }
  Result res = replicatedScanner.next();
  try {
    Result.compareResults(value, res);
    context.getCounter(Counters.GOODROWS).increment(1);
  } catch (Exception e) {
    LOG.warn("Bad row", e);
    context.getCounter(Counters.BADROWS).increment(1);
  }
}
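// Cleanup sketch (an assumption, not from the source; one variant above even
// carries a TODO noting the peer HTable is never closed). Assuming
// replicatedScanner and replicatedTable are mapper fields, as in the first
// variant, the peer-cluster resources could be released in cleanup():
@Override
protected void cleanup(Context context) {
  if (replicatedScanner != null) {
    replicatedScanner.close();
    replicatedScanner = null;
  }
  if (replicatedTable != null) {
    try {
      replicatedTable.close();
    } catch (IOException e) {
      LOG.error("Failed to close the replicated table", e);
    }
  }
}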