Java 类org.apache.hadoop.hbase.client.coprocessor.Batch 实例源码
项目:ditb
文件:MultiHConnection.java
/**
* Randomly pick a connection and process the batch of actions for a given table
* @param actions the actions
* @param tableName table name
* @param results the results array
* @param callback
* @throws IOException
*/
@SuppressWarnings("deprecation")
public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
Object[] results, Batch.Callback<R> callback) throws IOException {
// Currently used by RegionStateStore
// A deprecated method is used as multiple threads accessing RegionStateStore do a single put
// and htable is not thread safe. Alternative would be to create an Htable instance for each
// put but that is not very efficient.
// See HBASE-11610 for more details.
try {
hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback(
actions, tableName, this.batchPool, results, callback);
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
}
项目:ditb
文件:TestCoprocessorTableEndpoint.java
private static Map<byte [], Long> sum(final Table table, final byte [] family,
final byte [] qualifier, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
start, end,
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
@Override
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
throws IOException {
BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
ColumnAggregationProtos.SumRequest.Builder builder =
ColumnAggregationProtos.SumRequest.newBuilder();
builder.setFamily(ByteStringer.wrap(family));
if (qualifier != null && qualifier.length > 0) {
builder.setQualifier(ByteStringer.wrap(qualifier));
}
instance.sum(null, builder.build(), rpcCallback);
return rpcCallback.get().getSum();
}
});
}
项目:ditb
文件:TestCoprocessorEndpoint.java
private Map<byte [], Long> sum(final Table table, final byte [] family,
final byte [] qualifier, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
start, end,
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
@Override
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
throws IOException {
BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
ColumnAggregationProtos.SumRequest.Builder builder =
ColumnAggregationProtos.SumRequest.newBuilder();
builder.setFamily(ByteStringer.wrap(family));
if (qualifier != null && qualifier.length > 0) {
builder.setQualifier(ByteStringer.wrap(qualifier));
}
instance.sum(null, builder.build(), rpcCallback);
return rpcCallback.get().getSum();
}
});
}
项目:ditb
文件:TestServerCustomProtocol.java
private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class,
start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
new BlockingRpcCallback<PingProtos.HelloResponse>();
PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
if (send != null) builder.setName(send);
instance.hello(null, builder.build(), rpcCallback);
PingProtos.HelloResponse r = rpcCallback.get();
return r != null && r.hasResponse()? r.getResponse(): null;
}
});
}
项目:ditb
文件:TestServerCustomProtocol.java
private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class,
start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
new BlockingRpcCallback<PingProtos.HelloResponse>();
PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
// Call ping on same instance. Use result calling hello on same instance.
builder.setName(doPing(instance));
instance.hello(null, builder.build(), rpcCallback);
PingProtos.HelloResponse r = rpcCallback.get();
return r != null && r.hasResponse()? r.getResponse(): null;
}
});
}
项目:ditb
文件:TestServerCustomProtocol.java
private Map<byte [], String> noop(final Table table, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class, start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback =
new BlockingRpcCallback<PingProtos.NoopResponse>();
PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder();
instance.noop(null, builder.build(), rpcCallback);
rpcCallback.get();
// Looks like null is expected when void. That is what the test below is looking for
return null;
}
});
}
项目:ditb
文件:ConnectionManager.java
/**
* Send the queries in parallel on the different region servers. Retries on failures.
* If the method returns it means that there is no error, and the 'results' array will
* contain no exception. On error, an exception is thrown, and the 'results' array will
* contain results and exceptions.
* @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
*/
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
TableName tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
AsyncRequestFuture ars = this.asyncProcess.submitAll(
pool, tableName, list, callback, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
项目:ditb
文件:HTable.java
/**
* {@inheritDoc}
*/
@Override
public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
throws ServiceException, Throwable {
final Map<byte[],R> results = Collections.synchronizedMap(
new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
@Override
public void update(byte[] region, byte[] row, R value) {
if (region != null) {
results.put(region, value);
}
}
});
return results;
}
项目:ditb
文件:AsyncProcess.java
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
Object[] results, boolean needResults, List<Exception> locationErrors,
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
// Add location errors if any
if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) {
int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction();
ars.manageError(originalIndex, row,
Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
}
}
ars.sendMultiAction(actionsByServer, 1, null, false);
return ars;
}
项目:ditb
文件:AsyncProcess.java
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
* compatibility: it allows to be used with the batch interface that return an array of objects.
*
* @param pool ExecutorService to use.
* @param tableName name of the table for which the submission is made.
* @param rows the list of rows.
* @param callback the callback.
* @param results Optional array to return the results thru; backward compat.
*/
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
Put put = (Put) r;
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
}
Action<Row> action = new Action<Row>(r, posInList);
setNonce(ng, r, action);
actions.add(action);
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
项目:ditb
文件:TestAsyncProcess.java
@Test
public void testSubmitWithCB() throws Exception {
ClusterConnection hc = createHConnection();
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback<Object> cb = new Batch.Callback<Object>() {
@Override
public void update(byte[] region, byte[] row, Object result) {
updateCalled.incrementAndGet();
}
};
AsyncProcess ap = new MyAsyncProcess(hc, conf);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
Assert.assertTrue(puts.isEmpty());
ars.waitUntilDone();
Assert.assertEquals(updateCalled.get(), 1);
}
项目:LCIndex-HBase-0.94.16
文件:TestBulkDeleteProtocol.java
private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
final byte deleteType, final Long timeStamp) throws Throwable {
HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
long noOfDeletedRows = 0L;
Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
return instance.delete(scan, deleteType, timeStamp, rowBatchSize);
}
};
Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
scan.getStartRow(), scan.getStopRow(), callable);
for (BulkDeleteResponse response : result.values()) {
noOfDeletedRows += response.getRowsDeleted();
}
return noOfDeletedRows;
}
项目:LCIndex-HBase-0.94.16
文件:TestServerCustomProtocol.java
@Test
public void testCompountCall() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[],String> results = table.coprocessorExec(PingProtocol.class,
ROW_A, ROW_C,
new Batch.Call<PingProtocol,String>() {
public String call(PingProtocol instance) {
return instance.hello(instance.ping());
}
});
verifyRegionResults(table, results, "Hello, pong", ROW_A);
verifyRegionResults(table, results, "Hello, pong", ROW_B);
verifyRegionResults(table, results, "Hello, pong", ROW_C);
}
项目:LCIndex-HBase-0.94.16
文件:TestServerCustomProtocol.java
@Test
public void testNullCall() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[],String> results = table.coprocessorExec(PingProtocol.class,
ROW_A, ROW_C,
new Batch.Call<PingProtocol,String>() {
public String call(PingProtocol instance) {
return instance.hello(null);
}
});
verifyRegionResults(table, results, "Who are you?", ROW_A);
verifyRegionResults(table, results, "Who are you?", ROW_B);
verifyRegionResults(table, results, "Who are you?", ROW_C);
}
项目:LCIndex-HBase-0.94.16
文件:TestServerCustomProtocol.java
@Test
public void testNullReturn() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[],String> results = table.coprocessorExec(PingProtocol.class,
ROW_A, ROW_C,
new Batch.Call<PingProtocol,String>(){
public String call(PingProtocol instance) {
return instance.hello("nobody");
}
});
verifyRegionResults(table, results, null, ROW_A);
verifyRegionResults(table, results, null, ROW_B);
verifyRegionResults(table, results, null, ROW_C);
}
项目:LCIndex-HBase-0.94.16
文件:TestServerCustomProtocol.java
@Test
public void testVoidReturnType() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Map<byte[],Object> results = table.coprocessorExec(PingProtocol.class,
ROW_A, ROW_C,
new Batch.Call<PingProtocol,Object>(){
public Object call(PingProtocol instance) {
instance.noop();
return null;
}
});
assertEquals("Should have results from three regions", 3, results.size());
// all results should be null
for (Object v : results.values()) {
assertNull(v);
}
}
项目:pbase
文件:MultiHConnection.java
/**
* Randomly pick a connection and process the batch of actions for a given table
* @param actions the actions
* @param tableName table name
* @param results the results array
* @param callback
* @throws IOException
* @throws InterruptedException
*/
@SuppressWarnings("deprecation")
public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName,
Object[] results, Batch.Callback<R> callback) throws IOException {
// Currently used by RegionStateStore
// A deprecated method is used as multiple threads accessing RegionStateStore do a single put
// and htable is not thread safe. Alternative would be to create an Htable instance for each
// put but that is not very efficient.
// See HBASE-11610 for more details.
try {
hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback(
actions, tableName, this.batchPool, results, callback);
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
}
项目:pbase
文件:TestCoprocessorEndpoint.java
private Map<byte [], Long> sum(final Table table, final byte [] family,
final byte [] qualifier, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
start, end,
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
@Override
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
throws IOException {
BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
ColumnAggregationProtos.SumRequest.Builder builder =
ColumnAggregationProtos.SumRequest.newBuilder();
builder.setFamily(ByteStringer.wrap(family));
if (qualifier != null && qualifier.length > 0) {
builder.setQualifier(ByteStringer.wrap(qualifier));
}
instance.sum(null, builder.build(), rpcCallback);
return rpcCallback.get().getSum();
}
});
}
项目:pbase
文件:TestServerCustomProtocol.java
private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class,
start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
new BlockingRpcCallback<PingProtos.HelloResponse>();
PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
if (send != null) builder.setName(send);
instance.hello(null, builder.build(), rpcCallback);
PingProtos.HelloResponse r = rpcCallback.get();
return r != null && r.hasResponse()? r.getResponse(): null;
}
});
}
项目:pbase
文件:TestServerCustomProtocol.java
private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class,
start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
new BlockingRpcCallback<PingProtos.HelloResponse>();
PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
// Call ping on same instance. Use result calling hello on same instance.
builder.setName(doPing(instance));
instance.hello(null, builder.build(), rpcCallback);
PingProtos.HelloResponse r = rpcCallback.get();
return r != null && r.hasResponse()? r.getResponse(): null;
}
});
}
项目:pbase
文件:ConnectionManager.java
/**
* Send the queries in parallel on the different region servers. Retries on failures.
* If the method returns it means that there is no error, and the 'results' array will
* contain no exception. On error, an exception is thrown, and the 'results' array will
* contain results and exceptions.
*
* @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
*/
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
TableName tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
AsyncRequestFuture ars = this.asyncProcess.submitAll(
pool, tableName, list, callback, results);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
}
项目:pbase
文件:HTable.java
/**
* {@inheritDoc}
*/
@Override
public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
throws ServiceException, Throwable {
final Map<byte[],R> results = Collections.synchronizedMap(
new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
@Override
public void update(byte[] region, byte[] row, R value) {
if (region != null) {
results.put(region, value);
}
}
});
return results;
}
项目:pbase
文件:AsyncProcess.java
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
Object[] results, boolean needResults, List<Exception> locationErrors,
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
// Add location errors if any
if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) {
int originalIndex = locationErrorRows.get(i);
Row row = retainedActions.get(originalIndex).getAction();
ars.manageError(originalIndex, row,
Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
}
}
ars.sendMultiAction(actionsByServer, 1, null, false);
return ars;
}
项目:pbase
文件:AsyncProcess.java
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
* compatibility: it allows to be used with the batch interface that return an array of objects.
*
* @param pool ExecutorService to use.
* @param tableName name of the table for which the submission is made.
* @param rows the list of rows.
* @param callback the callback.
* @param results Optional array to return the results thru; backward compat.
*/
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
NonceGenerator ng = this.connection.getNonceGenerator();
for (Row r : rows) {
posInList++;
if (r instanceof Put) {
Put put = (Put) r;
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
}
Action<Row> action = new Action<Row>(r, posInList);
setNonce(ng, r, action);
actions.add(action);
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
项目:pbase
文件:TestAsyncProcess.java
@Test
public void testSubmitWithCB() throws Exception {
ClusterConnection hc = createHConnection();
final AtomicInteger updateCalled = new AtomicInteger(0);
Batch.Callback<Object> cb = new Batch.Callback<Object>() {
@Override
public void update(byte[] region, byte[] row, Object result) {
updateCalled.incrementAndGet();
}
};
AsyncProcess ap = new MyAsyncProcess(hc, conf);
List<Put> puts = new ArrayList<Put>();
puts.add(createPut(1, true));
final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false);
Assert.assertTrue(puts.isEmpty());
ars.waitUntilDone();
Assert.assertEquals(updateCalled.get(), 1);
}
项目:HIndex
文件:TestCoprocessorEndpoint.java
private Map<byte [], Long> sum(final HTable table, final byte [] family,
final byte [] qualifier, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
start, end,
new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
@Override
public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
throws IOException {
BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
ColumnAggregationProtos.SumRequest.Builder builder =
ColumnAggregationProtos.SumRequest.newBuilder();
builder.setFamily(HBaseZeroCopyByteString.wrap(family));
if (qualifier != null && qualifier.length > 0) {
builder.setQualifier(HBaseZeroCopyByteString.wrap(qualifier));
}
instance.sum(null, builder.build(), rpcCallback);
return rpcCallback.get().getSum();
}
});
}
项目:HIndex
文件:TestServerCustomProtocol.java
private Map<byte [], String> compoundOfHelloAndPing(final HTable table, final byte [] start,
final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class,
start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
new BlockingRpcCallback<PingProtos.HelloResponse>();
PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
// Call ping on same instance. Use result calling hello on same instance.
builder.setName(doPing(instance));
instance.hello(null, builder.build(), rpcCallback);
PingProtos.HelloResponse r = rpcCallback.get();
return r != null && r.hasResponse()? r.getResponse(): null;
}
});
}
项目:HIndex
文件:HConnectionManager.java
/**
* Send the queries in parallel on the different region servers. Retries on failures.
* If the method returns it means that there is no error, and the 'results' array will
* contain no exception. On error, an exception is thrown, and the 'results' array will
* contain results and exceptions.
* @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
*/
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
TableName tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
// To fulfill the original contract, we have a special callback. This callback
// will set the results in the Object array.
ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback);
AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
// We're doing a submit all. This way, the originalIndex will match the initial list.
asyncProcess.submitAll(list);
asyncProcess.waitUntilDone();
if (asyncProcess.hasError()) {
throw asyncProcess.getErrors();
}
}
项目:ditb
文件:TestServerCustomProtocol.java
@Test
public void testSingleMethod() throws Throwable {
try (HTable table = new HTable(util.getConfiguration(), TEST_TABLE)) {
RegionLocator locator = table.getRegionLocator();
Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class,
null, ROW_A,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
new BlockingRpcCallback<PingProtos.PingResponse>();
instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
return rpcCallback.get().getPong();
}
});
// Should have gotten results for 1 of the three regions only since we specified
// rows from 1 region
assertEquals(1, results.size());
verifyRegionResults(locator, results, ROW_A);
final String name = "NAME";
results = hello(table, name, null, ROW_A);
// Should have gotten results for 1 of the three regions only since we specified
// rows from 1 region
assertEquals(1, results.size());
verifyRegionResults(locator, results, "Hello, NAME", ROW_A);
}
}
项目:ditb
文件:TestServerCustomProtocol.java
private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
throws ServiceException, Throwable {
return table.coprocessorService(PingProtos.PingService.class, start, end,
new Batch.Call<PingProtos.PingService, String>() {
@Override
public String call(PingProtos.PingService instance) throws IOException {
return doPing(instance);
}
});
}
项目:ditb
文件:VisibilityClient.java
/**
* @param connection the Connection instance to use.
* @param user
* @return labels, the given user is globally authorized for.
* @throws Throwable
*/
public static GetAuthsResponse getAuths(Connection connection, final String user)
throws Throwable {
try (Table table = connection.getTable(LABELS_TABLE_NAME)) {
Batch.Call<VisibilityLabelsService, GetAuthsResponse> callable =
new Batch.Call<VisibilityLabelsService, GetAuthsResponse>() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<GetAuthsResponse> rpcCallback =
new BlockingRpcCallback<GetAuthsResponse>();
public GetAuthsResponse call(VisibilityLabelsService service) throws IOException {
GetAuthsRequest.Builder getAuthReqBuilder = GetAuthsRequest.newBuilder();
getAuthReqBuilder.setUser(ByteStringer.wrap(Bytes.toBytes(user)));
service.getAuths(controller, getAuthReqBuilder.build(), rpcCallback);
GetAuthsResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return response;
}
};
Map<byte[], GetAuthsResponse> result =
table.coprocessorService(VisibilityLabelsService.class,
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, callable);
return result.values().iterator().next(); // There will be exactly one region for labels
// table and so one entry in result Map.
}
}
项目:ditb
文件:VisibilityClient.java
private static VisibilityLabelsResponse setOrClearAuths(Connection connection,
final String[] auths, final String user, final boolean setOrClear)
throws IOException, ServiceException, Throwable {
try (Table table = connection.getTable(LABELS_TABLE_NAME)) {
Batch.Call<VisibilityLabelsService, VisibilityLabelsResponse> callable =
new Batch.Call<VisibilityLabelsService, VisibilityLabelsResponse>() {
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<VisibilityLabelsResponse> rpcCallback =
new BlockingRpcCallback<VisibilityLabelsResponse>();
public VisibilityLabelsResponse call(VisibilityLabelsService service) throws IOException {
SetAuthsRequest.Builder setAuthReqBuilder = SetAuthsRequest.newBuilder();
setAuthReqBuilder.setUser(ByteStringer.wrap(Bytes.toBytes(user)));
for (String auth : auths) {
if (auth.length() > 0) {
setAuthReqBuilder.addAuth(ByteStringer.wrap(Bytes.toBytes(auth)));
}
}
if (setOrClear) {
service.setAuths(controller, setAuthReqBuilder.build(), rpcCallback);
} else {
service.clearAuths(controller, setAuthReqBuilder.build(), rpcCallback);
}
VisibilityLabelsResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return response;
}
};
Map<byte[], VisibilityLabelsResponse> result = table.coprocessorService(
VisibilityLabelsService.class, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY,
callable);
return result.values().iterator().next(); // There will be exactly one region for labels
// table and so one entry in result Map.
}
}
项目:ditb
文件:HConnection.java
/**
* Parameterized batch processing, allowing varying return types for different
* {@link Row} implementations.
* @deprecated since 0.96 - Use {@link HTableInterface#batchCallback} instead
*/
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list,
final TableName tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException;
项目:ditb
文件:ConnectionManager.java
@Override
@Deprecated
public <R> void processBatchCallback(
List<? extends Row> list,
byte[] tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
}
项目:ditb
文件:HTable.java
/**
* {@inheritDoc}
*/
@Override
public <R> void batchCallback(
final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException {
connection.processBatchCallback(actions, tableName, pool, results, callback);
}
项目:ditb
文件:HTable.java
/**
* {@inheritDoc}
* @deprecated If any exception is thrown by one of the actions, there is no way to
* retrieve the partially executed results. Use
* {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
* instead.
*/
@Deprecated
@Override
public <R> Object[] batchCallback(
final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
InterruptedException {
Object[] results = new Object[actions.size()];
batchCallback(actions, results, callback);
return results;
}
项目:ditb
文件:HTablePool.java
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
checkState();
return table.coprocessorService(service, startKey, endKey, callable);
}
项目:ditb
文件:HTablePool.java
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Callback<R> callback)
throws ServiceException, Throwable {
checkState();
table.coprocessorService(service, startKey, endKey, callable, callback);
}
项目:ditb
文件:AsyncProcess.java
@VisibleForTesting
/** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
return new AsyncRequestFutureImpl<CResult>(
tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
}
项目:ditb
文件:TestAsyncProcess.java
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<Res> callback, Object[] results, boolean needResults) {
// Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture(
DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults);
allReqs.add(r);
callsCt.incrementAndGet();
return r;
}