@Override public List<Row> getActions() throws FlumeException { List<Row> actions = new LinkedList<Row>(); if (plCol != null) { byte[] rowKey; try { if (keyType == KeyType.TS) { rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); } else if (keyType == KeyType.RANDOM) { rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); } else if (keyType == KeyType.TSNANO) { rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); } else { rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); } Put put = new Put(rowKey); put.add(cf, plCol, payload); actions.add(put); } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; }
/** * Method perform a HBase batch operation. */ public void call() throws Exception { List<Row> rowList = new ArrayList<>(batchSize); final Object[] results = new Object[batchSize]; for ( T object : objectCollection ) { final Row row = objectMapper.apply(object); rowList.add(row); //reach batch limit size, flush index data to HBase if ( rowList.size() >= batchSize ) { table.batch(rowList, results); throwIfBatchFailed(results); rowList.clear(); } } //save remaining index data if ( !rowList.isEmpty() ) { final Object[] errors = new Object[rowList.size()]; table.batch(rowList, errors); throwIfBatchFailed(errors); } }
/** * Randomly pick a connection and process the batch of actions for a given table * @param actions the actions * @param tableName table name * @param results the results array * @param callback * @throws IOException */ @SuppressWarnings("deprecation") public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName, Object[] results, Batch.Callback<R> callback) throws IOException { // Currently used by RegionStateStore // A deprecated method is used as multiple threads accessing RegionStateStore do a single put // and htable is not thread safe. Alternative would be to create an Htable instance for each // put but that is not very efficient. // See HBASE-11610 for more details. try { hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback( actions, tableName, this.batchPool, results, callback); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } }
Row convertQualifiersToAliases(MTableDescriptor mTableDescriptor, final Row originalRow, NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyQualifierToAliasMap, int intForUniqueSignature) throws IOException { // Append, Delete, Get, Increment, Mutation, Put, RowMutations Class<?> originalRowClass = originalRow.getClass(); if (Append.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases( mTableDescriptor, (Append)originalRow, familyQualifierToAliasMap); } else if (Delete.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases(mTableDescriptor, (Delete)originalRow); } else if (Get.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases( mTableDescriptor, (Get)originalRow, familyQualifierToAliasMap); } else if (Increment.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases( mTableDescriptor, (Increment)originalRow, familyQualifierToAliasMap); } else if (Put.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases(mTableDescriptor, (Put)originalRow); } else if (RowMutations.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases(mTableDescriptor, (RowMutations)originalRow); } return null; }
private void testBatchProcessing(Table table) throws IOException, InterruptedException { List<Row> actions = new LinkedList<>(); actions.add(new Append(ROW_ID_02) .add(CF01, COLQUALIFIER03, Bytes.toBytes("appendedStringViaBatch"))); actions.add(new Delete(ROW_ID_03).addColumn(CF01, COLQUALIFIER04)); actions.add(new Increment(ROW_ID_02).addColumn(CF01, COLQUALIFIER05, 14)); actions.add(new Put(ROW_ID_05). addColumn(CF01, COLQUALIFIER04, TABLE_PUT_WITH_LIST). addColumn(CF02, COLQUALIFIER02, TABLE_PUT_WITH_LIST)); actions.add(new Get(ROW_ID_01).addColumn(CF01, COLQUALIFIER02)); Object[] returnedObjects = new Object[actions.size()]; table.batch(actions, returnedObjects); int index = 0; for (Object returnedObject : returnedObjects) { assertTrue("Table#batch action failed for " + actions.get(index).getClass().getSimpleName(), returnedObject != null); if (Get.class.isAssignableFrom(actions.get(index).getClass())) { Result resultFromGet = (Result)returnedObject; assertTrue("Table#batch Get action returned unexpected Result: expected <" + Bytes.toString(TABLE_PUT_WITH_LIST) + ">, returned <" + Bytes.toString(resultFromGet.getValue(CF01, COLQUALIFIER02)) + ">", Bytes.equals(TABLE_PUT_WITH_LIST, resultFromGet.getValue(CF01, COLQUALIFIER02))); } index++; } }
public void updateProfileCountsForSaleInHBase(Long buyerId, Long sellerId, ItemSaleEvent event) throws IOException, InterruptedException { HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE); ArrayList<Row> actions = new ArrayList<Row>(); Increment buyerValueIncrement = new Increment(generateProfileRowKey(buyerId)); buyerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL, event.getItemValue()); buyerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, event.getItemValue()); actions.add(buyerValueIncrement); Increment sellerValueIncrement = new Increment(generateProfileRowKey(sellerId)); sellerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL, event.getItemValue()); sellerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, event.getItemValue()); actions.add(sellerValueIncrement); profileTable.batch(actions); }
public void logInProfileInHBase(long userId, String ipAddress) throws IOException, Exception { HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE); ArrayList<Row> actions = new ArrayList<Row>(); byte[] profileRowKey = generateProfileRowKey(userId); Delete delete = new Delete(profileRowKey); delete.deleteColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL); delete.deleteColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL); actions.add(delete); Increment increment = new Increment(profileRowKey); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_COUNT_COL, 1); actions.add(increment); Put put = new Put(profileRowKey); put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LAST_LOG_IN_COL, Bytes.toBytes(System.currentTimeMillis())); put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_IP_ADDERSSES, Bytes.toBytes(ipAddress)); actions.add(put); profileTable.batch(actions); }
@Override public void createProfile(long userId, ProfilePojo pojo, String ipAddress) throws Exception { HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE); ArrayList<Row> actions = new ArrayList<Row>(); byte[] rowKey = generateProfileRowKey(userId); Put put = new Put(rowKey); put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.FIXED_INFO_COL, Bytes.toBytes(pojo.getUsername() + "|" + pojo.getAge() + "|" + System.currentTimeMillis())); put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_IP_ADDERSSES, Bytes.toBytes(ipAddress)); put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LAST_LOG_IN_COL, Bytes.toBytes(System.currentTimeMillis())); actions.add(put); Increment increment = new Increment(rowKey); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_COUNT_COL, 1); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_SELLS_COL, 0); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_PURCHASES_COL, 0); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_PURCHASES_COL, 0); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, 0); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL, 0); increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL, 0); actions.add(increment); profileTable.batch(actions); }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param allRows list of actions * @throws IOException */ protected void batch(byte[] tableName, Collection<List<Row>> allRows) throws IOException { if (allRows.isEmpty()) { return; } HTableInterface table = null; try { table = this.sharedHtableCon.getTable(tableName); for (List<Row> rows : allRows) { table.batch(rows); this.metrics.appliedOpsRate.inc(rows.size()); } } catch (InterruptedException ix) { throw new IOException(ix); } finally { if (table != null) { table.close(); } } }
@Override public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { List<Result> results = new ArrayList<Result>(); for (Row r : actions) { if (r instanceof Delete) { delete((Delete) r); continue; } if (r instanceof Put) { put((Put) r); continue; } if (r instanceof Get) { results.add(get((Get) r)); } } return results.toArray(); }
@Override public ResultScanner getScanner(Scan scan) throws IOException { LOG.trace("getScanner(Scan)"); ReadRowsRequest.Builder request = scanAdapter.adapt(scan); metadataSetter.setMetadata(request); try { com.google.cloud.bigtable.grpc.ResultScanner<com.google.bigtable.v1.Row> scanner = client.readRows(request.build()); return bigtableResultScannerAdapter.adapt(scanner); } catch (Throwable throwable) { LOG.error("Encountered exception when executing getScanner.", throwable); throw new IOException( makeGenericExceptionMessage( "getScanner", options.getProjectId(), tableName.getQualifierAsString()), throwable); } }
public BatchExecutor( BigtableClient client, BigtableOptions options, TableMetadataSetter tableMetadataSetter, ListeningExecutorService service, OperationAdapter<Get, ReadRowsRequest.Builder> getAdapter, OperationAdapter<Put, MutateRowRequest.Builder> putAdapter, OperationAdapter<Delete, MutateRowRequest.Builder> deleteAdapter, RowMutationsAdapter rowMutationsAdapter, AppendAdapter appendAdapter, IncrementAdapter incrementAdapter, ResponseAdapter<com.google.bigtable.v1.Row, Result> rowToResultAdapter) { this.client = client; this.options = options; this.tableMetadataSetter = tableMetadataSetter; this.service = service; this.getAdapter = getAdapter; this.putAdapter = putAdapter; this.deleteAdapter = deleteAdapter; this.rowMutationsAdapter = rowMutationsAdapter; this.appendAdapter = appendAdapter; this.incrementAdapter = incrementAdapter; this.rowToResultAdapter = rowToResultAdapter; rowResultConverter = new RowResultConverter(rowToResultAdapter); }
ListenableFuture<? extends GeneratedMessage> issueRequest(Row row) { if (row instanceof Put) { return issuePutRequest((Put) row); } else if (row instanceof Delete) { return issueDeleteRequest((Delete) row); } else if (row instanceof Append) { return issueAppendRequest((Append) row); } else if (row instanceof Increment) { return issueIncrementRequest((Increment) row); } else if (row instanceof Get) { return issueGetRequest((Get) row); } else if (row instanceof RowMutations) { return issueRowMutationsRequest((RowMutations) row); } LOG.error("Encountered unknown action type %s", row.getClass()); return Futures.immediateFailedFuture( new IllegalArgumentException("Encountered unknown action type: " + row.getClass())); }
/** * Implementation of * {@link org.apache.hadoop.hbase.client.HTable#batchCallback(List, Batch.Callback)} */ public <R> Object[] batchCallback( List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException { LOG.trace("batchCallback(List<>, Batch.Callback)"); Result[] results = new Result[actions.size()]; int index = 0; List<ListenableFuture<Object>> resultFutures = new ArrayList<>(actions.size()); for (Row row : actions) { resultFutures.add(issueRowRequest(row, callback, results, index++)); } try { Futures.allAsList(resultFutures).get(); } catch (ExecutionException e) { LOG.error("Encountered exception in batchCallback(List<>, Batch.Callback). ", e); throw new IOException("batchCallback error", e); } return results; }
/** * Implementation of * {@link org.apache.hadoop.hbase.client.HTable#batchCallback(List, Object[], Batch.Callback)} */ public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { LOG.trace("batchCallback(List<>, Object[], Batch.Callback)"); Preconditions.checkArgument(results.length == actions.size(), "Result array must be the same length as actions."); int index = 0; List<ListenableFuture<Object>> resultFutures = new ArrayList<>(actions.size()); for (Row row : actions) { resultFutures.add(issueRowRequest(row, callback, results, index++)); } try { // Don't want to throw an exception for failed futures, instead the place in results is // set to null. Futures.successfulAsList(resultFutures).get(); } catch (ExecutionException e) { LOG.error("Encountered exception in batchCallback(List<>, Object[], Batch.Callback). ", e); throw new IOException("batchCallback error", e); } }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param allRows list of actions * @throws IOException */ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { if (allRows.isEmpty()) { return; } Table table = null; try { table = this.sharedHtableCon.getTable(tableName); for (List<Row> rows : allRows) { table.batch(rows); } } catch (InterruptedException ix) { throw (InterruptedIOException)new InterruptedIOException().initCause(ix); } finally { if (table != null) { table.close(); } } }
/** * Randomly pick a connection and process the batch of actions for a given table * @param actions the actions * @param tableName table name * @param results the results array * @param callback * @throws IOException * @throws InterruptedException */ @SuppressWarnings("deprecation") public <R> void processBatchCallback(List<? extends Row> actions, TableName tableName, Object[] results, Batch.Callback<R> callback) throws IOException { // Currently used by RegionStateStore // A deprecated method is used as multiple threads accessing RegionStateStore do a single put // and htable is not thread safe. Alternative would be to create an Htable instance for each // put but that is not very efficient. // See HBASE-11610 for more details. try { hConnections[ThreadLocalRandom.current().nextInt(noOfConnections)].processBatchCallback( actions, tableName, this.batchPool, results, callback); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param allRows list of actions * @throws IOException */ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { if (allRows.isEmpty()) { return; } HTableInterface table = null; try { table = this.sharedHtableCon.getTable(tableName); for (List<Row> rows : allRows) { table.batch(rows); } } catch (InterruptedException ix) { throw (InterruptedIOException)new InterruptedIOException().initCause(ix); } finally { if (table != null) { table.close(); } } }
public static void handleHBaseException( RetriesExhaustedWithDetailsException rex, Record record, Map<String, Record> rowKeyToRecord, ErrorRecordHandler errorRecordHandler ) throws StageException { for (int i = 0; i < rex.getNumExceptions(); i++) { if (rex.getCause(i) instanceof NoSuchColumnFamilyException) { Row r = rex.getRow(i); Record errorRecord = record != null ? record : rowKeyToRecord.get(Bytes.toString(r.getRow())); OnRecordErrorException exception = new OnRecordErrorException(errorRecord, Errors.HBASE_10, getErrorDescription(rex.getCause(i), r, rex.getHostnamePort(i))); errorRecordHandler.onError(exception); } else { // If at least 1 non NoSuchColumnFamilyException exception, // consider as stage exception throw new StageException(Errors.HBASE_02, rex); } } }
private static void demonstrateBatchPut(Connection connection) throws IOException, InterruptedException { List<Row> batch = new ArrayList<Row>(); Put put1 = new Put(Bytes.toBytes("elton|jericho|201512")); put1.addColumn(Bytes.toBytes("t"), Bytes.toBytes("0109"), Bytes.toBytes("670")); batch.add(put1); Put put2 = new Put(Bytes.toBytes("elton|jericho|201601")); put2.addColumn(Bytes.toBytes("t"), Bytes.toBytes("0110"), Bytes.toBytes("110")); batch.add(put2); Put put3 = new Put(Bytes.toBytes("elton|jericho|201602")); put3.addColumn(Bytes.toBytes("t"), Bytes.toBytes("0206"), Bytes.toBytes("500")); batch.add(put3); Table access_logs = connection.getTable(TableName.valueOf("access-logs")); Object[] results = new Object[batch.size()]; access_logs.batch(batch, results); }
/** * @param actions * @deprecated */ @Deprecated @Override public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { List<Result> results = new ArrayList<Result>(); for (Row r : actions) { if (r instanceof Delete) { delete((Delete) r); continue; } if (r instanceof Put) { put((Put) r); continue; } if (r instanceof Get) { results.add(get((Get) r)); } } return results.toArray(); }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param allRows list of actions * @throws IOException */ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { if (allRows.isEmpty()) { return; } Table table = null; try { Connection connection = getConnection(); table = connection.getTable(tableName); for (List<Row> rows : allRows) { table.batch(rows, null); } } catch (RetriesExhaustedWithDetailsException rewde) { for (Throwable ex : rewde.getCauses()) { if (ex instanceof TableNotFoundException) { throw new TableNotFoundException("'"+tableName+"'"); } } } catch (InterruptedException ix) { throw (InterruptedIOException) new InterruptedIOException().initCause(ix); } finally { if (table != null) { table.close(); } } }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param allRows list of actions * @throws IOException */ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { if (allRows.isEmpty()) { return; } HTableInterface table = null; try { table = this.sharedHtableCon.getTable(tableName); for (List<Row> rows : allRows) { table.batch(rows); } } catch (InterruptedException ix) { throw new IOException(ix); } finally { if (table != null) { table.close(); } } }
@Override public Status process() throws EventDeliveryException { Status status = Status.READY; Channel channel = getChannel(); Transaction txn = channel.getTransaction(); List<Row> actions = new LinkedList<Row>(); List<Increment> incs = new LinkedList<Increment>(); txn.begin(); for(long i = 0; i < batchSize; i++) { Event event = channel.take(); if(event == null){ status = Status.BACKOFF; counterGroup.incrementAndGet("channel.underflow"); break; } else { serializer.initialize(event, columnFamily); actions.addAll(serializer.getActions()); incs.addAll(serializer.getIncrements()); } } putEventsAndCommit(actions, incs, txn); return status; }
@Override public List<Row> getActions() throws FlumeException { List<Row> actions = new LinkedList<Row>(); if(plCol != null){ byte[] rowKey; try { if (keyType == KeyType.TS) { rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); } else if(keyType == KeyType.RANDOM) { rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); } else if(keyType == KeyType.TSNANO) { rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); } else { rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); } Put put = new Put(rowKey); put.add(cf, plCol, payload); actions.add(put); } catch (Exception e){ throw new FlumeException("Could not get row key!", e); } } return actions; }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param rows list of actions * @throws IOException */ private void batch(byte[] tableName, List<Row> rows) throws IOException { if (rows.isEmpty()) { return; } HTableInterface table = null; try { table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool); table.batch(rows); this.metrics.appliedOpsRate.inc(rows.size()); } catch (InterruptedException ix) { throw new IOException(ix); } finally { if (table != null) { table.close(); } } }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param rows list of actions * @throws IOException */ private void batch(byte[] tableName, List<Row> rows) throws IOException { if (rows.isEmpty()) { return; } HTableInterface table = null; try { table = new HTable(tableName, this.sharedHtableCon, this.sharedThreadPool); table.batch(rows); } catch (InterruptedException ix) { throw new IOException(ix); } finally { if (table != null) { table.close(); } } }
/** * Wraps a HBase batch call. Gets the Table for this table, calls batch then * closes the Table */ void doBatch(final List<? extends Row> actions, final Object[] results) { final Table tableInterface = getTable(); try { tableInterface.batch(actions, results); } catch (final Exception e) { closeTable(tableInterface); throw new HBaseException(e.getMessage(), e); } finally { closeTable(tableInterface); } }
@Override public List<Row> getActions() throws FlumeException { List<Row> actions = Lists.newArrayList(); byte[] rowKey; Matcher m = inputPattern.matcher(new String(payload, charset)); if (!m.matches()) { return Lists.newArrayList(); } if (m.groupCount() != colNames.size()) { return Lists.newArrayList(); } try { if (rowKeyIndex < 0) { rowKey = getRowKey(); } else { rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8); } Put put = new Put(rowKey); for (int i = 0; i < colNames.size(); i++) { if (i != rowKeyIndex) { put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8)); } } if (depositHeaders) { for (Map.Entry<String, String> entry : headers.entrySet()) { put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset)); } } actions.add(put); } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } return actions; }
@Override public List<Row> getActions() throws FlumeException { if (throwException) { throw new FlumeException("Exception for testing"); } return super.getActions(); }
@Test /** Ensure that when no config is specified, the a catch-all regex is used * with default column name. */ public void testDefaultBehavior() throws Exception { RegexHbaseEventSerializer s = new RegexHbaseEventSerializer(); Context context = new Context(); s.configure(context); String logMsg = "The sky is falling!"; Event e = EventBuilder.withBody(Bytes.toBytes(logMsg)); s.initialize(e, "CF".getBytes()); List<Row> actions = s.getActions(); assertTrue(actions.size() == 1); assertTrue(actions.get(0) instanceof Put); Put put = (Put) actions.get(0); assertTrue(put.getFamilyMap().containsKey(s.cf)); List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf); assertTrue(kvPairs.size() == 1); Map<String, String> resultMap = Maps.newHashMap(); for (KeyValue kv : kvPairs) { resultMap.put(new String(kv.getQualifier()), new String(kv.getValue())); } assertTrue(resultMap.containsKey( RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT)); assertEquals("The sky is falling!", resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT)); }
/** * Do the changes and handle the pool * @param tableName table to insert into * @param allRows list of actions * @throws IOException */ protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException { if (allRows.isEmpty()) { return; } Table table = null; try { // See https://en.wikipedia.org/wiki/Double-checked_locking Connection connection = this.sharedHtableCon; if (connection == null) { synchronized (sharedHtableConLock) { connection = this.sharedHtableCon; if (connection == null) { connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf); } } } table = connection.getTable(tableName); for (List<Row> rows : allRows) { table.batch(rows); } } catch (InterruptedException ix) { throw (InterruptedIOException)new InterruptedIOException().initCause(ix); } finally { if (table != null) { table.close(); } } }
@Override @Deprecated public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { Object[] results = new Object[actions.size()]; batch(actions, results); return results; }