/** * Builds a single {@link Increment} object for a row, with one-many cell * increments in that row * * @param rowKey The rowKey of the row to be updated * @param cells A list of objects containing the column qualifier and cell * increment value * @return The completed {@link Increment} object */ private Increment createIncrementOperation(final RowKey rowKey, final List<CountCellIncrementHolder> cells) { LOGGER.trace(() -> String.format("createIncrementOperation called for rowKey: %s with cell count %s", rowKey.toString(), cells.size())); final Increment increment = new Increment(rowKey.asByteArray()); // TODO HBase 2.0 has Increment.setReturnResults to allow you to prevent // the return of the new // value to improve performance. In our case we don't care about the new // value so when we // upgrade to HBase 2.0 we need to add this line in. // increment.setReturnResults(false); //if we have multiple CCIHs for the same rowKey/colQual then hbase seems to only process one of them //Due to the way the data is passed through to this method we should not get multiple increments for the //same rowKey/colQual so we will not check for it due to the cost of doing that. for (final CountCellIncrementHolder cell : cells) { increment.addColumn(EventStoreColumnFamily.COUNTS.asByteArray(), cell.getColumnQualifier().getBytes(), cell.getCellIncrementValue()); } return increment; }
/**
 * Converts a Thrift {@code TIncrement} into an HBase {@link Increment}.
 *
 * <p>The row and every column increment are always copied; attributes, durability
 * and cell visibility are copied only when present on the Thrift object.
 *
 * @param in the Thrift increment to convert
 * @return the equivalent HBase increment
 * @throws IOException declared by the signature; propagated from the helpers called here
 */
public static Increment incrementFromThrift(TIncrement in) throws IOException {
  final Increment converted = new Increment(in.getRow());

  for (TColumnIncrement columnIncrement : in.getColumns()) {
    converted.addColumn(
        columnIncrement.getFamily(),
        columnIncrement.getQualifier(),
        columnIncrement.getAmount());
  }

  if (in.isSetAttributes()) {
    addAttributes(converted, in.getAttributes());
  }

  if (in.isSetDurability()) {
    converted.setDurability(durabilityFromThrift(in.getDurability()));
  }

  if (in.getCellVisibility() != null) {
    converted.setCellVisibility(new CellVisibility(in.getCellVisibility().getExpression()));
  }

  return converted;
}
/**
 * Applies a Thrift increment, either by queueing it on the coalescer (when
 * {@code COALESCE_INC_KEY} is enabled in the configuration) or by executing it
 * directly against the target table.
 *
 * @param tincrement the increment request; must carry a non-empty table and row
 * @throws IOError if the direct increment fails with an {@link IOException}
 * @throws TException if the table or row key is empty
 */
@Override
public void increment(TIncrement tincrement) throws IOError, TException {
  if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
    throw new TException("Must supply a table and a row key; can't increment");
  }

  final boolean coalesceEnabled = conf.getBoolean(COALESCE_INC_KEY, false);
  if (coalesceEnabled) {
    this.coalescer.queueIncrement(tincrement);
    return;
  }

  Table targetTable = null;
  try {
    targetTable = getTable(tincrement.getTable());
    targetTable.increment(ThriftUtilities.incrementFromThrift(tincrement));
  } catch (IOException ioe) {
    LOG.warn(ioe.getMessage(), ioe);
    throw new IOError(Throwables.getStackTraceAsString(ioe));
  } finally {
    // targetTable is still null here if getTable itself failed.
    closeTable(targetTable);
  }
}
@Override public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c, final Increment increment) throws IOException { if (increment.getAttribute(CHECK_COVERING_PERM) != null) { // We had failure with table, cf and q perm checks and now giving a chance for cell // perm check TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); AuthResult authResult = null; if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(), increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) { authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set", getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap()); } else { authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set", getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap()); } logResult(authResult); if (authorizationEnabled && !authResult.isAllowed()) { throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString()); } } return null; }
/**
 * Runs an increment on {@code TEST_FAMILY1:q1} of {@code row} as {@code user} and
 * expects it to be rejected.
 *
 * <p>Any {@link Exception} raised while obtaining the table or incrementing counts
 * as the expected denial and is ignored; reaching {@code fail(...)} means the
 * increment went through.
 *
 * @param user the user that must not be able to increment
 * @param row the row to increment
 * @param q1 the qualifier to increment
 */
private void verifyUserDeniedForIncrementMultipleVersions(final User user, final byte[] row,
    final byte[] q1) throws IOException, InterruptedException {
  user.runAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(conf)) {
        try (Table table = connection.getTable(TEST_TABLE.getTableName())) {
          Increment deniedIncrement = new Increment(row);
          deniedIncrement.setTimeRange(0, 127);
          deniedIncrement.addColumn(TEST_FAMILY1, q1, 2L);
          table.increment(deniedIncrement);
          fail(user.getShortName() + " cannot do the increment.");
        } catch (Exception expected) {
          // Deliberately ignored: an exception here is the expected outcome.
        }
      }
      return null;
    }
  });
}
/**
 * Verifies that an increment triggers the pre, post and pre-after-row-lock
 * increment hooks on {@code SimpleRegionObserver}: none are flagged before the
 * increment and all three are flagged after it.
 *
 * <p>Cleanup is nested so that the table handle is closed even when deleting the
 * table throws.
 */
@Test (timeout=300000)
public void testIncrementHook() throws IOException {
  TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testIncrementHook");
  Table table = util.createTable(tableName, new byte[][] {A, B, C});
  try {
    Increment inc = new Increment(Bytes.toBytes(0));
    inc.addColumn(A, A, 1);

    // Before the increment no hook may have fired.
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"},
        tableName,
        new Boolean[] {false, false, false}
        );

    table.increment(inc);

    // After the increment every hook must have fired.
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] {"hadPreIncrement", "hadPostIncrement", "hadPreIncrementAfterRowLock"},
        tableName,
        new Boolean[] {true, true, true}
        );
  } finally {
    try {
      util.deleteTable(tableName);
    } finally {
      // Always release the client-side handle, even if deleteTable failed.
      table.close();
    }
  }
}
/**
 * Checks that incrementing a region opened with the read-only flag set fails with
 * an {@link IOException}.
 */
@Test
public void testIncrWithReadOnlyTable() throws Exception {
  final byte[] readOnlyTableName = Bytes.toBytes("readOnlyTable");
  this.region = initHRegion(readOnlyTableName, getName(), CONF, true,
      Bytes.toBytes("somefamily"));

  final Increment increment = new Increment(Bytes.toBytes("somerow"));
  increment.setDurability(Durability.SKIP_WAL);
  increment.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);

  boolean rejected = false;
  try {
    region.increment(increment);
  } catch (IOException expected) {
    rejected = true;
  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
  assertTrue(rejected);
}
@Test public void testIncrementTimestampsAreMonotonic() throws IOException { HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); edge.setValue(10); Increment inc = new Increment(row); inc.setDurability(Durability.SKIP_WAL); inc.addColumn(fam1, qual1, 1L); region.increment(inc); Result result = region.get(new Get(row)); Cell c = result.getColumnLatestCell(fam1, qual1); assertNotNull(c); assertEquals(c.getTimestamp(), 10L); edge.setValue(1); // clock goes back region.increment(inc); result = region.get(new Get(row)); c = result.getColumnLatestCell(fam1, qual1); assertEquals(c.getTimestamp(), 10L); assertEquals(Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()), 2L); }
@Test public void testIncrementWithReturnResultsSetToFalse() throws Exception { byte[] row1 = Bytes.toBytes("row1"); byte[] col1 = Bytes.toBytes("col1"); // Setting up region final WALFactory wals = new WALFactory(CONF, null, "testIncrementWithReturnResultsSetToFalse"); byte[] tableName = Bytes.toBytes("testIncrementWithReturnResultsSetToFalse"); final WAL wal = wals.getWAL(tableName); HRegion region = createHRegion(tableName, "increment", wal, Durability.USE_DEFAULT); Increment inc1 = new Increment(row1); inc1.setReturnResults(false); inc1.addColumn(FAMILY, col1, 1); Result res = region.increment(inc1); assertNull(res); }
@Override public long incrementColumnValue(byte[] rowId, byte[] colFamily, byte[] colQualifier, long l) throws IOException { // ColumnManager validation Increment increment = null; if (includedInRepositoryProcessing) { increment = new Increment(rowId).addColumn(colFamily, colQualifier, l); if (mTableDescriptor.hasColDescriptorWithColDefinitionsEnforced()) { repository.validateColumns(mTableDescriptor, increment); } } // Standard HBase processing (with aliasing, if necessary) long returnedLong; if (includedInRepositoryProcessing && mTableDescriptor.hasColDescriptorWithColAliasesEnabled()) { returnedLong = wrappedTable.incrementColumnValue(rowId, colFamily, repository.getAlias(mTableDescriptor, colFamily, colQualifier), l); } else { returnedLong = wrappedTable.incrementColumnValue(rowId, colFamily, colQualifier, l); } // ColumnManager auditing if (includedInRepositoryProcessing) { repository.putColumnAuditorSchemaEntities(mTableDescriptor, increment); } return returnedLong; }
NavigableMap<byte[], NavigableMap<byte[], byte[]>> getFamilyQualifierToAliasMap( MTableDescriptor mTableDescriptor, Mutation mutation) throws IOException { NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyQualifierToAliasMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); Class<?> mutationClass = mutation.getClass(); if (Append.class.isAssignableFrom(mutationClass)) { familyQualifierToAliasMap = getFamilyQualifierToAliasMap(mTableDescriptor, (Append)mutation); } else if (Increment.class.isAssignableFrom(mutationClass)) { familyQualifierToAliasMap = getFamilyQualifierToAliasMap(mTableDescriptor, (Increment)mutation); } else if (Delete.class.isAssignableFrom(mutationClass) || Put.class.isAssignableFrom(mutationClass) || RowMutations.class.isAssignableFrom(mutationClass)) { // ignore: familyQualifierToAliasMap not passed to alias-processing for these mutation-types } return familyQualifierToAliasMap; }
Row convertQualifiersToAliases(MTableDescriptor mTableDescriptor, final Row originalRow, NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyQualifierToAliasMap, int intForUniqueSignature) throws IOException { // Append, Delete, Get, Increment, Mutation, Put, RowMutations Class<?> originalRowClass = originalRow.getClass(); if (Append.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases( mTableDescriptor, (Append)originalRow, familyQualifierToAliasMap); } else if (Delete.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases(mTableDescriptor, (Delete)originalRow); } else if (Get.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases( mTableDescriptor, (Get)originalRow, familyQualifierToAliasMap); } else if (Increment.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases( mTableDescriptor, (Increment)originalRow, familyQualifierToAliasMap); } else if (Put.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases(mTableDescriptor, (Put)originalRow); } else if (RowMutations.class.isAssignableFrom(originalRowClass)) { return convertQualifiersToAliases(mTableDescriptor, (RowMutations)originalRow); } return null; }
/**
 * Submits one Append, Delete, Increment, Put and Get through {@code Table#batch}
 * and asserts that every action produced a non-null result and that the Get
 * returned the expected value for {@code CF01:COLQUALIFIER02}.
 *
 * @param table the table to run the batch against
 */
private void testBatchProcessing(Table table) throws IOException, InterruptedException {
  List<Row> actions = new LinkedList<>();
  actions.add(new Append(ROW_ID_02)
          .add(CF01, COLQUALIFIER03, Bytes.toBytes("appendedStringViaBatch")));
  actions.add(new Delete(ROW_ID_03).addColumn(CF01, COLQUALIFIER04));
  actions.add(new Increment(ROW_ID_02).addColumn(CF01, COLQUALIFIER05, 14));
  actions.add(new Put(ROW_ID_05)
          .addColumn(CF01, COLQUALIFIER04, TABLE_PUT_WITH_LIST)
          .addColumn(CF02, COLQUALIFIER02, TABLE_PUT_WITH_LIST));
  actions.add(new Get(ROW_ID_01).addColumn(CF01, COLQUALIFIER02));

  Object[] results = new Object[actions.size()];
  table.batch(actions, results);

  for (int i = 0; i < results.length; i++) {
    final Row action = actions.get(i);
    final Object outcome = results[i];
    assertTrue("Table#batch action failed for " + action.getClass().getSimpleName(),
            outcome != null);
    if (action instanceof Get) {
      final byte[] fetched = ((Result) outcome).getValue(CF01, COLQUALIFIER02);
      assertTrue("Table#batch Get action returned unexpected Result: expected <"
              + Bytes.toString(TABLE_PUT_WITH_LIST) + ">, returned <"
              + Bytes.toString(fetched) + ">",
              Bytes.equals(TABLE_PUT_WITH_LIST, fetched));
    }
  }
}
/**
 * Records a sale in the profile table by incrementing, in a single batch, the
 * buyer's purchase-value counters and the seller's sell-value counters by the
 * item value of the event.
 *
 * <p>The buyer's increment targets {@code TOTAL_VALUE_OF_PAST_PURCHASES_COL}
 * (previously it bumped the buyer's past-sells column, duplicating the seller's
 * update). The table handle is closed when the batch finishes or fails.
 *
 * @param buyerId id of the buying user
 * @param sellerId id of the selling user
 * @param event the sale; its item value is the increment amount
 * @throws IOException if the batch or closing the table fails
 * @throws InterruptedException if the batch is interrupted
 */
public void updateProfileCountsForSaleInHBase(Long buyerId, Long sellerId, ItemSaleEvent event)
    throws IOException, InterruptedException {
  HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE);
  try {
    ArrayList<Row> actions = new ArrayList<Row>();

    // Buyer: value purchased during the current log-in and over all time.
    Increment buyerValueIncrement = new Increment(generateProfileRowKey(buyerId));
    buyerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL, event.getItemValue());
    buyerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.TOTAL_VALUE_OF_PAST_PURCHASES_COL, event.getItemValue());
    actions.add(buyerValueIncrement);

    // Seller: value sold during the current log-in and over all time.
    Increment sellerValueIncrement = new Increment(generateProfileRowKey(sellerId));
    sellerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL, event.getItemValue());
    sellerValueIncrement.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, event.getItemValue());
    actions.add(sellerValueIncrement);

    profileTable.batch(actions);
  } finally {
    // Release the handle obtained from the pool.
    profileTable.close();
  }
}
/**
 * Records a log-in for a user in the profile table. One batch deletes the
 * current-log-in purchase and sell value columns, increments the log-in count by
 * one, and stores the current time and the IP address.
 *
 * <p>The table handle is closed when the batch finishes or fails.
 *
 * @param userId id of the user logging in
 * @param ipAddress address the user logged in from
 * @throws IOException if the batch or closing the table fails
 * @throws Exception kept for signature compatibility
 */
public void logInProfileInHBase(long userId, String ipAddress) throws IOException, Exception {
  HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE);
  try {
    ArrayList<Row> actions = new ArrayList<Row>();
    byte[] profileRowKey = generateProfileRowKey(userId);

    // Clear the per-log-in value columns.
    Delete delete = new Delete(profileRowKey);
    delete.deleteColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL);
    delete.deleteColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL);
    actions.add(delete);

    Increment increment = new Increment(profileRowKey);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.LOG_IN_COUNT_COL, 1);
    actions.add(increment);

    Put put = new Put(profileRowKey);
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LAST_LOG_IN_COL,
        Bytes.toBytes(System.currentTimeMillis()));
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_IP_ADDERSSES,
        Bytes.toBytes(ipAddress));
    actions.add(put);

    profileTable.batch(actions);
  } finally {
    // Release the handle obtained from the pool.
    profileTable.close();
  }
}
/**
 * Creates the profile row for a user. One batch writes the fixed info
 * ({@code username|age|currentTimeMillis}), the log-in IP address and the last
 * log-in time, sets the log-in count to one via an increment, and touches the
 * remaining counters with a zero increment.
 *
 * <p>The table handle is closed when the batch finishes or fails.
 *
 * @param userId id of the new user
 * @param pojo supplies the username and age stored in the fixed-info column
 * @param ipAddress address the user registered from
 * @throws Exception if the batch or closing the table fails
 */
@Override
public void createProfile(long userId, ProfilePojo pojo, String ipAddress) throws Exception {
  HTableInterface profileTable = hTablePool.getTable(DataModelConsts.PROFILE_TABLE);
  try {
    ArrayList<Row> actions = new ArrayList<Row>();
    byte[] rowKey = generateProfileRowKey(userId);

    Put put = new Put(rowKey);
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.FIXED_INFO_COL,
        Bytes.toBytes(pojo.getUsername() + "|" + pojo.getAge() + "|" + System.currentTimeMillis()));
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LOG_IN_IP_ADDERSSES,
        Bytes.toBytes(ipAddress));
    put.add(DataModelConsts.PROFILE_COLUMN_FAMILY, DataModelConsts.LAST_LOG_IN_COL,
        Bytes.toBytes(System.currentTimeMillis()));
    actions.add(put);

    Increment increment = new Increment(rowKey);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.LOG_IN_COUNT_COL, 1);
    // Zero increments for the remaining counter columns.
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.TOTAL_SELLS_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.TOTAL_PURCHASES_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.TOTAL_VALUE_OF_PAST_PURCHASES_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.TOTAL_VALUE_OF_PAST_SELLS_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.CURRENT_LOG_IN_SELLS_VALUE_COL, 0);
    increment.addColumn(DataModelConsts.PROFILE_COLUMN_FAMILY,
        DataModelConsts.CURRENT_LOG_IN_PURCHASES_VALUE_COL, 0);
    actions.add(increment);

    profileTable.batch(actions);
  } finally {
    // Release the handle obtained from the pool.
    profileTable.close();
  }
}
/**
 * Applies a Thrift increment, either by queueing it on the coalescer (when
 * {@code COALESCE_INC_KEY} is enabled in the configuration) or by executing it
 * directly against the target table.
 *
 * @param tincrement the increment request; must carry a non-empty table and row
 * @throws IOError carrying the message of any {@link IOException} from the direct path
 * @throws TException if the table or row key is empty
 */
@Override
public void increment(TIncrement tincrement) throws IOError, TException {
  if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
    throw new TException("Must supply a table and a row key; can't increment");
  }

  final boolean coalesceEnabled = conf.getBoolean(COALESCE_INC_KEY, false);
  if (coalesceEnabled) {
    this.coalescer.queueIncrement(tincrement);
    return;
  }

  try {
    HTable targetTable = getTable(tincrement.getTable());
    targetTable.increment(ThriftUtilities.incrementFromThrift(tincrement));
  } catch (IOException ioe) {
    LOG.warn(ioe.getMessage(), ioe);
    throw new IOError(ioe.getMessage());
  }
}
/**
 * Invokes {@code preIncrement} on every loaded {@link RegionObserver}, in
 * coprocessor order, stopping early once an observer's context reports that
 * processing should complete.
 *
 * @param increment increment object
 * @return the result from the last observer invoked if any observer requested a
 *         bypass of the default operation, null otherwise
 * @throws IOException if an error occurred on the coprocessor
 */
public Result preIncrement(Increment increment) throws IOException {
  boolean bypassRequested = false;
  Result observerResult = null;
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;

  for (RegionEnvironment env : coprocessors) {
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);
    try {
      observerResult = ((RegionObserver) env.getInstance()).preIncrement(ctx, increment);
    } catch (Throwable t) {
      handleCoprocessorThrowable(env, t);
    }
    if (ctx.shouldBypass()) {
      bypassRequested = true;
    }
    if (ctx.shouldComplete()) {
      break;
    }
  }

  return bypassRequested ? observerResult : null;
}
/**
 * Invokes {@code postIncrement} on every loaded {@link RegionObserver}, in
 * coprocessor order, feeding each observer the result produced by the previous
 * one and stopping early once a context reports that processing should complete.
 *
 * @param increment increment object
 * @param result the result passed to the first observer
 * @return the result after the last observer invoked has processed it
 * @throws IOException if an error occurred on the coprocessor
 */
public Result postIncrement(final Increment increment, Result result) throws IOException {
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;

  for (RegionEnvironment env : coprocessors) {
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);
    try {
      result = ((RegionObserver) env.getInstance()).postIncrement(ctx, increment, result);
    } catch (Throwable t) {
      handleCoprocessorThrowable(env, t);
    }
    if (ctx.shouldComplete()) {
      break;
    }
  }

  return result;
}
/**
 * Checks that incrementing a region opened with the read-only flag set fails with
 * an {@link IOException}.
 */
public void testIncrWithReadOnlyTable() throws Exception {
  final byte[] readOnlyTableName = Bytes.toBytes("readOnlyTable");
  this.region = initHRegion(readOnlyTableName, getName(), conf, true,
      Bytes.toBytes("somefamily"));

  final Increment increment = new Increment(Bytes.toBytes("somerow"));
  increment.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);

  boolean rejected = false;
  try {
    region.increment(increment, false);
  } catch (IOException expected) {
    rejected = true;
  } finally {
    HRegion.closeHRegion(this.region);
    this.region = null;
  }
  assertTrue(rejected);
}
/**
 * Routes a {@link Row} to the request issuer that matches its concrete type
 * (Put, Delete, Append, Increment, Get or RowMutations).
 *
 * @param row the action to issue
 * @return the future from the matching issuer, or an immediately failed future
 *         wrapping an {@link IllegalArgumentException} for an unknown action type
 */
ListenableFuture<? extends GeneratedMessage> issueRequest(Row row) {
  if (row instanceof Put) {
    return issuePutRequest((Put) row);
  }
  if (row instanceof Delete) {
    return issueDeleteRequest((Delete) row);
  }
  if (row instanceof Append) {
    return issueAppendRequest((Append) row);
  }
  if (row instanceof Increment) {
    return issueIncrementRequest((Increment) row);
  }
  if (row instanceof Get) {
    return issueGetRequest((Get) row);
  }
  if (row instanceof RowMutations) {
    return issueRowMutationsRequest((RowMutations) row);
  }

  // None of the supported action types matched.
  LOG.error("Encountered unknown action type %s", row.getClass());
  return Futures.immediateFailedFuture(
      new IllegalArgumentException("Encountered unknown action type: " + row.getClass()));
}
/**
 * Adapts an {@link Increment} with a single column and checks that the resulting
 * request holds exactly one rule carrying the same family, qualifier and amount.
 */
@Test
public void testSingleIncrement() throws IOException {
  final byte[] rowKey = dataHelper.randomData("rk1-");
  final byte[] family = Bytes.toBytes("family");
  final byte[] qualifier = Bytes.toBytes("qualifier");
  final long amount = 1234;

  final Increment single = new Increment(rowKey);
  single.addColumn(family, qualifier, amount);

  final ReadModifyWriteRowRequest.Builder adapted = incrementAdapter.adapt(single);

  Assert.assertEquals(1, adapted.getRulesCount());
  final ReadModifyWriteRule onlyRule = adapted.getRules(0);
  Assert.assertEquals("qualifier", onlyRule.getColumnQualifier().toStringUtf8());
  Assert.assertEquals("family", onlyRule.getFamilyName());
  Assert.assertEquals(amount, onlyRule.getIncrementAmount());
}
/** * Requirement 6.6 - Increment should fail on non-64-bit values, and succeed on any 64-bit value. */ @Test @Category(KnownGap.class) public void testFailOnIncrementInt() throws IOException { // Initialize Table table = getConnection().getTable(TABLE_NAME); byte[] rowKey = dataHelper.randomData("testrow-"); byte[] qual = dataHelper.randomData("qual-"); int value = new Random().nextInt(); Put put = new Put(rowKey).addColumn(COLUMN_FAMILY, qual, Bytes.toBytes(value)); table.put(put); // Increment Increment increment = new Increment(rowKey).addColumn(COLUMN_FAMILY, qual, 1L); expectedException.expect(DoNotRetryIOException.class); expectedException.expectMessage("Attempted to increment field that isn't 64 bits wide"); table.increment(increment); }
/** * Requirement 6.6 */ @Test @Category(KnownGap.class) public void testFailOnIncrementString() throws IOException { // Initialize Table table = getConnection().getTable(TABLE_NAME); byte[] rowKey = dataHelper.randomData("testrow-"); byte[] qual = dataHelper.randomData("qual-"); byte[] value = dataHelper.randomData("value-"); Put put = new Put(rowKey).addColumn(COLUMN_FAMILY, qual, value); table.put(put); // Increment Increment increment = new Increment(rowKey).addColumn(COLUMN_FAMILY, qual, 1L); expectedException.expect(DoNotRetryIOException.class); expectedException.expectMessage("Attempted to increment field that isn't 64 bits wide"); table.increment(increment); }
@Override public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c, final Increment increment) throws IOException { if (increment.getAttribute(CHECK_COVERING_PERM) != null) { // We had failure with table, cf and q perm checks and now giving a chance for cell // perm check TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); AuthResult authResult = null; if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(), increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) { authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set", getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap()); } else { authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set", getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap()); } logResult(authResult); if (!authResult.isAllowed()) { throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString()); } } return null; }
/** * Creates a HBase {@link Increment} from a Storm {@link Tuple} * * @param tuple * The {@link Tuple} * @param increment * The amount to increment the counter by * @return {@link Increment} */ public Increment getIncrementFromTuple(final Tuple tuple, final long increment) { byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField)); Increment inc = new Increment(rowKey); inc.setDurability(durability); if (columnFamilies.size() > 0) { for (String cf : columnFamilies.keySet()) { byte[] cfBytes = Bytes.toBytes(cf); for (String cq : columnFamilies.get(cf)) { byte[] val; try { val = Bytes.toBytes(tuple.getStringByField(cq)); } catch (IllegalArgumentException ex) { // if cq isn't a tuple field, use cq for counter instead of tuple // value val = Bytes.toBytes(cq); } inc.addColumn(cfBytes, val, increment); } } } return inc; }
/**
 * Applies the mutation {@code m} to the table (creating the {@code HTable} on
 * first use) with the call that matches its type, then adds the elapsed time
 * since {@code start} to {@code totalOpTimeMs}.
 *
 * <p>An {@link IOException} is reported through {@code recordFailure}; an
 * unsupported mutation type raises {@link IllegalArgumentException}.
 *
 * @return always {@code null}
 */
@Override
public Object run() throws Exception {
  try {
    if (table == null) {
      table = new HTable(conf, tableName);
    }

    if (m instanceof Increment) {
      table.increment((Increment) m);
    } else if (m instanceof Append) {
      table.append((Append) m);
    } else if (m instanceof Put) {
      table.checkAndPut(row, cf, q, v, (Put) m);
    } else if (m instanceof Delete) {
      table.checkAndDelete(row, cf, q, v, (Delete) m);
    } else {
      final String typeName = m.getClass().getSimpleName();
      throw new IllegalArgumentException("unsupported mutation " + typeName);
    }

    final long elapsedMs = System.currentTimeMillis() - start;
    totalOpTimeMs.addAndGet(elapsedMs);
  } catch (IOException ioe) {
    recordFailure(m, keyBase, start, ioe);
  }
  return null;
}
@Override public void run() { for (int i=0; i<numIncrements; i++) { try { Increment inc = new Increment(row); inc.addColumn(fam1, qual1, amount); inc.addColumn(fam1, qual2, amount*2); inc.addColumn(fam2, qual3, amount*3); inc.setDurability(Durability.ASYNC_WAL); region.increment(inc); // verify: Make sure we only see completed increments Get g = new Get(row); Result result = region.get(g); assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2))); assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3))); } catch (IOException e) { e.printStackTrace(); } } }
/**
 * Invokes {@code preIncrement} on every loaded {@link RegionObserver}, in
 * coprocessor order, with the thread's context class loader switched to the
 * coprocessor's own loader for the duration of each call.
 *
 * @param increment increment object
 * @return the result from the last observer invoked if any observer requested a
 *         bypass of the default operation, null otherwise
 * @throws IOException if an error occurred on the coprocessor
 */
public Result preIncrement(final Increment increment) throws IOException {
  boolean bypassRequested = false;
  Result observerResult = null;
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;

  for (RegionEnvironment env : coprocessors) {
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);

    final Thread caller = Thread.currentThread();
    final ClassLoader savedLoader = caller.getContextClassLoader();
    try {
      caller.setContextClassLoader(env.getClassLoader());
      observerResult = ((RegionObserver) env.getInstance()).preIncrement(ctx, increment);
    } catch (Throwable t) {
      handleCoprocessorThrowable(env, t);
    } finally {
      // Restore the caller's loader whether or not the observer threw.
      caller.setContextClassLoader(savedLoader);
    }

    if (ctx.shouldBypass()) {
      bypassRequested = true;
    }
    if (ctx.shouldComplete()) {
      break;
    }
  }

  return bypassRequested ? observerResult : null;
}
/**
 * Invokes {@code postIncrement} on every loaded {@link RegionObserver}, in
 * coprocessor order, feeding each observer the result produced by the previous
 * one. The thread's context class loader is switched to the coprocessor's own
 * loader for the duration of each call.
 *
 * @param increment increment object
 * @param result the result passed to the first observer
 * @return the result after the last observer invoked has processed it
 * @throws IOException if an error occurred on the coprocessor
 */
public Result postIncrement(final Increment increment, Result result) throws IOException {
  ObserverContext<RegionCoprocessorEnvironment> ctx = null;

  for (RegionEnvironment env : coprocessors) {
    if (!(env.getInstance() instanceof RegionObserver)) {
      continue;
    }
    ctx = ObserverContext.createAndPrepare(env, ctx);

    final Thread caller = Thread.currentThread();
    final ClassLoader savedLoader = caller.getContextClassLoader();
    try {
      caller.setContextClassLoader(env.getClassLoader());
      result = ((RegionObserver) env.getInstance()).postIncrement(ctx, increment, result);
    } catch (Throwable t) {
      handleCoprocessorThrowable(env, t);
    } finally {
      // Restore the caller's loader whether or not the observer threw.
      caller.setContextClassLoader(savedLoader);
    }

    if (ctx.shouldComplete()) {
      break;
    }
  }

  return result;
}
/**
 * Verifies that an increment triggers the pre and post increment hooks on
 * {@code SimpleRegionObserver}: neither is flagged before the increment and both
 * are flagged after it.
 *
 * <p>Cleanup is nested so that the table handle is closed even when deleting the
 * table throws.
 */
@Test
public void testIncrementHook() throws IOException {
  TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testIncrementHook");
  HTable table = util.createTable(tableName, new byte[][] {A, B, C});
  try {
    Increment inc = new Increment(Bytes.toBytes(0));
    inc.addColumn(A, A, 1);

    // Before the increment no hook may have fired.
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] {"hadPreIncrement", "hadPostIncrement"},
        tableName,
        new Boolean[] {false, false}
        );

    table.increment(inc);

    // After the increment both hooks must have fired.
    verifyMethodResult(SimpleRegionObserver.class,
        new String[] {"hadPreIncrement", "hadPostIncrement"},
        tableName,
        new Boolean[] {true, true}
        );
  } finally {
    try {
      util.deleteTable(tableName);
    } finally {
      // Always release the client-side handle, even if deleteTable failed.
      table.close();
    }
  }
}
/**
 * Queues the mutations implied by a column list for later execution: a Put when
 * the list has columns and an Increment when it has counters.
 *
 * <p>If the queue is still empty after that, an empty Put for the row is queued.
 * Note that this fallback tests the whole queue, not just what this call added.
 *
 * @param rowKey The row key of the Mutation.
 * @param cols The columns affected by the Mutation.
 * @param durability The durability of the mutation.
 */
public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
  if (cols.hasColumns()) {
    mutations.add(createPut(rowKey, cols, durability));
  }

  if (cols.hasCounters()) {
    mutations.add(createIncrement(rowKey, cols, durability));
  }

  if (mutations.isEmpty()) {
    final Put emptyPut = new Put(rowKey);
    mutations.add(emptyPut);
  }
}
@Override public void run() { for (int i=0; i<numIncrements; i++) { try { Increment inc = new Increment(row); inc.addColumn(fam1, qual1, amount); inc.addColumn(fam1, qual2, amount*2); inc.addColumn(fam2, qual3, amount*3); region.increment(inc); // verify: Make sure we only see completed increments Get g = new Get(row); Result result = region.get(g); assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, Bytes.toLong(result.getValue(fam1, qual2))); assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, Bytes.toLong(result.getValue(fam2, qual3))); } catch (IOException e) { e.printStackTrace(); } } }
/**
 * Returns the increments to apply: a single increment of 1 on
 * {@code cf:incCol} of {@code incrementRow} when an increment column is
 * configured, otherwise an empty list.
 *
 * @return a new list holding zero or one {@link Increment}
 */
@Override
public List<Increment> getIncrements() {
  final List<Increment> result = new LinkedList<Increment>();
  if (incCol == null) {
    return result;
  }

  final Increment single = new Increment(incrementRow);
  single.addColumn(cf, incCol, 1);
  result.add(single);
  return result;
}
/**
 * Checks every coalesced increment against {@code expectedCounts}.
 *
 * <p>The family map of each increment is read reflectively through
 * {@code refGetFamilyMap}; each qualifier's count is then compared with the
 * expectation stored under the key {@code "<row>:<qualifier>"} (both decoded as
 * UTF-8).
 *
 * @param increments the increments produced by coalescing
 */
@Override
@SuppressWarnings("unchecked")
public void onAfterCoalesce(Iterable<Increment> increments) {
  for (Increment inc : increments) {
    final byte[] row = inc.getRow();

    final Map<byte[], NavigableMap<byte[], Long>> familyMap;
    try {
      familyMap = (Map<byte[], NavigableMap<byte[], Long>>) refGetFamilyMap.invoke(inc);
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }

    for (NavigableMap<byte[], Long> qualifiers : familyMap.values()) {
      for (Map.Entry<byte[], Long> counter : qualifiers.entrySet()) {
        final Long observed = counter.getValue();
        final String key = new String(row, Charsets.UTF_8)
            + ':'
            + new String(counter.getKey(), Charsets.UTF_8);
        Assert.assertEquals("Expected counts don't match observed for " + key,
            expectedCounts.get(key), observed);
      }
    }
  }
}
/**
 * Parses the event body as UTF-8 text of the form {@code row:qualifier} and
 * returns a single increment of 1 on that row and qualifier in {@code family}.
 *
 * @return a new list holding exactly one {@link Increment}
 */
@Override
public List<Increment> getIncrements() {
  final String[] rowAndQualifier = new String(event.getBody(), Charsets.UTF_8).split(":");
  final byte[] rowBytes = rowAndQualifier[0].getBytes(Charsets.UTF_8);
  final byte[] qualifierBytes = rowAndQualifier[1].getBytes(Charsets.UTF_8);

  final Increment single = new Increment(rowBytes);
  single.addColumn(family, qualifierBytes, 1L);

  final List<Increment> result = Lists.newArrayList();
  result.add(single);
  return result;
}