/**
 * Submits every operation in {@code list} through the async process and blocks until all of
 * them have completed. Failures are retried. A normal return means no error occurred and the
 * {@code results} array contains no exception; when an error occurred the aggregated error is
 * thrown and {@code results} contains a mix of results and exceptions.
 *
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
    ExecutorService pool, Object[] results, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  final AsyncRequestFuture pending =
      this.asyncProcess.submitAll(pool, tableName, list, callback, results);
  pending.waitUntilDone();
  if (!pending.hasError()) {
    return;
  }
  // At least one operation failed: surface the collected errors to the caller.
  throw pending.getErrors();
}
/**
 * Submits a single put together with a callback and checks that the callback's
 * {@code update} method was invoked exactly once by the time the request is done.
 */
@Test
public void testSubmitWithCB() throws Exception {
  ClusterConnection hc = createHConnection();
  final AtomicInteger updateCalled = new AtomicInteger(0);
  Batch.Callback<Object> cb = new Batch.Callback<Object>() {
    @Override
    public void update(byte[] region, byte[] row, Object result) {
      updateCalled.incrementAndGet();
    }
  };
  AsyncProcess ap = new MyAsyncProcess(hc, conf);

  List<Put> puts = new ArrayList<Put>();
  puts.add(createPut(1, true));

  final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
  // The submitted list is expected to have been emptied by submit.
  Assert.assertTrue(puts.isEmpty());
  ars.waitUntilDone();
  // JUnit's contract is assertEquals(expected, actual); keeping that order makes a
  // failure message report the right value as "expected".
  Assert.assertEquals(1, updateCalled.get());
}
/**
 * Submits one put that is expected to fail and checks the reported result, the number of
 * calls made ({@code NB_RETRIES + 1}), the single recorded exception and the single
 * failed operation.
 */
@Test
public void testFail() throws Exception {
  MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);

  List<Put> puts = new ArrayList<Put>();
  Put p = createPut(1, false);
  puts.add(p);

  AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
  Assert.assertEquals(0, puts.size());
  ars.waitUntilDone();
  verifyResult(ars, false);
  Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());

  // Exactly one exception is recorded and it is the expected failure. (This check used to
  // be repeated verbatim; a single assertion is sufficient.)
  Assert.assertEquals(1, ars.getErrors().exceptions.size());
  Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
      failure.equals(ars.getErrors().exceptions.get(0)));

  // Exactly one operation is reported as failed and it is the put that was submitted.
  Assert.assertEquals(1, ars.getFailedOperations().size());
  Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
      p.equals(ars.getFailedOperations().get(0)));
}
@Test public void testFailAndSuccess() throws Exception { MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, false)); puts.add(createPut(1, true)); puts.add(createPut(1, true)); AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); Assert.assertTrue(puts.isEmpty()); ars.waitUntilDone(); verifyResult(ars, false, true, true); Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); ap.callsCt.set(0); Assert.assertEquals(1, ars.getErrors().actions.size()); puts.add(createPut(1, true)); // Wait for AP to be free. While ars might have the result, ap counters are decreased later. ap.waitUntilDone(); ars = ap.submit(DUMMY_TABLE, puts, false, null, true); Assert.assertEquals(0, puts.size()); ars.waitUntilDone(); Assert.assertEquals(2, ap.callsCt.get()); verifyResult(ars, true); }
/**
 * Submits three puts (results expected: failure, success, success), waits for completion
 * and checks the per-operation results, the call count and the number of failed operations.
 */
@Test
public void testFlush() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);

  List<Put> batch = new ArrayList<Put>();
  for (boolean flag : new boolean[] { false, true, true }) {
    batch.add(createPut(1, flag));
  }

  AsyncRequestFuture future = asyncProcess.submit(DUMMY_TABLE, batch, false, null, true);
  future.waitUntilDone();
  verifyResult(future, false, true, true);
  Assert.assertEquals(NB_RETRIES + 1, asyncProcess.callsCt.get());

  // Only the first put is reported as a failed operation.
  Assert.assertEquals(1, future.getFailedOperations().size());
}
/**
 * Submits every operation in {@code list} through the async process and blocks until all of
 * them have completed. Failures are retried. A normal return means no error occurred and the
 * {@code results} array contains no exception; when an error occurred the aggregated error is
 * thrown and {@code results} contains a mix of results and exceptions.
 *
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
    ExecutorService pool, Object[] results, Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  final AsyncRequestFuture pending =
      this.asyncProcess.submitAll(pool, tableName, list, callback, results);
  pending.waitUntilDone();
  if (!pending.hasError()) {
    return;
  }
  // At least one operation failed: surface the collected errors to the caller.
  throw pending.getErrors();
}
/**
 * Submits a single put together with a callback and checks that the callback's
 * {@code update} method was invoked exactly once by the time the request is done.
 */
@Test
public void testSubmitWithCB() throws Exception {
  ClusterConnection hc = createHConnection();
  final AtomicInteger updateCalled = new AtomicInteger(0);
  Batch.Callback<Object> cb = new Batch.Callback<Object>() {
    // @Override lets the compiler catch any drift from the Batch.Callback signature.
    @Override
    public void update(byte[] region, byte[] row, Object result) {
      updateCalled.incrementAndGet();
    }
  };
  AsyncProcess ap = new MyAsyncProcess(hc, conf);

  List<Put> puts = new ArrayList<Put>();
  puts.add(createPut(1, true));

  final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
  // The submitted list is expected to have been emptied by submit.
  Assert.assertTrue(puts.isEmpty());
  ars.waitUntilDone();
  // JUnit's contract is assertEquals(expected, actual); keeping that order makes a
  // failure message report the right value as "expected".
  Assert.assertEquals(1, updateCalled.get());
}
/**
 * Submits one put that is expected to fail and checks the reported result, the number of
 * retries requested, the single recorded exception and the single failed operation.
 */
@Test
public void testFail() throws Exception {
  MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);

  List<Put> puts = new ArrayList<Put>();
  Put p = createPut(1, false);
  puts.add(p);

  AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
  Assert.assertEquals(0, puts.size());
  ars.waitUntilDone();
  verifyResult(ars, false);
  Assert.assertEquals(2L, ap.getRetriesRequested());

  // Exactly one exception is recorded and it is the expected failure. (This check used to
  // be repeated verbatim; a single assertion is sufficient.)
  Assert.assertEquals(1, ars.getErrors().exceptions.size());
  Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
      failure.equals(ars.getErrors().exceptions.get(0)));

  // Exactly one operation is reported as failed and it is the put that was submitted.
  Assert.assertEquals(1, ars.getFailedOperations().size());
  Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
      p.equals(ars.getFailedOperations().get(0)));
}
@Test public void testFailAndSuccess() throws Exception { MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); List<Put> puts = new ArrayList<Put>(); puts.add(createPut(1, false)); puts.add(createPut(1, true)); puts.add(createPut(1, true)); AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); Assert.assertTrue(puts.isEmpty()); ars.waitUntilDone(); verifyResult(ars, false, true, true); Assert.assertEquals(2, ap.getRetriesRequested()); Assert.assertEquals(1, ars.getErrors().actions.size()); puts.add(createPut(1, true)); // Wait for AP to be free. While ars might have the result, ap counters are decreased later. ap.waitUntilDone(); ars = ap.submit(DUMMY_TABLE, puts, false, null, true); Assert.assertEquals(0, puts.size()); ars.waitUntilDone(); Assert.assertEquals(2, ap.getRetriesRequested()); verifyResult(ars, true); }
/**
 * Submits three puts (results expected: failure, success, success), waits for completion
 * and checks the per-operation results, the retries requested and the number of failed
 * operations.
 */
@Test
public void testFlush() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);

  List<Put> batch = new ArrayList<Put>();
  for (boolean flag : new boolean[] { false, true, true }) {
    batch.add(createPut(1, flag));
  }

  AsyncRequestFuture future = asyncProcess.submit(DUMMY_TABLE, batch, false, null, true);
  future.waitUntilDone();
  verifyResult(future, false, true, true);
  Assert.assertEquals(2, asyncProcess.getRetriesRequested());

  // Only the first put is reported as a failed operation.
  Assert.assertEquals(1, future.getFailedOperations().size());
}
/**
 * {@inheritDoc}
 * <p>
 * This implementation submits all {@code actions} through {@code multiAp}, waits for the
 * request to finish, and throws the collected errors if any were reported.
 */
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
    throws InterruptedException, IOException {
  final AsyncRequestFuture pending = multiAp.submitAll(pool, tableName, actions, null, results);
  pending.waitUntilDone();
  if (!pending.hasError()) {
    return;
  }
  throw pending.getErrors();
}
@Override public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, boolean atLeastOne, Callback<Res> callback, boolean needResults) throws InterruptedIOException { // We use results in tests to check things, so override to always save them. return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); }
private void doHTableFailedPut(boolean bufferOn) throws Exception { ClusterConnection conn = createHConnection(); HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); ht.mutator.ap = ap; if (bufferOn) { ht.setWriteBufferSize(1024L * 1024L); } else { ht.setWriteBufferSize(0L); } Put put = createPut(1, false); Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); try { ht.put(put); if (bufferOn) { ht.flushCommits(); } Assert.fail(); } catch (RetriesExhaustedException expected) { } Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { if (someReqs.getResults().length == 0) continue; Assert.assertTrue(ars == null); ars = someReqs; } Assert.assertTrue(ars != null); verifyResult(ars, false); // This should not raise any exception, puts have been 'received' before by the catch. ht.close(); }
@Test public void testReplicaReplicaSuccess() throws Exception { // Main call takes too long so replicas succeed, except for one region w/o replicas. // One region has no replica, so the main call succeeds for it. MyAsyncProcessWithReplicas ap = createReplicaAp(10, 1000, 0); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); verifyReplicaResult(ars, RR.TRUE, RR.TRUE, RR.FALSE); Assert.assertEquals(2, ap.getReplicaCallCount()); }
@Test public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception { // Main call succeeds before replica calls are kicked off. MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 10, 0); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3); AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[3]); verifyReplicaResult(ars, RR.FALSE, RR.FALSE, RR.FALSE); Assert.assertEquals(0, ap.getReplicaCallCount()); }
@Test public void testReplicaParallelCallsSucceed() throws Exception { // Either main or replica can succeed. MyAsyncProcessWithReplicas ap = createReplicaAp(0, 0, 0); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.DONT_CARE, RR.DONT_CARE); long replicaCalls = ap.getReplicaCallCount(); Assert.assertTrue(replicaCalls >= 0); Assert.assertTrue(replicaCalls <= 2); }
@Test public void testReplicaPartialReplicaCall() throws Exception { // One server is slow, so the result for its region comes from replica, whereas // the result for other region comes from primary before replica calls happen. // There should be no replica call for that region at all. MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0); ap.setPrimaryCallDelay(sn2, 2000); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.FALSE, RR.TRUE); Assert.assertEquals(1, ap.getReplicaCallCount()); }
@Test public void testReplicaMainFailsBeforeReplicaCalls() throws Exception { // Main calls fail before replica calls can start - this is currently not handled. // It would probably never happen if we can get location (due to retries), // and it would require additional synchronization. MyAsyncProcessWithReplicas ap = createReplicaAp(1000, 0, 0, 1); ap.addFailures(hri1, hri2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.FAILED, RR.FAILED); Assert.assertEquals(0, ap.getReplicaCallCount()); }
@Test public void testReplicaReplicaSuccessWithParallelFailures() throws Exception { // Main calls fails after replica calls start. For two-replica region, one replica call // also fails. Regardless, we get replica results for both regions. MyAsyncProcessWithReplicas ap = createReplicaAp(0, 1000, 1000, 1); ap.addFailures(hri1, hri1r2, hri2); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.TRUE, RR.TRUE); Assert.assertEquals(2, ap.getReplicaCallCount()); }
@Test public void testReplicaAllCallsFailForOneRegion() throws Exception { // For one of the region, all 3, main and replica, calls fail. For the other, replica // call fails but its exception should not be visible as it did succeed. MyAsyncProcessWithReplicas ap = createReplicaAp(500, 1000, 0, 1); ap.addFailures(hri1, hri1r1, hri1r2, hri2r1); List<Get> rows = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2); AsyncRequestFuture ars = ap.submitAll(DUMMY_TABLE, rows, null, new Object[2]); verifyReplicaResult(ars, RR.FAILED, RR.FALSE); // We should get 3 exceptions, for main + 2 replicas for DUMMY_BYTES_1 Assert.assertEquals(3, ars.getErrors().getNumExceptions()); for (int i = 0; i < ars.getErrors().getNumExceptions(); ++i) { Assert.assertArrayEquals(DUMMY_BYTES_1, ars.getErrors().getRow(i).getRow()); } }
/**
 * Checks the results held by {@code ars} against the expected outcomes.
 *
 * @param ars      the request whose results are inspected
 * @param expected one flag per result; {@code true} means the result must not be a
 *                 {@code Throwable}, {@code false} means it must be one
 */
private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
  final Object[] results = ars.getResults();
  Assert.assertEquals(expected.length, results.length);
  int idx = 0;
  for (boolean shouldSucceed : expected) {
    boolean succeeded = !(results[idx] instanceof Throwable);
    Assert.assertEquals(shouldSucceed, succeeded);
    idx++;
  }
}
/**
 * Checks the results held by {@code ars} against the expected replica outcomes.
 * <ul>
 *   <li>{@code RR.FAILED}: the result must be a {@code Throwable}.</li>
 *   <li>{@code RR.TRUE} / {@code RR.FALSE}: the result must not be a {@code Throwable} and
 *       its {@code isStale()} value must be true / false respectively.</li>
 *   <li>{@code RR.DONT_CARE}: the result must not be a {@code Throwable}; staleness is not
 *       checked.</li>
 * </ul>
 *
 * @param ars       the request whose results are inspected
 * @param expecteds one expected outcome per result, in result order
 */
private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
  Object[] actuals = ars.getResults();
  Assert.assertEquals(expecteds.length, actuals.length);
  for (int i = 0; i < expecteds.length; ++i) {
    Object actual = actuals[i];
    RR expected = expecteds[i];
    // Fail with a descriptive assertion rather than a bare NullPointerException from
    // actual.toString() when a result slot was never filled in.
    Assert.assertNotNull("no result recorded at index " + i, actual);
    Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
    if (expected != RR.FAILED && expected != RR.DONT_CARE) {
      Assert.assertEquals(expected == RR.TRUE, ((Result) actual).isStale());
    }
  }
}
private void doHTableFailedPut(boolean bufferOn) throws Exception { ClusterConnection conn = createHConnection(); HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); ht.mutator.ap = ap; if (bufferOn) { ht.setWriteBufferSize(1024L * 1024L); } else { ht.setWriteBufferSize(0L); } Put put = createPut(1, false); Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); try { ht.put(put); if (bufferOn) { ht.flushCommits(); } Assert.fail(); } catch (RetriesExhaustedException expected) { } Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { if (someReqs.getResults().length == 0) continue; Assert.assertTrue(ars == null); ars = someReqs; } Assert.assertTrue(ars != null); verifyResult(ars, false); // This should not raise any exception, puts have been 'received' before by the catch. ht.close(); }