Java 类org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture 实例源码
项目:ditb
文件:ConnectionManager.java
/**
 * Submits every action in {@code list} through the connection's async process and blocks
 * until all of them are done. Retries on failures.
 * A normal return means there was no error, so {@code results} contains no exception.
 * On error the aggregated exception is thrown and {@code results} contains both results
 * and exceptions.
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(
    List<? extends Row> list,
    TableName tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  AsyncRequestFuture future =
      this.asyncProcess.submitAll(pool, tableName, list, callback, results);
  future.waitUntilDone();
  if (!future.hasError()) {
    return;
  }
  // At least one action failed: surface the collected errors to the caller.
  throw future.getErrors();
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Submits a single put together with a callback and checks that the callback's
 * {@code update} method has been invoked exactly once when the request is done.
 */
@Test
public void testSubmitWithCB() throws Exception {
  ClusterConnection hc = createHConnection();
  final AtomicInteger updateCalled = new AtomicInteger(0);
  Batch.Callback<Object> cb = new Batch.Callback<Object>() {
    @Override
    public void update(byte[] region, byte[] row, Object result) {
      updateCalled.incrementAndGet();
    }
  };
  AsyncProcess ap = new MyAsyncProcess(hc, conf);
  List<Put> puts = new ArrayList<Put>();
  puts.add(createPut(1, true));

  final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
  // The submitted list must have been emptied by submit().
  Assert.assertTrue(puts.isEmpty());
  ars.waitUntilDone();
  // assertEquals takes (expected, actual); keeping that order makes a failure message
  // read "expected:<1> but was:<n>" instead of the reverse.
  Assert.assertEquals(1, updateCalled.get());
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Submits a single put created with {@code createPut(1, false)} (presumably a put that is
 * set up to fail - confirm against createPut) and verifies that the future reports exactly
 * one exception equal to {@code failure} and exactly one failed operation equal to the put.
 */
@Test
public void testFail() throws Exception {
  MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> puts = new ArrayList<Put>();
  Put p = createPut(1, false);
  puts.add(p);

  AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
  Assert.assertEquals(0, puts.size());
  ars.waitUntilDone();
  verifyResult(ars, false);
  Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());

  // Exactly one error, and it is the expected failure.
  Assert.assertEquals(1, ars.getErrors().exceptions.size());
  Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
      failure.equals(ars.getErrors().exceptions.get(0)));

  // Exactly one failed operation, and it is the put that was submitted.
  Assert.assertEquals(1, ars.getFailedOperations().size());
  Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
      p.equals(ars.getFailedOperations().get(0)));
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Runs two rounds on the same AsyncProcess: a batch of three puts whose first entry is
 * expected to fail, followed by a single put that is expected to succeed.
 */
@Test
public void testFailAndSuccess() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> pending = new ArrayList<Put>();
  pending.add(createPut(1, false));
  pending.add(createPut(1, true));
  pending.add(createPut(1, true));

  // Round one: one failure followed by two successes.
  AsyncRequestFuture firstRound =
      asyncProcess.submit(DUMMY_TABLE, pending, false, null, true);
  Assert.assertTrue(pending.isEmpty());
  firstRound.waitUntilDone();
  verifyResult(firstRound, false, true, true);
  Assert.assertEquals(NB_RETRIES + 1, asyncProcess.callsCt.get());
  // Reset the call counter so the second round is counted from zero.
  asyncProcess.callsCt.set(0);
  Assert.assertEquals(1, firstRound.getErrors().actions.size());

  // Round two: a single put on the same AsyncProcess.
  pending.add(createPut(1, true));
  // The AP has to be idle first: the future may already hold its result while the
  // AP's own counters are only decreased afterwards.
  asyncProcess.waitUntilDone();
  AsyncRequestFuture secondRound =
      asyncProcess.submit(DUMMY_TABLE, pending, false, null, true);
  Assert.assertEquals(0, pending.size());
  secondRound.waitUntilDone();
  Assert.assertEquals(2, asyncProcess.callsCt.get());
  verifyResult(secondRound, true);
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Submits three puts, the first of which is expected to fail, waits for completion and
 * checks the per-action results, the call count and the number of failed operations.
 */
@Test
public void testFlush() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> batch = new ArrayList<Put>();
  batch.add(createPut(1, false));
  batch.add(createPut(1, true));
  batch.add(createPut(1, true));

  AsyncRequestFuture future = asyncProcess.submit(DUMMY_TABLE, batch, false, null, true);
  future.waitUntilDone();

  verifyResult(future, false, true, true);
  Assert.assertEquals(NB_RETRIES + 1, asyncProcess.callsCt.get());
  Assert.assertEquals(1, future.getFailedOperations().size());
}
项目:pbase
文件:ConnectionManager.java
/**
 * Submits every action in {@code list} through the connection's async process and blocks
 * until all of them are done. Retries on failures.
 * A normal return means there was no error, so {@code results} contains no exception.
 * On error the aggregated exception is thrown and {@code results} contains both results
 * and exceptions.
 *
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(
    List<? extends Row> list,
    TableName tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  AsyncRequestFuture future =
      this.asyncProcess.submitAll(pool, tableName, list, callback, results);
  future.waitUntilDone();
  if (!future.hasError()) {
    return;
  }
  // At least one action failed: surface the collected errors to the caller.
  throw future.getErrors();
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Submits a single put together with a callback and checks that the callback's
 * {@code update} method has been invoked exactly once when the request is done.
 */
@Test
public void testSubmitWithCB() throws Exception {
  ClusterConnection hc = createHConnection();
  final AtomicInteger updateCalled = new AtomicInteger(0);
  Batch.Callback<Object> cb = new Batch.Callback<Object>() {
    @Override
    public void update(byte[] region, byte[] row, Object result) {
      updateCalled.incrementAndGet();
    }
  };
  AsyncProcess ap = new MyAsyncProcess(hc, conf);
  List<Put> puts = new ArrayList<Put>();
  puts.add(createPut(1, true));

  final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
  // The submitted list must have been emptied by submit().
  Assert.assertTrue(puts.isEmpty());
  ars.waitUntilDone();
  // assertEquals takes (expected, actual); keeping that order makes a failure message
  // read "expected:<1> but was:<n>" instead of the reverse.
  Assert.assertEquals(1, updateCalled.get());
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Submits a single put created with {@code createPut(1, false)} (presumably a put that is
 * set up to fail - confirm against createPut) and verifies that the future reports exactly
 * one exception equal to {@code failure} and exactly one failed operation equal to the put.
 */
@Test
public void testFail() throws Exception {
  MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> puts = new ArrayList<Put>();
  Put p = createPut(1, false);
  puts.add(p);

  AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
  Assert.assertEquals(0, puts.size());
  ars.waitUntilDone();
  verifyResult(ars, false);
  Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());

  // Exactly one error, and it is the expected failure.
  Assert.assertEquals(1, ars.getErrors().exceptions.size());
  Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
      failure.equals(ars.getErrors().exceptions.get(0)));

  // Exactly one failed operation, and it is the put that was submitted.
  Assert.assertEquals(1, ars.getFailedOperations().size());
  Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
      p.equals(ars.getFailedOperations().get(0)));
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Runs two rounds on the same AsyncProcess: a batch of three puts whose first entry is
 * expected to fail, followed by a single put that is expected to succeed.
 */
@Test
public void testFailAndSuccess() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> pending = new ArrayList<Put>();
  pending.add(createPut(1, false));
  pending.add(createPut(1, true));
  pending.add(createPut(1, true));

  // Round one: one failure followed by two successes.
  AsyncRequestFuture firstRound =
      asyncProcess.submit(DUMMY_TABLE, pending, false, null, true);
  Assert.assertTrue(pending.isEmpty());
  firstRound.waitUntilDone();
  verifyResult(firstRound, false, true, true);
  Assert.assertEquals(NB_RETRIES + 1, asyncProcess.callsCt.get());
  // Reset the call counter so the second round is counted from zero.
  asyncProcess.callsCt.set(0);
  Assert.assertEquals(1, firstRound.getErrors().actions.size());

  // Round two: a single put on the same AsyncProcess.
  pending.add(createPut(1, true));
  // The AP has to be idle first: the future may already hold its result while the
  // AP's own counters are only decreased afterwards.
  asyncProcess.waitUntilDone();
  AsyncRequestFuture secondRound =
      asyncProcess.submit(DUMMY_TABLE, pending, false, null, true);
  Assert.assertEquals(0, pending.size());
  secondRound.waitUntilDone();
  Assert.assertEquals(2, asyncProcess.callsCt.get());
  verifyResult(secondRound, true);
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Submits three puts, the first of which is expected to fail, waits for completion and
 * checks the per-action results, the call count and the number of failed operations.
 */
@Test
public void testFlush() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> batch = new ArrayList<Put>();
  batch.add(createPut(1, false));
  batch.add(createPut(1, true));
  batch.add(createPut(1, true));

  AsyncRequestFuture future = asyncProcess.submit(DUMMY_TABLE, batch, false, null, true);
  future.waitUntilDone();

  verifyResult(future, false, true, true);
  Assert.assertEquals(NB_RETRIES + 1, asyncProcess.callsCt.get());
  Assert.assertEquals(1, future.getFailedOperations().size());
}
项目:PyroDB
文件:ConnectionManager.java
/**
 * Submits every action in {@code list} through the connection's async process and blocks
 * until all of them are done. Retries on failures.
 * A normal return means there was no error, so {@code results} contains no exception.
 * On error the aggregated exception is thrown and {@code results} contains both results
 * and exceptions.
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(
    List<? extends Row> list,
    TableName tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  AsyncRequestFuture future =
      this.asyncProcess.submitAll(pool, tableName, list, callback, results);
  future.waitUntilDone();
  if (!future.hasError()) {
    return;
  }
  // At least one action failed: surface the collected errors to the caller.
  throw future.getErrors();
}
项目:PyroDB
文件:TestAsyncProcess.java
/**
 * Submits a single put together with a callback and checks that the callback's
 * {@code update} method has been invoked exactly once when the request is done.
 */
@Test
public void testSubmitWithCB() throws Exception {
  ClusterConnection hc = createHConnection();
  final AtomicInteger updateCalled = new AtomicInteger(0);
  Batch.Callback<Object> cb = new Batch.Callback<Object>() {
    // @Override lets the compiler catch a signature drift in Batch.Callback.
    @Override
    public void update(byte[] region, byte[] row, Object result) {
      updateCalled.incrementAndGet();
    }
  };
  AsyncProcess ap = new MyAsyncProcess(hc, conf);
  List<Put> puts = new ArrayList<Put>();
  puts.add(createPut(1, true));

  final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
  // The submitted list must have been emptied by submit().
  Assert.assertTrue(puts.isEmpty());
  ars.waitUntilDone();
  // assertEquals takes (expected, actual); keeping that order makes a failure message
  // read "expected:<1> but was:<n>" instead of the reverse.
  Assert.assertEquals(1, updateCalled.get());
}
项目:PyroDB
文件:TestAsyncProcess.java
/**
 * Submits a single put created with {@code createPut(1, false)} (presumably a put that is
 * set up to fail - confirm against createPut) and verifies that the future reports exactly
 * one exception equal to {@code failure} and exactly one failed operation equal to the put.
 */
@Test
public void testFail() throws Exception {
  MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> puts = new ArrayList<Put>();
  Put p = createPut(1, false);
  puts.add(p);

  AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true);
  Assert.assertEquals(0, puts.size());
  ars.waitUntilDone();
  verifyResult(ars, false);
  Assert.assertEquals(2L, ap.getRetriesRequested());

  // Exactly one error, and it is the expected failure.
  Assert.assertEquals(1, ars.getErrors().exceptions.size());
  Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0),
      failure.equals(ars.getErrors().exceptions.get(0)));

  // Exactly one failed operation, and it is the put that was submitted.
  Assert.assertEquals(1, ars.getFailedOperations().size());
  Assert.assertTrue("was: " + ars.getFailedOperations().get(0),
      p.equals(ars.getFailedOperations().get(0)));
}
项目:PyroDB
文件:TestAsyncProcess.java
/**
 * Runs two rounds on the same AsyncProcess: a batch of three puts whose first entry is
 * expected to fail, followed by a single put that is expected to succeed.
 */
@Test
public void testFailAndSuccess() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> pending = new ArrayList<Put>();
  pending.add(createPut(1, false));
  pending.add(createPut(1, true));
  pending.add(createPut(1, true));

  // Round one: one failure followed by two successes.
  AsyncRequestFuture firstRound =
      asyncProcess.submit(DUMMY_TABLE, pending, false, null, true);
  Assert.assertTrue(pending.isEmpty());
  firstRound.waitUntilDone();
  verifyResult(firstRound, false, true, true);
  Assert.assertEquals(2, asyncProcess.getRetriesRequested());
  Assert.assertEquals(1, firstRound.getErrors().actions.size());

  // Round two: a single put on the same AsyncProcess.
  pending.add(createPut(1, true));
  // The AP has to be idle first: the future may already hold its result while the
  // AP's own counters are only decreased afterwards.
  asyncProcess.waitUntilDone();
  AsyncRequestFuture secondRound =
      asyncProcess.submit(DUMMY_TABLE, pending, false, null, true);
  Assert.assertEquals(0, pending.size());
  secondRound.waitUntilDone();
  Assert.assertEquals(2, asyncProcess.getRetriesRequested());
  verifyResult(secondRound, true);
}
项目:PyroDB
文件:TestAsyncProcess.java
/**
 * Submits three puts, the first of which is expected to fail, waits for completion and
 * checks the per-action results, the requested retries and the number of failed operations.
 */
@Test
public void testFlush() throws Exception {
  MyAsyncProcess asyncProcess = new MyAsyncProcess(createHConnection(), conf, false);
  List<Put> batch = new ArrayList<Put>();
  batch.add(createPut(1, false));
  batch.add(createPut(1, true));
  batch.add(createPut(1, true));

  AsyncRequestFuture future = asyncProcess.submit(DUMMY_TABLE, batch, false, null, true);
  future.waitUntilDone();

  verifyResult(future, false, true, true);
  Assert.assertEquals(2, asyncProcess.getRetriesRequested());
  Assert.assertEquals(1, future.getFailedOperations().size());
}
项目:ditb
文件:HTable.java
/**
 * {@inheritDoc}
 */
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
    throws InterruptedException, IOException {
  // Hand the whole batch to the multi-action async process (no callback) and block
  // until every action has completed.
  AsyncRequestFuture future = multiAp.submitAll(pool, tableName, actions, null, results);
  future.waitUntilDone();
  if (!future.hasError()) {
    return;
  }
  throw future.getErrors();
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Test override of submit: always submits against {@code DUMMY_TABLE} and always asks for
 * results to be kept, regardless of the {@code tableName} and {@code needResults} passed in.
 */
@Override
public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
    boolean atLeastOne, Callback<Res> callback, boolean needResults)
    throws InterruptedIOException {
  // Tests inspect the results to check things, so they are saved unconditionally.
  final boolean alwaysKeepResults = true;
  return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, alwaysKeepResults);
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Sends one put (created with {@code createPut(1, false)}) through an HTable whose mutator
 * uses a MyAsyncProcess, expects a RetriesExhaustedException, and then checks that exactly
 * one non-empty request was recorded and that its single result is a failure.
 *
 * @param bufferOn if true the table gets a 1 MB write buffer and is flushed explicitly
 *     after the put; if false the write buffer size is set to zero
 */
private void doHTableFailedPut(boolean bufferOn) throws Exception {
  ClusterConnection conn = createHConnection();
  HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
  MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
  ht.mutator.ap = ap;
  if (bufferOn) {
    ht.setWriteBufferSize(1024L * 1024L);
  } else {
    ht.setWriteBufferSize(0L);
  }

  Put put = createPut(1, false);

  Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());
  try {
    ht.put(put);
    if (bufferOn) {
      ht.flushCommits();
    }
    Assert.fail();
  } catch (RetriesExhaustedException expected) {
    // Intentionally ignored: the put is supposed to exhaust its retries.
  }
  Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get());

  // The table should have sent one request, maybe after multiple attempts
  AsyncRequestFuture ars = null;
  for (AsyncRequestFuture someReqs : ap.allReqs) {
    if (someReqs.getResults().length == 0) {
      continue;
    }
    // Only a single non-empty request may exist.
    Assert.assertNull(ars);
    ars = someReqs;
  }
  Assert.assertNotNull(ars);
  verifyResult(ars, false);

  // This should not raise any exception, puts have been 'received' before by the catch.
  ht.close();
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Three timeline gets: two are expected to be answered by replicas (stale results) and one
 * by the primary, with two replica calls in total.
 */
@Test
public void testReplicaReplicaSuccess() throws Exception {
  // The main call is too slow, so the replicas win - except for the one region that has
  // no replica, for which the main call succeeds.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(10, 1000, 0);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
  Object[] resultSlots = new Object[3];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.TRUE, RR.TRUE, RR.FALSE);
  Assert.assertEquals(2, replicaAp.getReplicaCallCount());
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Three timeline gets that are all expected to be answered by the primary, with no replica
 * call made.
 */
@Test
public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
  // The main call completes before any replica call is kicked off.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 10, 0);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
  Object[] resultSlots = new Object[3];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FALSE, RR.FALSE, RR.FALSE);
  Assert.assertEquals(0, replicaAp.getReplicaCallCount());
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Two timeline gets where primary and replica calls race; only requires that both gets
 * succeed and that the replica call count lies between 0 and 2 inclusive.
 */
@Test
public void testReplicaParallelCallsSucceed() throws Exception {
  // Either the main call or a replica call may be the one that succeeds.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(0, 0, 0);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.DONT_CARE, RR.DONT_CARE);
  long replicaCallCount = replicaAp.getReplicaCallCount();
  Assert.assertTrue(replicaCallCount >= 0 && replicaCallCount <= 2);
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Two timeline gets where the first is expected to be answered by the primary and the
 * second by a replica, with exactly one replica call.
 */
@Test
public void testReplicaPartialReplicaCall() throws Exception {
  // One server is slow, so its region's result comes from a replica. The other region is
  // answered by the primary before replica calls happen, so no replica call should be
  // made for that region at all.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 0, 0);
  replicaAp.setPrimaryCallDelay(sn2, 2000);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FALSE, RR.TRUE);
  Assert.assertEquals(1, replicaAp.getReplicaCallCount());
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Two timeline gets whose primary calls are registered as failures; both results are
 * expected to be failures and no replica call is expected.
 */
@Test
public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
  // The main calls fail before replica calls can start, which is currently not handled.
  // It would probably never happen if the location can be obtained (due to retries), and
  // handling it would require additional synchronization.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 0, 0, 1);
  replicaAp.addFailures(hri1, hri2);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FAILED, RR.FAILED);
  Assert.assertEquals(0, replicaAp.getReplicaCallCount());
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Two timeline gets with failures registered for both primaries and one replica; both
 * results are still expected to come from replicas, with two replica calls.
 */
@Test
public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
  // The main calls fail after the replica calls have started. For the two-replica region
  // one replica call fails as well. Replica results are expected for both regions anyway.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(0, 1000, 1000, 1);
  replicaAp.addFailures(hri1, hri1r2, hri2);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.TRUE, RR.TRUE);
  Assert.assertEquals(2, replicaAp.getReplicaCallCount());
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Two timeline gets where the first is expected to fail outright and the second to be
 * answered by the primary; all three reported exceptions must belong to DUMMY_BYTES_1.
 */
@Test
public void testReplicaAllCallsFailForOneRegion() throws Exception {
  // For one region all three calls (main and both replicas) fail. For the other region a
  // replica call fails, but that exception should stay invisible because the get succeeded.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(500, 1000, 0, 1);
  replicaAp.addFailures(hri1, hri1r1, hri1r2, hri2r1);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FAILED, RR.FALSE);

  // Expect 3 exceptions: main + 2 replicas, all for DUMMY_BYTES_1.
  Assert.assertEquals(3, future.getErrors().getNumExceptions());
  for (int idx = 0; idx < future.getErrors().getNumExceptions(); ++idx) {
    byte[] failedRow = future.getErrors().getRow(idx).getRow();
    Assert.assertArrayEquals(DUMMY_BYTES_1, failedRow);
  }
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Checks the results of a request against the expected per-action outcomes.
 *
 * @param ars the request whose results are inspected
 * @param expected one flag per action; true means the result at that position must not be
 *     a Throwable, false means it must be one
 */
private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
  Object[] results = ars.getResults();
  Assert.assertEquals(expected.length, results.length);
  int idx = 0;
  for (boolean shouldSucceed : expected) {
    boolean succeeded = !(results[idx] instanceof Throwable);
    Assert.assertEquals(shouldSucceed, succeeded);
    idx++;
  }
}
项目:ditb
文件:TestAsyncProcess.java
/**
 * Checks the results of a replica-aware request against the expected per-action outcomes.
 * For RR.FAILED the result must be a Throwable; otherwise it must not be one. For RR.TRUE
 * and RR.FALSE the result is additionally cast to Result and its stale flag must be true
 * or false respectively; RR.DONT_CARE skips the stale check.
 *
 * @param ars the request whose results are inspected
 * @param expecteds one expectation per action, in result order
 */
private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
  Object[] actuals = ars.getResults();
  Assert.assertEquals(expecteds.length, actuals.length);
  for (int i = 0; i < expecteds.length; ++i) {
    Object actual = actuals[i];
    RR expected = expecteds[i];
    // A missing result used to surface as a NullPointerException from actual.toString();
    // fail with a readable assertion instead.
    Assert.assertNotNull("result " + i + " is null", actual);
    Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
    if (expected != RR.FAILED && expected != RR.DONT_CARE) {
      Assert.assertEquals(expected == RR.TRUE, ((Result) actual).isStale());
    }
  }
}
项目:pbase
文件:HTable.java
/**
 * {@inheritDoc}
 */
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
    throws InterruptedException, IOException {
  // Hand the whole batch to the multi-action async process (no callback) and block
  // until every action has completed.
  AsyncRequestFuture future = multiAp.submitAll(pool, tableName, actions, null, results);
  future.waitUntilDone();
  if (!future.hasError()) {
    return;
  }
  throw future.getErrors();
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Test override of submit: always submits against {@code DUMMY_TABLE} and always asks for
 * results to be kept, regardless of the {@code tableName} and {@code needResults} passed in.
 */
@Override
public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
    boolean atLeastOne, Callback<Res> callback, boolean needResults)
    throws InterruptedIOException {
  // Tests inspect the results to check things, so they are saved unconditionally.
  final boolean alwaysKeepResults = true;
  return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, alwaysKeepResults);
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Sends one put (created with {@code createPut(1, false)}) through an HTable whose mutator
 * uses a MyAsyncProcess, expects a RetriesExhaustedException, and then checks that exactly
 * one non-empty request was recorded and that its single result is a failure.
 *
 * @param bufferOn if true the table gets a 1 MB write buffer and is flushed explicitly
 *     after the put; if false the write buffer size is set to zero
 */
private void doHTableFailedPut(boolean bufferOn) throws Exception {
  ClusterConnection conn = createHConnection();
  HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE));
  MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true);
  ht.mutator.ap = ap;
  if (bufferOn) {
    ht.setWriteBufferSize(1024L * 1024L);
  } else {
    ht.setWriteBufferSize(0L);
  }

  Put put = createPut(1, false);

  Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
  try {
    ht.put(put);
    if (bufferOn) {
      ht.flushCommits();
    }
    Assert.fail();
  } catch (RetriesExhaustedException expected) {
    // Intentionally ignored: the put is supposed to exhaust its retries.
  }
  Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);

  // The table should have sent one request, maybe after multiple attempts
  AsyncRequestFuture ars = null;
  for (AsyncRequestFuture someReqs : ap.allReqs) {
    if (someReqs.getResults().length == 0) {
      continue;
    }
    // Only a single non-empty request may exist.
    Assert.assertNull(ars);
    ars = someReqs;
  }
  Assert.assertNotNull(ars);
  verifyResult(ars, false);

  // This should not raise any exception, puts have been 'received' before by the catch.
  ht.close();
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Three timeline gets: two are expected to be answered by replicas (stale results) and one
 * by the primary, with two replica calls in total.
 */
@Test
public void testReplicaReplicaSuccess() throws Exception {
  // The main call is too slow, so the replicas win - except for the one region that has
  // no replica, for which the main call succeeds.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(10, 1000, 0);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
  Object[] resultSlots = new Object[3];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.TRUE, RR.TRUE, RR.FALSE);
  Assert.assertEquals(2, replicaAp.getReplicaCallCount());
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Three timeline gets that are all expected to be answered by the primary, with no replica
 * call made.
 */
@Test
public void testReplicaPrimarySuccessWoReplicaCalls() throws Exception {
  // The main call completes before any replica call is kicked off.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 10, 0);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2, DUMMY_BYTES_3);
  Object[] resultSlots = new Object[3];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FALSE, RR.FALSE, RR.FALSE);
  Assert.assertEquals(0, replicaAp.getReplicaCallCount());
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Two timeline gets where primary and replica calls race; only requires that both gets
 * succeed and that the replica call count lies between 0 and 2 inclusive.
 */
@Test
public void testReplicaParallelCallsSucceed() throws Exception {
  // Either the main call or a replica call may be the one that succeeds.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(0, 0, 0);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.DONT_CARE, RR.DONT_CARE);
  long replicaCallCount = replicaAp.getReplicaCallCount();
  Assert.assertTrue(replicaCallCount >= 0 && replicaCallCount <= 2);
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Two timeline gets where the first is expected to be answered by the primary and the
 * second by a replica, with exactly one replica call.
 */
@Test
public void testReplicaPartialReplicaCall() throws Exception {
  // One server is slow, so its region's result comes from a replica. The other region is
  // answered by the primary before replica calls happen, so no replica call should be
  // made for that region at all.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 0, 0);
  replicaAp.setPrimaryCallDelay(sn2, 2000);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FALSE, RR.TRUE);
  Assert.assertEquals(1, replicaAp.getReplicaCallCount());
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Two timeline gets whose primary calls are registered as failures; both results are
 * expected to be failures and no replica call is expected.
 */
@Test
public void testReplicaMainFailsBeforeReplicaCalls() throws Exception {
  // The main calls fail before replica calls can start, which is currently not handled.
  // It would probably never happen if the location can be obtained (due to retries), and
  // handling it would require additional synchronization.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(1000, 0, 0, 1);
  replicaAp.addFailures(hri1, hri2);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FAILED, RR.FAILED);
  Assert.assertEquals(0, replicaAp.getReplicaCallCount());
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Two timeline gets with failures registered for both primaries and one replica; both
 * results are still expected to come from replicas, with two replica calls.
 */
@Test
public void testReplicaReplicaSuccessWithParallelFailures() throws Exception {
  // The main calls fail after the replica calls have started. For the two-replica region
  // one replica call fails as well. Replica results are expected for both regions anyway.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(0, 1000, 1000, 1);
  replicaAp.addFailures(hri1, hri1r2, hri2);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.TRUE, RR.TRUE);
  Assert.assertEquals(2, replicaAp.getReplicaCallCount());
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Two timeline gets where the first is expected to fail outright and the second to be
 * answered by the primary; all three reported exceptions must belong to DUMMY_BYTES_1.
 */
@Test
public void testReplicaAllCallsFailForOneRegion() throws Exception {
  // For one region all three calls (main and both replicas) fail. For the other region a
  // replica call fails, but that exception should stay invisible because the get succeeded.
  MyAsyncProcessWithReplicas replicaAp = createReplicaAp(500, 1000, 0, 1);
  replicaAp.addFailures(hri1, hri1r1, hri1r2, hri2r1);
  List<Get> gets = makeTimelineGets(DUMMY_BYTES_1, DUMMY_BYTES_2);
  Object[] resultSlots = new Object[2];
  AsyncRequestFuture future = replicaAp.submitAll(DUMMY_TABLE, gets, null, resultSlots);
  verifyReplicaResult(future, RR.FAILED, RR.FALSE);

  // Expect 3 exceptions: main + 2 replicas, all for DUMMY_BYTES_1.
  Assert.assertEquals(3, future.getErrors().getNumExceptions());
  for (int idx = 0; idx < future.getErrors().getNumExceptions(); ++idx) {
    byte[] failedRow = future.getErrors().getRow(idx).getRow();
    Assert.assertArrayEquals(DUMMY_BYTES_1, failedRow);
  }
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Checks the results of a request against the expected per-action outcomes.
 *
 * @param ars the request whose results are inspected
 * @param expected one flag per action; true means the result at that position must not be
 *     a Throwable, false means it must be one
 */
private void verifyResult(AsyncRequestFuture ars, boolean... expected) throws Exception {
  Object[] results = ars.getResults();
  Assert.assertEquals(expected.length, results.length);
  int idx = 0;
  for (boolean shouldSucceed : expected) {
    boolean succeeded = !(results[idx] instanceof Throwable);
    Assert.assertEquals(shouldSucceed, succeeded);
    idx++;
  }
}
项目:pbase
文件:TestAsyncProcess.java
/**
 * Checks the results of a replica-aware request against the expected per-action outcomes.
 * For RR.FAILED the result must be a Throwable; otherwise it must not be one. For RR.TRUE
 * and RR.FALSE the result is additionally cast to Result and its stale flag must be true
 * or false respectively; RR.DONT_CARE skips the stale check.
 *
 * @param ars the request whose results are inspected
 * @param expecteds one expectation per action, in result order
 */
private void verifyReplicaResult(AsyncRequestFuture ars, RR... expecteds) throws Exception {
  Object[] actuals = ars.getResults();
  Assert.assertEquals(expecteds.length, actuals.length);
  for (int i = 0; i < expecteds.length; ++i) {
    Object actual = actuals[i];
    RR expected = expecteds[i];
    // A missing result used to surface as a NullPointerException from actual.toString();
    // fail with a readable assertion instead.
    Assert.assertNotNull("result " + i + " is null", actual);
    Assert.assertEquals(actual.toString(), expected == RR.FAILED, actual instanceof Throwable);
    if (expected != RR.FAILED && expected != RR.DONT_CARE) {
      Assert.assertEquals(expected == RR.TRUE, ((Result) actual).isStale());
    }
  }
}
项目:PyroDB
文件:HTable.java
/**
 * {@inheritDoc}
 */
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
    throws InterruptedException, IOException {
  // Hand the whole batch to the multi-action async process (no callback) and block
  // until every action has completed.
  AsyncRequestFuture future = multiAp.submitAll(pool, tableName, actions, null, results);
  future.waitUntilDone();
  if (!future.hasError()) {
    return;
  }
  throw future.getErrors();
}