/**
 * Creates the replication peer used for replicating to region replicas, if needed.
 * <p>
 * Does nothing when region replica replication is not enabled in {@code conf}, or when a peer
 * with id {@code REGION_REPLICA_REPLICATION_PEER} already exists.
 * @param conf configuration to use
 * @throws IOException if the replication admin cannot be created or closed, or if looking up
 *           or adding the peer fails (a ReplicationException is wrapped as the cause)
 */
public static void setupRegionReplicaReplication(Configuration conf) throws IOException {
  if (!isRegionReplicaReplicationEnabled(conf)) {
    return;
  }
  // try-with-resources closes the admin on every path and, unlike a close() call in a
  // finally block, does not let a failure in close() hide the original exception.
  try (ReplicationAdmin repAdmin = new ReplicationAdmin(conf)) {
    if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
      ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
      peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
      peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
      repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
    }
  } catch (ReplicationException ex) {
    throw new IOException(ex);
  }
}
@Test public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { // create a table with region replicas. Check whether the replication peer is created // and replication started. ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); String peerId = "region_replica_replication"; if (admin.getPeerConfig(peerId) != null) { admin.removePeer(peerId); } HTableDescriptor htd = HTU.createTableDescriptor( "testReplicationPeerIsCreated_no_region_replicas"); HTU.getHBaseAdmin().createTable(htd); ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); assertNull(peerConfig); htd = HTU.createTableDescriptor("testReplicationPeerIsCreated"); htd.setRegionReplication(2); HTU.getHBaseAdmin().createTable(htd); // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); }
@Test (timeout=240000) public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { // modify a table by adding region replicas. Check whether the replication peer is created // and replication started. ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); String peerId = "region_replica_replication"; if (admin.getPeerConfig(peerId) != null) { admin.removePeer(peerId); } HTableDescriptor htd = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable"); HTU.getHBaseAdmin().createTable(htd); // assert that replication peer is not created yet ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId); assertNull(peerConfig); HTU.getHBaseAdmin().disableTable(htd.getTableName()); htd.setRegionReplication(2); HTU.getHBaseAdmin().modifyTable(htd.getTableName(), htd); HTU.getHBaseAdmin().enableTable(htd.getTableName()); // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); }
/**
 * Adds a replication peer whose cluster key is derived from {@code configuration} and whose
 * replication endpoint implementation is {@code HbaseEndpoint}.
 * @param configuration configuration used both to open the replication admin and to compute
 *          the ZooKeeper cluster key of the peer
 * @param peerName id under which the peer is added
 * @param tableCFs table to column-family mapping handed through to the replication admin
 * @throws ReplicationException if adding the peer fails
 * @throws IOException if the replication admin cannot be created or closed
 */
protected void addPeer(final Configuration configuration,String peerName,
    Map<TableName, List<String>> tableCFs) throws ReplicationException, IOException {
  try (ReplicationAdmin admin = new ReplicationAdmin(configuration)) {
    ReplicationPeerConfig config = new ReplicationPeerConfig();
    config.setClusterKey(ZKConfig.getZooKeeperClusterKey(configuration));
    config.setReplicationEndpointImpl(HbaseEndpoint.class.getName());
    admin.addPeer(peerName, config, tableCFs);
  }
}
private void setupReplication() throws Exception { ReplicationAdmin admin1 = new ReplicationAdmin(conf1); ReplicationAdmin admin2 = new ReplicationAdmin(conf2); HBaseAdmin ha = new HBaseAdmin(conf1); ha.createTable(t1_syncupSource); ha.createTable(t2_syncupSource); ha.close(); ha = new HBaseAdmin(conf2); ha.createTable(t1_syncupTarget); ha.createTable(t2_syncupTarget); ha.close(); // Get HTable from Master ht1Source = new HTable(conf1, t1_su); ht1Source.setWriteBufferSize(1024); ht2Source = new HTable(conf1, t2_su); ht1Source.setWriteBufferSize(1024); // Get HTable from Peer1 ht1TargetAtPeer1 = new HTable(conf2, t1_su); ht1TargetAtPeer1.setWriteBufferSize(1024); ht2TargetAtPeer1 = new HTable(conf2, t2_su); ht2TargetAtPeer1.setWriteBufferSize(1024); /** * set M-S : Master: utility1 Slave1: utility2 */ admin1.addPeer("1", utility2.getClusterKey()); admin1.close(); admin2.close(); }
protected void setupReplication() throws Exception { ReplicationAdmin admin1 = new ReplicationAdmin(conf1); ReplicationAdmin admin2 = new ReplicationAdmin(conf2); Admin ha = utility1.getAdmin(); ha.createTable(t1_syncupSource); ha.createTable(t2_syncupSource); ha.close(); ha = utility2.getAdmin(); ha.createTable(t1_syncupTarget); ha.createTable(t2_syncupTarget); ha.close(); Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration()); Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration()); // Get HTable from Master ht1Source = connection1.getTable(t1_su); ht2Source = connection1.getTable(t2_su); // Get HTable from Peer1 ht1TargetAtPeer1 = connection2.getTable(t1_su); ht2TargetAtPeer1 = connection2.getTable(t2_su); /** * set M-S : Master: utility1 Slave1: utility2 */ ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); admin1.addPeer("1", rpc, null); admin1.close(); admin2.close(); }
@BeforeClass public static void setUpBeforeClass() throws Exception { conf1 = HBaseConfiguration.create(); conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); conf1.setLong("replication.source.sleepforretries", 100); // Each WAL is about 120 bytes conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 200); conf1.setLong("replication.source.per.peer.node.bandwidth", 100L); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); new ZKWatcher(conf1, "cluster1", null, true); conf2 = new Configuration(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); utility2 = new HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); new ZKWatcher(conf2, "cluster2", null, true); ReplicationAdmin admin1 = new ReplicationAdmin(conf1); ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); utility1.startMiniCluster(1, 1); utility2.startMiniCluster(1, 1); admin1.addPeer("peer1", rpc, null); admin1.addPeer("peer2", rpc, null); admin1.addPeer("peer3", rpc, null); }
@Before public void setUpBeforeTest() throws Exception { if (!firstTest) { // Delete /ngdata from zookeeper System.out.println(">>> Deleting /ngdata node from ZooKeeper"); cleanZooKeeper("localhost:" + hbaseTestUtil.getZkCluster().getClientPort(), "/ngdata"); // Delete all hbase tables System.out.println(">>> Deleting all HBase tables"); Admin admin = connection.getAdmin(); for (HTableDescriptor table : admin.listTables()) { admin.disableTable(table.getTableName()); admin.deleteTable(table.getTableName()); } admin.close(); // Delete all replication peers System.out.println(">>> Deleting all replication peers from HBase"); ReplicationAdmin replAdmin = new ReplicationAdmin(conf); for (String peerId : replAdmin.listPeerConfigs().keySet()) { replAdmin.removePeer(peerId); } replAdmin.close(); SepTestUtil.waitOnAllReplicationPeersStopped(); // Clear Solr indexes System.out.println(">>> Clearing Solr indexes"); collection1.deleteByQuery("*:*"); collection1.commit(); collection2.deleteByQuery("*:*"); collection2.commit(); } else { firstTest = false; } main = new Main(); main.startServices(conf); }
/**
 * Reports whether a replication peer is registered under the internal name that corresponds
 * to the given subscription name.
 * @param name subscription name as seen by callers
 * @return {@code true} if the listed peer configs contain the internal subscription name
 * @throws IOException if the replication admin cannot be created or queried
 */
@Override
public boolean hasSubscription(String name) throws IOException {
  final ReplicationAdmin admin = new ReplicationAdmin(hbaseConf);
  try {
    final String peerId = toInternalSubscriptionName(name);
    final boolean registered = admin.listPeerConfigs().containsKey(peerId);
    return registered;
  } finally {
    Closer.close(admin);
  }
}
@Test(timeout=300000) public void testMultiSlaveReplication() throws Exception { LOG.info("testCyclicReplication"); MiniHBaseCluster master = utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); ReplicationAdmin admin1 = new ReplicationAdmin(conf1); new HBaseAdmin(conf1).createTable(table); new HBaseAdmin(conf2).createTable(table); new HBaseAdmin(conf3).createTable(table); Table htable1 = new HTable(conf1, tableName); htable1.setWriteBufferSize(1024); Table htable2 = new HTable(conf2, tableName); htable2.setWriteBufferSize(1024); Table htable3 = new HTable(conf3, tableName); htable3.setWriteBufferSize(1024); admin1.addPeer("1", utility2.getClusterKey()); // put "row" and wait 'til it got around, then delete putAndWait(row, famName, htable1, htable2); deleteAndWait(row, htable1, htable2); // check it wasn't replication to cluster 3 checkRow(row,0,htable3); putAndWait(row2, famName, htable1, htable2); // now roll the region server's logs rollWALAndWait(utility1, htable1.getName(), row2); // after the log was rolled put a new row putAndWait(row3, famName, htable1, htable2); admin1.addPeer("2", utility3.getClusterKey()); // put a row, check it was replicated to all clusters putAndWait(row1, famName, htable1, htable2, htable3); // delete and verify deleteAndWait(row1, htable1, htable2, htable3); // make sure row2 did not get replicated after // cluster 3 was added checkRow(row2,0,htable3); // row3 will get replicated, because it was in the // latest log checkRow(row3,1,htable3); Put p = new Put(row); p.add(famName, row, row); htable1.put(p); // now roll the logs again rollWALAndWait(utility1, htable1.getName(), row); // cleanup "row2", also conveniently use this to wait replication // to finish deleteAndWait(row2, htable1, htable2, htable3); // Even if the log was rolled in the middle of the replication // "row" is still replication. 
checkRow(row, 1, htable2); // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, // we should wait before checking. checkWithWait(row, 1, htable3); // cleanup the rest deleteAndWait(row, htable1, htable2, htable3); deleteAndWait(row3, htable1, htable2, htable3); utility3.shutdownMiniCluster(); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); }
public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) throws Exception { // tests having edits from a disabled or dropped table is handled correctly by skipping those // entries and further edits after the edits from dropped/disabled table can be replicated // without problems. TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables" + dropTable); HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString()); int regionReplication = 3; htd.setRegionReplication(regionReplication); HTU.deleteTableIfAny(tableName); HTU.getHBaseAdmin().createTable(htd); TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); HTU.deleteTableIfAny(toBeDisabledTable); htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); htd.setRegionReplication(regionReplication); HTU.getHBaseAdmin().createTable(htd); // both tables are created, now pause replication ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); // now that the replication is disabled, write to the table to be dropped, then drop the table. 
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); Table table = connection.getTable(tableName); Table tableToBeDisabled = connection.getTable(toBeDisabledTable); HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); AtomicLong skippedEdits = new AtomicLong(); RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE); RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); Entry entry = new Entry( new WALKey(encodedRegionName, toBeDisabledTable, 1), new WALEdit()); HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table if (dropTable) { HTU.getHBaseAdmin().deleteTable(toBeDisabledTable); } sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); assertEquals(2, skippedEdits.get()); try { // load some data to the to-be-dropped table // load the data to the table HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); // now enable the replication admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); verifyReplication(tableName, regionReplication, 0, 1000); } finally { admin.close(); table.close(); rl.close(); tableToBeDisabled.close(); HTU.deleteTableIfAny(toBeDisabledTable); connection.close(); } }
/**
 * Configures and starts two mini clusters that share one mini ZooKeeper cluster (znode parents
 * "/1" and "/2"), registers cluster 2 as replication peer "2" of cluster 1, creates the test
 * table on both clusters and opens a table handle on each.
 * @throws Exception if any cluster fails to start or the table cannot be created
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // ---- source cluster configuration ----
  conf1.setInt("hfile.format.version", 3);
  conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
  conf1.setInt("replication.source.size.capacity", 10240);
  conf1.setLong("replication.source.sleepforretries", 100);
  conf1.setInt("hbase.regionserver.maxlogs", 10);
  conf1.setLong("hbase.master.logcleaner.ttl", 10);
  conf1.setInt("zookeeper.recovery.retry", 1);
  conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
  conf1.setBoolean("dfs.support.append", true);
  conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
  conf1.setInt("replication.stats.thread.period.seconds", 5);
  conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
  conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
    TestCoprocessorForTagsAtSource.class.getName());

  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniZKCluster();
  MiniZooKeeperCluster sharedZk = utility1.getZkCluster();
  // Re-read conf1 from the utility: the zk cluster location may differ from the default.
  conf1 = utility1.getConfiguration();
  replicationAdmin = new ReplicationAdmin(conf1);
  LOG.info("Setup first Zk");

  // ---- sink cluster configuration, derived from conf1 so it points at the same zk ----
  conf2 = HBaseConfiguration.create(conf1);
  conf2.setInt("hfile.format.version", 3);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
  conf2.setBoolean("dfs.support.append", true);
  conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
  conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
    TestCoprocessorForTagsAtSink.class.getName());

  utility2 = new HBaseTestingUtility(conf2);
  utility2.setZkCluster(sharedZk);

  replicationAdmin.addPeer("2", utility2.getClusterKey());

  LOG.info("Setup second Zk");
  utility1.startMiniCluster(2);
  utility2.startMiniCluster(2);

  // ---- table with a single globally replicated family keeping three versions ----
  HTableDescriptor descriptor = new HTableDescriptor(TABLE_NAME);
  HColumnDescriptor family = new HColumnDescriptor(FAMILY);
  family.setMaxVersions(3);
  family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  descriptor.addFamily(family);

  try (Connection sourceConn = ConnectionFactory.createConnection(conf1);
      Admin sourceAdmin = sourceConn.getAdmin()) {
    sourceAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  }
  try (Connection sinkConn = ConnectionFactory.createConnection(conf2);
      Admin sinkAdmin = sinkConn.getAdmin()) {
    sinkAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  }

  htable1 = utility1.getConnection().getTable(TABLE_NAME);
  htable2 = utility2.getConnection().getTable(TABLE_NAME);
}
/**
 * Refreshes {@code tableCFs} from the data currently held by {@code tableCFsTracker}: the
 * bytes are converted to a string and parsed with
 * {@code ReplicationAdmin.parseTableCFsFromConfig}.
 */
private void readTableCFsZnode() {
  this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(
    Bytes.toString(tableCFsTracker.getData(false)));
}
/**
 * Removes the replication peer registered under {@code PEER_NAME}, using a short-lived
 * replication admin built from the test utility's configuration.
 * @throws IOException if the replication admin cannot be created or closed
 * @throws ReplicationException if removing the peer fails
 */
private void removePeer() throws IOException, ReplicationException {
  try (ReplicationAdmin admin = new ReplicationAdmin(utility.getConfiguration())) {
    admin.removePeer(PEER_NAME);
  }
}
/**
 * Configures and starts two mini clusters that share one mini ZooKeeper cluster (znode parents
 * "/1" and "/2"), registers cluster 2 as replication peer "2" of cluster 1, creates the test
 * table (one globally replicated family, one non-replicated family) on both clusters and opens
 * a table handle on each.
 * @throws java.lang.Exception if any cluster fails to start or the table cannot be created
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
  // smaller log roll size to trigger more events
  conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
  conf1.setInt("replication.source.size.capacity", 10240);
  conf1.setLong("replication.source.sleepforretries", 100);
  conf1.setInt("hbase.regionserver.maxlogs", 10);
  conf1.setLong("hbase.master.logcleaner.ttl", 10);
  conf1.setInt("zookeeper.recovery.retry", 1);
  conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
  conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
  conf1.setBoolean("dfs.support.append", true);
  conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
  conf1.setInt("replication.stats.thread.period.seconds", 5);

  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniZKCluster();
  MiniZooKeeperCluster miniZK = utility1.getZkCluster();
  // Have to reget conf1 in case zk cluster location different
  // than default
  conf1 = utility1.getConfiguration();
  zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
  admin = new ReplicationAdmin(conf1);
  LOG.info("Setup first Zk");

  // Base conf2 on conf1 so it gets the right zk cluster.
  conf2 = HBaseConfiguration.create(conf1);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt("hbase.client.retries.number", 6);
  conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
  conf2.setBoolean("dfs.support.append", true);

  utility2 = new HBaseTestingUtility(conf2);
  utility2.setZkCluster(miniZK);
  zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);

  admin.addPeer("2", utility2.getClusterKey());
  setIsReplication(true);

  LOG.info("Setup second Zk");
  CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
  utility1.startMiniCluster(2);
  utility2.startMiniCluster(2);

  HTableDescriptor table = new HTableDescriptor(tableName);
  HColumnDescriptor fam = new HColumnDescriptor(famName);
  fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  table.addFamily(fam);
  fam = new HColumnDescriptor(noRepfamName);
  table.addFamily(fam);

  // The HBaseAdmin instances are only needed to create the table; close them afterwards
  // (they were previously never closed).
  HBaseAdmin admin1 = new HBaseAdmin(conf1);
  try {
    admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  } finally {
    admin1.close();
  }
  HBaseAdmin admin2 = new HBaseAdmin(conf2);
  try {
    admin2.createTable(table);
  } finally {
    admin2.close();
  }

  htable1 = new HTable(conf1, tableName);
  htable1.setWriteBufferSize(1024);
  htable2 = new HTable(conf2, tableName);
}
@Test(timeout=300000) public void testMultiSlaveReplication() throws Exception { LOG.info("testCyclicReplication"); MiniHBaseCluster master = utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); ReplicationAdmin admin1 = new ReplicationAdmin(conf1); new HBaseAdmin(conf1).createTable(table); new HBaseAdmin(conf2).createTable(table); new HBaseAdmin(conf3).createTable(table); HTable htable1 = new HTable(conf1, tableName); htable1.setWriteBufferSize(1024); HTable htable2 = new HTable(conf2, tableName); htable2.setWriteBufferSize(1024); HTable htable3 = new HTable(conf3, tableName); htable3.setWriteBufferSize(1024); admin1.addPeer("1", utility2.getClusterKey()); // put "row" and wait 'til it got around, then delete putAndWait(row, famName, htable1, htable2); deleteAndWait(row, htable1, htable2); // check it wasn't replication to cluster 3 checkRow(row,0,htable3); putAndWait(row2, famName, htable1, htable2); // now roll the region server's logs new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString()); // after the log was rolled put a new row putAndWait(row3, famName, htable1, htable2); admin1.addPeer("2", utility3.getClusterKey()); // put a row, check it was replicated to all clusters putAndWait(row1, famName, htable1, htable2, htable3); // delete and verify deleteAndWait(row1, htable1, htable2, htable3); // make sure row2 did not get replicated after // cluster 3 was added checkRow(row2,0,htable3); // row3 will get replicated, because it was in the // latest log checkRow(row3,1,htable3); Put p = new Put(row); p.add(famName, row, row); htable1.put(p); // now roll the logs again new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0) .getServerName().toString()); // cleanup "row2", also conveniently use this to wait replication // to finish deleteAndWait(row2, htable1, htable2, htable3); // Even if the log was rolled in the middle of the replication // "row" is still replication. 
checkRow(row, 1, htable2); // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, // we should wait before checking. checkWithWait(row, 1, htable3); // cleanup the rest deleteAndWait(row, htable1, htable2, htable3); deleteAndWait(row3, htable1, htable2, htable3); utility3.shutdownMiniCluster(); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); }
/**
 * Configures and starts two mini clusters that share one mini ZooKeeper cluster (znode parents
 * "/1" and "/2"), registers cluster 2 as replication peer "2" of cluster 1, creates the test
 * table (one globally replicated family, one non-replicated family) on both clusters, waits
 * for its regions to be assigned and opens a table handle on each cluster.
 * @throws java.lang.Exception if any cluster fails to start or the table cannot be created
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // ---- source cluster configuration ----
  conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
  // a small log roll size makes log rolls, and hence replication events, more frequent
  conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
  conf1.setInt("replication.source.size.capacity", 10240);
  conf1.setLong("replication.source.sleepforretries", 100);
  conf1.setInt("hbase.regionserver.maxlogs", 10);
  conf1.setLong("hbase.master.logcleaner.ttl", 10);
  conf1.setInt("zookeeper.recovery.retry", 1);
  conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
  conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
  conf1.setBoolean("dfs.support.append", true);
  conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
  conf1.setInt("replication.stats.thread.period.seconds", 5);
  conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  conf1.setLong("replication.sleep.before.failover", 2000);
  conf1.setInt("replication.source.maxretriesmultiplier", 10);

  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniZKCluster();
  MiniZooKeeperCluster sharedZk = utility1.getZkCluster();
  // Re-read conf1 from the utility: the zk cluster location may differ from the default.
  conf1 = utility1.getConfiguration();
  zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
  admin = new ReplicationAdmin(conf1);
  LOG.info("Setup first Zk");

  // ---- sink cluster configuration, derived from conf1 so it points at the same zk ----
  conf2 = HBaseConfiguration.create(conf1);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
  conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
  conf2.setBoolean("dfs.support.append", true);
  conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);

  utility2 = new HBaseTestingUtility(conf2);
  utility2.setZkCluster(sharedZk);
  zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);

  admin.addPeer("2", utility2.getClusterKey());

  LOG.info("Setup second Zk");
  CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
  utility1.startMiniCluster(2);
  utility2.startMiniCluster(2);

  // ---- table: replicated family (three versions) followed by a non-replicated family ----
  HTableDescriptor descriptor = new HTableDescriptor(tableName);
  HColumnDescriptor replicatedFamily = new HColumnDescriptor(famName);
  replicatedFamily.setMaxVersions(3);
  replicatedFamily.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  descriptor.addFamily(replicatedFamily);
  HColumnDescriptor localFamily = new HColumnDescriptor(noRepfamName);
  descriptor.addFamily(localFamily);

  Connection sourceConnection = ConnectionFactory.createConnection(conf1);
  Connection sinkConnection = ConnectionFactory.createConnection(conf2);
  try (Admin sourceAdmin = sourceConnection.getAdmin()) {
    sourceAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  }
  try (Admin sinkAdmin = sinkConnection.getAdmin()) {
    sinkAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  }
  utility1.waitUntilAllRegionsAssigned(tableName);
  utility2.waitUntilAllRegionsAssigned(tableName);

  htable1 = sourceConnection.getTable(tableName);
  htable1.setWriteBufferSize(1024);
  htable2 = sinkConnection.getTable(tableName);
}
/**
 * Configures and starts two mini clusters that share one mini ZooKeeper cluster (znode parents
 * "/1" and "/2"), registers cluster 2 as replication peer "2" of cluster 1, creates the test
 * table (one globally replicated family, one non-replicated family) on both clusters and opens
 * a table handle on each.
 * @throws java.lang.Exception if any cluster fails to start or the table cannot be created
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
  // smaller log roll size to trigger more events
  conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
  conf1.setInt("replication.source.size.capacity", 10240);
  conf1.setLong("replication.source.sleepforretries", 100);
  conf1.setInt("hbase.regionserver.maxlogs", 10);
  conf1.setLong("hbase.master.logcleaner.ttl", 10);
  conf1.setInt("zookeeper.recovery.retry", 1);
  conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
  conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
  conf1.setBoolean("dfs.support.append", true);
  conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
  conf1.setInt("replication.stats.thread.period.seconds", 5);
  conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);

  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniZKCluster();
  MiniZooKeeperCluster miniZK = utility1.getZkCluster();
  // Have to reget conf1 in case zk cluster location different
  // than default
  conf1 = utility1.getConfiguration();
  zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
  admin = new ReplicationAdmin(conf1);
  LOG.info("Setup first Zk");

  // Base conf2 on conf1 so it gets the right zk cluster.
  conf2 = HBaseConfiguration.create(conf1);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
  conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
  conf2.setBoolean("dfs.support.append", true);
  conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);

  utility2 = new HBaseTestingUtility(conf2);
  utility2.setZkCluster(miniZK);
  zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);

  admin.addPeer("2", utility2.getClusterKey());

  LOG.info("Setup second Zk");
  CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
  utility1.startMiniCluster(2);
  utility2.startMiniCluster(2);

  HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));
  HColumnDescriptor fam = new HColumnDescriptor(famName);
  fam.setMaxVersions(3);
  fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  table.addFamily(fam);
  fam = new HColumnDescriptor(noRepfamName);
  table.addFamily(fam);

  // The HBaseAdmin instances are only needed to create the table; close them afterwards
  // (they were previously never closed).
  HBaseAdmin admin1 = new HBaseAdmin(conf1);
  try {
    admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  } finally {
    admin1.close();
  }
  HBaseAdmin admin2 = new HBaseAdmin(conf2);
  try {
    admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  } finally {
    admin2.close();
  }

  htable1 = new HTable(conf1, tableName);
  htable1.setWriteBufferSize(1024);
  htable2 = new HTable(conf2, tableName);
}
@BeforeClass public static void setUpBeforeClass() throws Exception { conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger // sufficient number of events. But we don't want to go too low because // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want // more than one batch sent to the peer cluster for better testing. conf1.setInt("replication.source.size.capacity", 102400); conf1.setLong("replication.source.sleepforretries", 100); conf1.setInt("hbase.regionserver.maxlogs", 10); conf1.setLong("hbase.master.logcleaner.ttl", 10); conf1.setInt("zookeeper.recovery.retry", 1); conf1.setInt("zookeeper.recovery.retry.intervalmill", 10); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); conf1.setInt("replication.stats.thread.period.seconds", 5); conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false); conf1.setLong("replication.sleep.before.failover", 2000); conf1.setInt("replication.source.maxretriesmultiplier", 10); conf1.setFloat("replication.source.ratio", 1.0f); conf1.setBoolean("replication.source.eof.autorecovery", true); // Parameter config conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, seperateOldWALs); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); // Have to reget conf1 in case zk cluster location different // than default conf1 = utility1.getConfiguration(); zkw1 = new ZKWatcher(conf1, "cluster1", null, true); admin = new ReplicationAdmin(conf1); LOG.info("Setup first Zk"); // Base conf2 on conf1 so it gets the right zk cluster. 
conf2 = HBaseConfiguration.create(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); utility2 = new HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); zkw2 = new ZKWatcher(conf2, "cluster2", null, true); LOG.info("Setup second Zk"); CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1); utility1.startMiniCluster(2); // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks // as a component in deciding maximum number of parallel batches to send to the peer cluster. utility2.startMiniCluster(4); ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build(); hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); hbaseAdmin.addReplicationPeer("2", rpc); TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .addColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (ColumnFamilyDescriptor f : table.getColumnFamilies()) { scopes.put(f.getName(), f.getScope()); } Connection connection1 = ConnectionFactory.createConnection(conf1); Connection connection2 = ConnectionFactory.createConnection(conf2); try (Admin admin1 = connection1.getAdmin()) { admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } utility1.waitUntilAllRegionsAssigned(tableName); utility2.waitUntilAllRegionsAssigned(tableName); htable1 = connection1.getTable(tableName); htable2 = connection2.getTable(tableName); }
@Test(timeout=300000) public void testMultiSlaveReplication() throws Exception { LOG.info("testCyclicReplication"); MiniHBaseCluster master = utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); ReplicationAdmin admin1 = new ReplicationAdmin(conf1); utility1.getAdmin().createTable(table); utility2.getAdmin().createTable(table); utility3.getAdmin().createTable(table); Table htable1 = utility1.getConnection().getTable(tableName); Table htable2 = utility2.getConnection().getTable(tableName); Table htable3 = utility3.getConnection().getTable(tableName); ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); admin1.addPeer("1", rpc, null); // put "row" and wait 'til it got around, then delete putAndWait(row, famName, htable1, htable2); deleteAndWait(row, htable1, htable2); // check it wasn't replication to cluster 3 checkRow(row,0,htable3); putAndWait(row2, famName, htable1, htable2); // now roll the region server's logs rollWALAndWait(utility1, htable1.getName(), row2); // after the log was rolled put a new row putAndWait(row3, famName, htable1, htable2); rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility3.getClusterKey()); admin1.addPeer("2", rpc, null); // put a row, check it was replicated to all clusters putAndWait(row1, famName, htable1, htable2, htable3); // delete and verify deleteAndWait(row1, htable1, htable2, htable3); // make sure row2 did not get replicated after // cluster 3 was added checkRow(row2,0,htable3); // row3 will get replicated, because it was in the // latest log checkRow(row3,1,htable3); Put p = new Put(row); p.addColumn(famName, row, row); htable1.put(p); // now roll the logs again rollWALAndWait(utility1, htable1.getName(), row); // cleanup "row2", also conveniently use this to wait replication // to finish deleteAndWait(row2, htable1, htable2, htable3); // Even if the log was rolled in the middle of the replication // "row" is still replication. 
checkRow(row, 1, htable2); // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, // we should wait before checking. checkWithWait(row, 1, htable3); // cleanup the rest deleteAndWait(row, htable1, htable2, htable3); deleteAndWait(row3, htable1, htable2, htable3); utility3.shutdownMiniCluster(); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); }
@BeforeClass public static void setUpBeforeClass() throws Exception { conf1 = HBaseConfiguration.create(); conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); // smaller block size and capacity to trigger more operations // and test them conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20); conf1.setInt("replication.source.size.capacity", 1024); conf1.setLong("replication.source.sleepforretries", 100); conf1.setInt("hbase.regionserver.maxlogs", 10); conf1.setLong("hbase.master.logcleaner.ttl", 10); conf1.setBoolean("dfs.support.append", true); conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);// Each WAL is 120 bytes conf1.setLong("replication.source.size.capacity", 1L); conf1.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 1000L); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); MiniZooKeeperCluster miniZK = utility1.getZkCluster(); new ZKWatcher(conf1, "cluster1", null, true); conf2 = new Configuration(conf1); conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); utility2 = new HBaseTestingUtility(conf2); utility2.setZkCluster(miniZK); new ZKWatcher(conf2, "cluster2", null, true); utility1.startMiniCluster(1, 10); utility2.startMiniCluster(1, 1); ReplicationAdmin admin1 = new ReplicationAdmin(conf1); ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(utility2.getClusterKey()); admin1.addPeer("1", rpc, null); utility1.getAdmin().setBalancerRunning(false, true); }
public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable) throws Exception { // tests having edits from a disabled or dropped table is handled correctly by skipping those // entries and further edits after the edits from dropped/disabled table can be replicated // without problems. final TableName tableName = TableName.valueOf(name.getMethodName() + dropTable); HTableDescriptor htd = HTU.createTableDescriptor(tableName); int regionReplication = 3; htd.setRegionReplication(regionReplication); HTU.deleteTableIfAny(tableName); HTU.getAdmin().createTable(htd); TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable"); HTU.deleteTableIfAny(toBeDisabledTable); htd = HTU.createTableDescriptor(toBeDisabledTable.toString()); htd.setRegionReplication(regionReplication); HTU.getAdmin().createTable(htd); // both tables are created, now pause replication ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration()); admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); // now that the replication is disabled, write to the table to be dropped, then drop the table. 
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); Table table = connection.getTable(tableName); Table tableToBeDisabled = connection.getTable(toBeDisabledTable); HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000); AtomicLong skippedEdits = new AtomicLong(); RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink = mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class); when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits); RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter = new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink, (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE); RegionLocator rl = connection.getRegionLocator(toBeDisabledTable); HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); Entry entry = new Entry( new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), new WALEdit()); HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table if (dropTable) { HTU.getAdmin().deleteTable(toBeDisabledTable); } sinkWriter.append(toBeDisabledTable, encodedRegionName, HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry)); assertEquals(2, skippedEdits.get()); try { // load some data to the to-be-dropped table // load the data to the table HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000); // now enable the replication admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId()); verifyReplication(tableName, regionReplication, 0, 1000); } finally { admin.close(); table.close(); rl.close(); tableToBeDisabled.close(); HTU.deleteTableIfAny(toBeDisabledTable); connection.close(); } }
/**
 * Starts a source and a sink mini cluster for the tag replication test.
 *
 * <p>Both clusters use HFile format version 3 and {@code KeyValueCodecWithTags} as the
 * replication codec. The source loads {@code TestCoprocessorForTagsAtSource} and the sink
 * loads {@code TestCoprocessorForTagsAtSink}. The sink is registered as peer "2" of the
 * source, and the test table is created on both sides.
 *
 * @throws Exception if a cluster, the peer or the table cannot be set up
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  final String tagAwareCodec = KeyValueCodecWithTags.class.getName();

  // Source cluster configuration.
  conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
  conf1.setInt("zookeeper.recovery.retry", 1);
  conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
  conf1.setInt("hfile.format.version", 3);
  conf1.setInt("replication.source.size.capacity", 10240);
  conf1.setLong("replication.source.sleepforretries", 100);
  conf1.setInt("replication.stats.thread.period.seconds", 5);
  conf1.setInt("hbase.regionserver.maxlogs", 10);
  conf1.setLong("hbase.master.logcleaner.ttl", 10);
  conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
  conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, tagAwareCodec);
  conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      TestCoprocessorForTagsAtSource.class.getName());

  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniZKCluster();
  MiniZooKeeperCluster sharedZkCluster = utility1.getZkCluster();
  // Have to reget conf1 in case zk cluster location different
  // than default
  conf1 = utility1.getConfiguration();
  replicationAdmin = new ReplicationAdmin(conf1);
  LOG.info("Setup first Zk");

  // Base conf2 on conf1 so it gets the right zk cluster.
  conf2 = HBaseConfiguration.create(conf1);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt("hfile.format.version", 3);
  conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
  conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, tagAwareCodec);
  conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      TestCoprocessorForTagsAtSink.class.getName());

  utility2 = new HBaseTestingUtility(conf2);
  utility2.setZkCluster(sharedZkCluster);
  LOG.info("Setup second Zk");

  utility1.startMiniCluster(2);
  utility2.startMiniCluster(2);

  // Register the sink cluster as peer "2" of the source.
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(utility2.getClusterKey());
  replicationAdmin.addPeer("2", peerConfig, null);

  // A single globally-replicated family keeping up to 3 versions.
  HColumnDescriptor family = new HColumnDescriptor(FAMILY);
  family.setMaxVersions(3);
  family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  HTableDescriptor descriptor = new HTableDescriptor(TABLE_NAME);
  descriptor.addFamily(family);

  try (Connection sourceConn = ConnectionFactory.createConnection(conf1);
      Admin sourceAdmin = sourceConn.getAdmin()) {
    sourceAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  }
  try (Connection sinkConn = ConnectionFactory.createConnection(conf2);
      Admin sinkAdmin = sinkConn.getAdmin()) {
    sinkAdmin.createTable(descriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  }

  htable1 = utility1.getConnection().getTable(TABLE_NAME);
  htable2 = utility2.getConnection().getTable(TABLE_NAME);
}
/**
 * Starts a source and a sink mini cluster for the tag replication test (older client API).
 *
 * <p>Both clusters use HFile format version 3 and {@code KeyValueCodecWithTags} as the
 * replication codec. The sink is registered as peer "2" of the source before the clusters
 * are started, then the test table is created on both sides and the static table handles
 * are opened.
 *
 * @throws Exception if a cluster, the peer or the table cannot be set up
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  conf1.setInt("hfile.format.version", 3);
  conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
  conf1.setInt("replication.source.size.capacity", 10240);
  conf1.setLong("replication.source.sleepforretries", 100);
  conf1.setInt("hbase.regionserver.maxlogs", 10);
  conf1.setLong("hbase.master.logcleaner.ttl", 10);
  conf1.setInt("zookeeper.recovery.retry", 1);
  conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
  conf1.setBoolean("dfs.support.append", true);
  conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
  conf1.setInt("replication.stats.thread.period.seconds", 5);
  conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  conf1.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
  conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      TestCoprocessorForTagsAtSource.class.getName());

  utility1 = new HBaseTestingUtility(conf1);
  utility1.startMiniZKCluster();
  MiniZooKeeperCluster miniZK = utility1.getZkCluster();
  // Have to reget conf1 in case zk cluster location different
  // than default
  conf1 = utility1.getConfiguration();
  replicationAdmin = new ReplicationAdmin(conf1);
  LOG.info("Setup first Zk");

  // Base conf2 on conf1 so it gets the right zk cluster.
  conf2 = HBaseConfiguration.create(conf1);
  conf2.setInt("hfile.format.version", 3);
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
  conf2.setBoolean("dfs.support.append", true);
  conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  conf2.setStrings(HConstants.REPLICATION_CODEC_CONF_KEY, KeyValueCodecWithTags.class.getName());
  conf2.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
      TestCoprocessorForTagsAtSink.class.getName());

  utility2 = new HBaseTestingUtility(conf2);
  utility2.setZkCluster(miniZK);

  replicationAdmin.addPeer("2", utility2.getClusterKey());

  LOG.info("Setup second Zk");
  utility1.startMiniCluster(2);
  utility2.startMiniCluster(2);

  HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
  HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
  fam.setMaxVersions(3);
  fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
  table.addFamily(fam);

  // Each admin gets its own variable. Previously one variable was reused, so a failure while
  // constructing the second admin made the finally block close the first one a second time.
  HBaseAdmin sourceAdmin = new HBaseAdmin(conf1);
  try {
    sourceAdmin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  } finally {
    sourceAdmin.close();
  }
  HBaseAdmin sinkAdmin = new HBaseAdmin(conf2);
  try {
    sinkAdmin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
  } finally {
    sinkAdmin.close();
  }

  htable1 = new HTable(conf1, TABLE_NAME);
  htable1.setWriteBufferSize(1024);
  htable2 = new HTable(conf2, TABLE_NAME);
}
@Test(timeout=300000) public void testCyclicReplication() throws Exception { LOG.info("testCyclicReplication"); utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); ReplicationAdmin admin1 = new ReplicationAdmin(conf1); ReplicationAdmin admin2 = new ReplicationAdmin(conf2); ReplicationAdmin admin3 = new ReplicationAdmin(conf3); new HBaseAdmin(conf1).createTable(table); new HBaseAdmin(conf2).createTable(table); new HBaseAdmin(conf3).createTable(table); HTable htable1 = new HTable(conf1, tableName); htable1.setWriteBufferSize(1024); HTable htable2 = new HTable(conf2, tableName); htable2.setWriteBufferSize(1024); HTable htable3 = new HTable(conf3, tableName); htable3.setWriteBufferSize(1024); admin1.addPeer("1", utility2.getClusterKey()); admin2.addPeer("1", utility3.getClusterKey()); admin3.addPeer("1", utility1.getClusterKey()); // put "row" and wait 'til it got around putAndWait(row, famName, htable1, htable3); // it should have passed through table2 check(row,famName,htable2); putAndWait(row1, famName, htable2, htable1); check(row,famName,htable3); putAndWait(row2, famName, htable3, htable2); check(row,famName,htable1); deleteAndWait(row,htable1,htable3); deleteAndWait(row1,htable2,htable1); deleteAndWait(row2,htable3,htable2); assertEquals("Puts were replicated back ", 3, getCount(htable1, put)); assertEquals("Puts were replicated back ", 3, getCount(htable2, put)); assertEquals("Puts were replicated back ", 3, getCount(htable3, put)); assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete)); assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete)); assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete)); utility3.shutdownMiniCluster(); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); }
/** * Add a row to a table in each cluster, check it's replicated, * delete it, check's gone * Also check the puts and deletes are not replicated back to * the originating cluster. */ @Test(timeout=300000) public void testSimplePutDelete() throws Exception { LOG.info("testSimplePutDelete"); utility1.startMiniCluster(); utility2.startMiniCluster(); ReplicationAdmin admin1 = new ReplicationAdmin(conf1); ReplicationAdmin admin2 = new ReplicationAdmin(conf2); new HBaseAdmin(conf1).createTable(table); new HBaseAdmin(conf2).createTable(table); HTable htable1 = new HTable(conf1, tableName); htable1.setWriteBufferSize(1024); HTable htable2 = new HTable(conf2, tableName); htable2.setWriteBufferSize(1024); // set M-M admin1.addPeer("1", utility2.getClusterKey()); admin2.addPeer("1", utility1.getClusterKey()); // add rows to both clusters, // make sure they are both replication putAndWait(row, famName, htable1, htable2); putAndWait(row1, famName, htable2, htable1); // make sure "row" did not get replicated back. assertEquals("Puts were replicated back ", 2, getCount(htable1, put)); // delete "row" and wait deleteAndWait(row, htable1, htable2); // make the 2nd cluster replicated back assertEquals("Puts were replicated back ", 2, getCount(htable2, put)); deleteAndWait(row1, htable2, htable1); assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete)); utility2.shutdownMiniCluster(); utility1.shutdownMiniCluster(); }