Java 类org.apache.hadoop.hbase.client.Mutation 实例源码
项目:stroom-stats
文件:HBaseEventStoreTable.java
/**
 * Creates one increment operation per row in {@code rowChanges} and hands the whole set to
 * {@code doBatch} in a single call.
 *
 * @param rowChanges the cell increments to apply, grouped by the row key they belong to
 */
private void addMultipleCounts(final Map<RowKey, List<CountCellIncrementHolder>> rowChanges) {
  LOGGER.trace(() -> String.format("addMultipleCounts called for %s rows", rowChanges.size()));

  // One mutation for every row that has pending changes.
  final List<Mutation> rowIncrements = rowChanges.entrySet()
      .stream()
      .map(rowChange -> createIncrementOperation(rowChange.getKey(), rowChange.getValue()))
      .collect(Collectors.toList());

  // The per-operation results are of no interest here, so no result array is supplied.
  doBatch(rowIncrements, (Object[]) null);

  LOGGER.trace(() -> String.format("%s puts sent to HBase", rowIncrements.size()));
}
项目:ditb
文件:DefaultVisibilityLabelServiceImpl.java
/**
 * Runs the given mutations against the labels region and copies each resulting status into the
 * next unset (null) slot of {@code finalOpStatus}. Slots the caller has already populated (for
 * example with FAILURE statuses) are skipped, so results land in the remaining slots in order.
 *
 * @param mutations the mutations to apply to the labels region
 * @param finalOpStatus the caller's status array; its null entries are filled in order
 * @return whether we need a ZK update or not, i.e. whether at least one mutation reported SUCCESS
 * @throws IOException propagated from the region's batchMutate call
 */
private boolean mutateLabelsRegion(List<Mutation> mutations, OperationStatus[] finalOpStatus)
    throws IOException {
  Mutation[] batch = mutations.toArray(new Mutation[mutations.size()]);
  OperationStatus[] batchResults =
      this.labelsRegion.batchMutate(batch, HConstants.NO_NONCE, HConstants.NO_NONCE);

  boolean anySucceeded = false;
  int slot = 0;
  for (OperationStatus result : batchResults) {
    // Once a success has been seen, later statuses cannot change the answer.
    if (!anySucceeded && result.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
      anySucceeded = true;
    }
    // Move forward to the next slot the caller has not filled in yet.
    while (slot < finalOpStatus.length && finalOpStatus[slot] != null) {
      slot++;
    }
    if (slot < finalOpStatus.length) {
      finalOpStatus[slot] = result;
    }
  }
  return anySucceeded;
}
项目:ditb
文件:MultiTableOutputFormat.java
/**
 * Writes an action (Put or Delete) to the specified table. The action is copied before it is
 * handed to the table's buffered mutator; copied Puts additionally get their durability set
 * from the {@code useWriteAheadLogging} flag.
 *
 * @param tableName
 *          the table being updated.
 * @param action
 *          the update, either a put or a delete.
 * @throws IllegalArgumentException
 *           if the action is not a put or a delete.
 */
@Override
public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
  BufferedMutator mutator = getBufferedMutator(tableName);
  // The actions are not immutable, so a defensive copy is what gets buffered.
  final Mutation copy;
  if (action instanceof Put) {
    Put putCopy = new Put((Put) action);
    Durability durability = Durability.SKIP_WAL;
    if (useWriteAheadLogging) {
      durability = Durability.SYNC_WAL;
    }
    putCopy.setDurability(durability);
    copy = putCopy;
  } else if (action instanceof Delete) {
    copy = new Delete((Delete) action);
  } else {
    throw new IllegalArgumentException("action must be either Delete or Put");
  }
  mutator.mutate(copy);
}
项目:ditb
文件:RequestConverter.java
/**
 * Builds a data-laden RegionAction.Builder for the given region holding one Action per mutation
 * in {@code rowMutations}.
 * Does not propagate Action absolute position and does not set the atomic flag on the created
 * region action; the caller should do that if wanted.
 *
 * @param regionName name of the region the action is addressed to
 * @param rowMutations the mutations to convert; only Put and Delete are accepted
 * @return the populated RegionAction.Builder
 * @throws IOException a DoNotRetryIOException when a mutation is neither a Put nor a Delete
 */
public static RegionAction.Builder buildRegionAction(final byte [] regionName,
    final RowMutations rowMutations)
    throws IOException {
  RegionAction.Builder regionAction =
      getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
  // Both builders are reused for every mutation and cleared before each use.
  ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
  MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
  for (Mutation mutation : rowMutations.getMutations()) {
    final boolean isPut = mutation instanceof Put;
    if (!isPut && !(mutation instanceof Delete)) {
      throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
          mutation.getClass().getName());
    }
    final MutationType kind = isPut ? MutationType.PUT : MutationType.DELETE;

    mutationBuilder.clear();
    MutationProto converted = ProtobufUtil.toMutation(kind, mutation, mutationBuilder);

    actionBuilder.clear();
    actionBuilder.setMutation(converted);
    regionAction.addAction(actionBuilder.build());
  }
  return regionAction;
}
项目:ditb
文件:MultiRowMutationProcessor.java
/**
 * Runs the coprocessor prePut / preDelete hooks for every mutation held by this processor.
 * Deletes are first passed to {@code region.prepareDelete}. As soon as a hook returns true the
 * method returns without looking at the remaining mutations. Nothing happens when the region
 * has no coprocessor host.
 */
@Override
public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
  RegionCoprocessorHost host = region.getCoprocessorHost();
  if (host == null) {
    return;
  }
  for (Mutation mutation : mutations) {
    if (mutation instanceof Put) {
      Put put = (Put) mutation;
      if (host.prePut(put, walEdit, put.getDurability())) {
        // hook asked to bypass everything
        return;
      }
      continue;
    }
    if (mutation instanceof Delete) {
      Delete delete = (Delete) mutation;
      region.prepareDelete(delete);
      if (host.preDelete(delete, walEdit, delete.getDurability())) {
        // hook asked to bypass everything
        return;
      }
    }
  }
}
项目:ditb
文件:TestRegionServerObserver.java
/**
 * Test hook: flags that it was called, then builds and prepares its own merge transaction from
 * the first two online regions of table "testRegionServerObserver_2". If preparation fails the
 * context is bypassed; otherwise the pre-PONR steps are run, the merge mutations are appended
 * to {@code metaEntries}, and those entries are written to the meta table.
 */
@Override
public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
    Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException {
  preMergeBeforePONRCalled = true;
  HRegionServer regionServer = (HRegionServer) ctx.getEnvironment().getRegionServerServices();
  List<Region> candidates =
      regionServer.getOnlineRegions(TableName.valueOf("testRegionServerObserver_2"));
  RegionMergeTransactionFactory factory =
      new RegionMergeTransactionFactory(regionServer.getConfiguration());
  rmt = (RegionMergeTransactionImpl) factory.create(candidates.get(0), candidates.get(1), true);
  if (!rmt.prepare(regionServer)) {
    LOG.error("Prepare for the region merge of table "
        + candidates.get(0).getTableDesc().getNameAsString()
        + " failed. So returning null. ");
    ctx.bypass();
    return;
  }
  mergedRegion = rmt.stepsBeforePONR(regionServer, regionServer, false);
  rmt.prepareMutationsForMerge(
      mergedRegion.getRegionInfo(),
      regionA.getRegionInfo(),
      regionB.getRegionInfo(),
      regionServer.getServerName(),
      metaEntries,
      regionA.getTableDesc().getRegionReplication());
  MetaTableAccessor.mutateMetaTable(regionServer.getConnection(), metaEntries);
}
项目:ditb
文件:MultiThreadedUpdaterWithACL.java
/**
 * Records a failed mutation: adds {@code keyBase} to the failed-key set and logs the elapsed
 * time, region debug information for the mutation's row, and a description of the error.
 *
 * @param m the mutation that failed; only its row is used, for the region lookup
 * @param keyBase the key the mutation was generated from
 * @param start the time the attempt started, in milliseconds since the epoch
 * @param e the failure; a RetriesExhaustedWithDetailsException is reported through its
 *          exhaustive description, anything else through StringUtils.stringifyException
 */
private void recordFailure(final Mutation m, final long keyBase,
    final long start, IOException e) {
  failedKeySet.add(keyBase);
  final String exceptionInfo;
  if (e instanceof RetriesExhaustedWithDetailsException) {
    exceptionInfo = ((RetriesExhaustedWithDetailsException) e).getExhaustiveDescription();
  } else {
    // The description comes from stringifyException alone; no separate stack-trace buffer is
    // filled because nothing would ever read it.
    exceptionInfo = StringUtils.stringifyException(e);
  }
  LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
      + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
      + exceptionInfo);
}
项目:ditb
文件:LoadTestDataGeneratorWithACL.java
/**
 * Attaches an ACL to every non-Delete mutation when user names are configured. The user is
 * picked by taking the row key base (truncated to int) modulo the number of user names. Keys
 * that are a multiple of {@code specialPermCellInsertionFactor} get a WRITE permission; all
 * other keys get READ.
 *
 * @param rowkeyBase the key the mutation was generated from
 * @param m the mutation about to be sent
 * @return the same mutation instance, possibly with an ACL set
 */
@Override
public Mutation beforeMutate(long rowkeyBase, Mutation m) throws IOException {
  if (m instanceof Delete) {
    return m;
  }
  if (userNames == null || userNames.length == 0) {
    return m;
  }
  int userIndex = ((int) rowkeyBase % this.userNames.length);
  boolean specialPerm = ((int) rowkeyBase % specialPermCellInsertionFactor) == 0;

  final Permission.Action granted;
  if (specialPerm) {
    // These cells cannot be read back when running as user userName[userIndex]
    if (LOG.isTraceEnabled()) {
      LOG.trace("Adding special perm " + rowkeyBase);
    }
    granted = Permission.Action.WRITE;
  } else {
    granted = Permission.Action.READ;
  }
  m.setACL(userNames[userIndex], new Permission(granted));
  return m;
}
项目:ditb
文件:RequestConverter.java
/**
 * Fills {@code regionActionBuilder} with one Action per mutation in {@code rowMutations}, using
 * the no-data protobuf form of each mutation. The mutations themselves are appended to
 * {@code cells} so that their Cells can be carried outside of protobuf.
 * Does not propagate Action absolute position and does not set the atomic flag on the region
 * action; the caller should do that if wanted.
 *
 * @param regionName name of the region (not read by this method)
 * @param rowMutations the mutations to convert; only Put and Delete are accepted
 * @param cells output list that receives every converted mutation as a CellScannable
 * @param regionActionBuilder the builder that receives the actions and is returned
 * @param actionBuilder scratch builder, cleared before each use
 * @param mutationBuilder scratch builder, cleared before each use
 * @return {@code regionActionBuilder}, a region mutation minus data
 * @throws IOException a DoNotRetryIOException when a mutation is neither a Put nor a Delete
 */
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
    final RowMutations rowMutations, final List<CellScannable> cells,
    final RegionAction.Builder regionActionBuilder,
    final ClientProtos.Action.Builder actionBuilder,
    final MutationProto.Builder mutationBuilder)
    throws IOException {
  for (Mutation mutation : rowMutations.getMutations()) {
    final MutationType kind;
    if (mutation instanceof Put) {
      kind = MutationType.PUT;
    } else if (mutation instanceof Delete) {
      kind = MutationType.DELETE;
    } else {
      throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
          mutation.getClass().getName());
    }

    mutationBuilder.clear();
    MutationProto withoutData = ProtobufUtil.toMutationNoData(kind, mutation, mutationBuilder);
    cells.add(mutation);

    actionBuilder.clear();
    actionBuilder.setMutation(withoutData);
    regionActionBuilder.addAction(actionBuilder.build());
  }
  return regionActionBuilder;
}
项目:aliyun-tablestore-hbase-client
文件:TablestoreBufferedMutator.java
/**
 * Creates a buffered mutator for {@code tableName} with an empty write buffer. The flush
 * threshold is read from "hbase.client.write.buffer" in the connection's configuration and
 * defaults to 2097152 bytes.
 *
 * @param connection the connection supplying the configuration and Tablestore settings
 * @param tableName the table this mutator writes to
 */
public TablestoreBufferedMutator(TablestoreConnection connection, TableName tableName) {
  this.connection = connection;
  this.tableName = tableName;

  // Start with an empty buffer.
  this.writeBuffer = new ConcurrentLinkedQueue<Mutation>();
  this.currentWriteBufferSize = 0;
  this.clearBufferOnFail = true;

  this.writeBufferSize = this.connection.getConfiguration()
      .getLong("hbase.client.write.buffer", 2L * 1024 * 1024);
  this.columnMapping =
      new ColumnMapping(tableName.getNameAsString(), this.connection.getConfiguration());
  this.adapter = OTSAdapter.getInstance(this.connection.getTablestoreConf());
}
项目:aliyun-tablestore-hbase-client
文件:TablestoreBufferedMutator.java
/**
 * Appends every mutation in {@code list} to the write buffer and adds its heap size to the
 * running total. If the total has reached the configured buffer size, the buffered mutations
 * are extracted into the flush lists. {@code flush} is then called in either case; when the
 * threshold was not reached it receives two empty lists.
 */
@Override
public void mutate(List<? extends Mutation> list) throws IOException {
  for (Mutation incoming : list) {
    writeBuffer.add(incoming);
    currentWriteBufferSize += incoming.heapSize();
  }

  List<OPut> putsToFlush = new ArrayList<OPut>();
  List<ODelete> deletesToFlush = new ArrayList<ODelete>();
  if (currentWriteBufferSize >= writeBufferSize) {
    extractOMutation(putsToFlush, deletesToFlush);
  }
  flush(putsToFlush, deletesToFlush);
}
项目:aliyun-tablestore-hbase-client
文件:TablestoreBufferedMutator.java
/**
 * Converts every buffered Put and Delete into its OTS counterpart, appends it to the matching
 * output list, then empties the write buffer and resets the tracked buffer size to zero.
 * Buffered mutations of any other type are dropped without conversion.
 *
 * @param flushPuts receives the converted puts
 * @param flushDeletes receives the converted deletes
 */
private void extractOMutation(List<OPut> flushPuts, List<ODelete> flushDeletes) {
  for (Mutation buffered : writeBuffer) {
    if (buffered instanceof Put) {
      OPut converted = ElementConvertor.toOtsPut((Put) buffered, this.columnMapping);
      flushPuts.add(converted);
      continue;
    }
    if (buffered instanceof Delete) {
      ODelete converted = ElementConvertor.toOtsDelete((Delete) buffered, this.columnMapping);
      flushDeletes.add(converted);
    }
  }
  currentWriteBufferSize = 0;
  writeBuffer.clear();
}
项目:ditb
文件:TestTags.java
/**
 * If the mutation carries a "visibility" attribute, rebuilds every cell of the mutation as a
 * KeyValue tagged with that attribute (tag type 1). All rebuilt cells, whichever family they
 * came from, are then stored in the family map under the family of the first cell seen,
 * replacing that family's previous entry. Does nothing when the attribute is absent.
 */
private void updateMutationAddingTags(final Mutation m) {
  byte[] attribute = m.getAttribute("visibility");
  if (attribute == null) {
    return;
  }
  byte[] firstFamily = null;
  List<Cell> taggedCells = new ArrayList<Cell>();
  for (List<? extends Cell> familyCells : m.getFamilyCellMap().values()) {
    for (Cell cell : familyCells) {
      KeyValue source = KeyValueUtil.ensureKeyValue(cell);
      if (firstFamily == null) {
        firstFamily = source.getFamily();
      }
      List<Tag> tags = new ArrayList<Tag>();
      tags.add(new Tag((byte) 1, attribute));
      KeyValue tagged = new KeyValue(source.getRow(), 0, source.getRowLength(),
          source.getFamily(), 0, source.getFamilyLength(),
          source.getQualifier(), 0, source.getQualifierLength(),
          source.getTimestamp(), KeyValue.Type.codeToType(source.getType()),
          source.getValue(), 0, source.getValueLength(), tags);
      taggedCells.add(tagged);
    }
  }
  // Swap the first family's entry for the rebuilt cells.
  m.getFamilyCellMap().remove(firstFamily);
  m.getFamilyCellMap().put(firstFamily, taggedCells);
}
项目:ditb
文件:VisibilityController.java
/**
 * Rewrites the tags of {@code newCell} when the mutation carries a cell visibility expression.
 * The rewritten cell holds the tags created from that expression, followed by every tag already
 * on {@code newCell} whose type is neither the visibility tag type nor the visibility
 * expression serialization format tag type. When the mutation has no cell visibility,
 * {@code newCell} is returned unchanged.
 *
 * @throws IOException wrapping a DeserializationException raised while reading the mutation's
 *           cell visibility
 */
@Override
public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
    MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
  final CellVisibility visibility;
  try {
    visibility = mutation.getCellVisibility();
  } catch (DeserializationException e) {
    throw new IOException(e);
  }
  if (visibility == null) {
    return newCell;
  }

  // Don't check user auths for labels with Mutations when the user is super user
  boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());

  // The new visibility tags go first in the rebuilt tag list.
  List<Tag> mergedTags = Lists.newArrayList();
  mergedTags.addAll(this.visibilityLabelService.createVisibilityExpTags(
      visibility.getExpression(), true, authCheck));

  // Only walk the existing tags when the cell actually has some.
  if (newCell.getTagsLength() > 0) {
    Iterator<Tag> existingTags = CellUtil.tagsIterator(newCell.getTagsArray(),
        newCell.getTagsOffset(), newCell.getTagsLength());
    while (existingTags.hasNext()) {
      Tag tag = existingTags.next();
      boolean visibilityRelated = tag.getType() == TagType.VISIBILITY_TAG_TYPE
          || tag.getType() == TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE;
      if (!visibilityRelated) {
        // Carry forward all other tags
        mergedTags.add(tag);
      }
    }
  }
  return new TagRewriteCell(newCell, Tag.fromList(mergedTags));
}
项目:ditb
文件:AccessController.java
/**
 * Rejects mutations whose cells carry a tag of the reserved ACL tag type, and marks mutations
 * that pass with the TAG_CHECK_PASSED attribute. The scan is skipped, and the mutation marked
 * as passed, when authorization is disabled or the user is a superuser. A mutation already
 * carrying TAG_CHECK_PASSED is not scanned again.
 *
 * @param user the user issuing the mutation
 * @param m the mutation to inspect
 * @throws IOException an AccessDeniedException if a cell carries an ACL-type tag
 */
private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
  // No need to check if we're not going to throw, and superusers may store cells
  // unconditionally.
  if (!authorizationEnabled || Superusers.isSuperUser(user)) {
    m.setAttribute(TAG_CHECK_PASSED, TRUE);
    return;
  }
  // We already checked (prePut vs preBatchMutation)
  if (m.getAttribute(TAG_CHECK_PASSED) != null) {
    return;
  }
  CellScanner scanner = m.cellScanner();
  while (scanner.advance()) {
    Cell cell = scanner.current();
    if (cell.getTagsLength() <= 0) {
      continue;
    }
    Iterator<Tag> cellTags = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
        cell.getTagsLength());
    while (cellTags.hasNext()) {
      if (cellTags.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
        throw new AccessDeniedException("Mutation contains cell with reserved type tag");
      }
    }
  }
  m.setAttribute(TAG_CHECK_PASSED, TRUE);
}
项目:ditb
文件:AccessController.java
/**
 * For every mutation in the mini-batch flagged with CHECK_COVERING_PERM, runs the covering
 * cell permission check for WRITE, logs the resulting allow/deny decision, and throws when
 * authorization is enabled and the decision is a deny. Puts are first checked for reserved
 * tags. Only active when cell features are enabled and compatible early termination is off.
 *
 * @throws IOException an AccessDeniedException when a flagged mutation is not permitted
 */
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
  if (!cellFeaturesEnabled || compatibleEarlyTermination) {
    return;
  }
  TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
  for (int idx = 0; idx < miniBatchOp.size(); idx++) {
    Mutation m = miniBatchOp.getOperation(idx);
    if (m.getAttribute(CHECK_COVERING_PERM) == null) {
      continue;
    }
    // The table, cf and q perm checks failed earlier; the cell perm check gets a chance now.
    final OpType opType;
    if (m instanceof Put) {
      checkForReservedTagPresence(getActiveUser(), m);
      opType = OpType.PUT;
    } else {
      opType = OpType.DELETE;
    }
    boolean covered = checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
        m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE);
    AuthResult authResult = covered
        ? AuthResult.allow(opType.toString(), "Covering cell set",
            getActiveUser(), Action.WRITE, table, m.getFamilyCellMap())
        : AuthResult.deny(opType.toString(), "Covering cell set",
            getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
    logResult(authResult);
    if (authorizationEnabled && !authResult.isAllowed()) {
      throw new AccessDeniedException("Insufficient permissions "
          + authResult.toContextString());
    }
  }
}
项目:ditb
文件:WriteSinkCoprocessor.java
/**
 * Marks every operation in the mini-batch as SUCCESS and bypasses default processing, so the
 * batch is reported as written without the default write path running. Counts invocations and
 * logs a progress line each time the counter reaches a multiple of 20000.
 */
@Override
public void preBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
    final MiniBatchOperationInProgress<Mutation> miniBatchOp)
    throws IOException {
  boolean logProgress = ops.incrementAndGet() % 20000 == 0;
  if (logProgress) {
    LOG.info("Wrote " + ops.get() + " times in region " + regionName);
  }
  int index = 0;
  while (index < miniBatchOp.size()) {
    OperationStatus success = new OperationStatus(HConstants.OperationStatusCode.SUCCESS);
    miniBatchOp.setOperationStatus(index, success);
    index++;
  }
  c.bypass();
}
项目:ditb
文件:WALSplitter.java
public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
this.type = type;
this.mutation = mutation;
if(this.mutation.getDurability() != Durability.SKIP_WAL) {
// using ASYNC_WAL for relay
this.mutation.setDurability(Durability.ASYNC_WAL);
}
this.nonceGroup = nonceGroup;
this.nonce = nonce;
}
项目:ditb
文件:QuotaUtil.java
/**
 * Sums {@code KeyValueUtil.length(cell)} over every cell of every family in the mutation.
 *
 * @param mutation the mutation to measure
 * @return the combined length of all its cells; 0 when it has no cells
 */
public static long calculateMutationSize(final Mutation mutation) {
  long total = 0;
  for (List<Cell> familyCells : mutation.getFamilyCellMap().values()) {
    for (Cell cell : familyCells) {
      total += KeyValueUtil.length(cell);
    }
  }
  return total;
}
项目:ditb
文件:TableOutputFormat.java
/**
 * Writes a key/value pair into the table. The key is not used; the value is handed to the
 * mutator as-is.
 *
 * @param key The key.
 * @param value The value; must be a Put or a Delete.
 * @throws IOException When the value is neither a Put nor a Delete, or when writing fails.
 * @see RecordWriter#write(Object, Object)
 */
@Override
public void write(KEY key, Mutation value)
    throws IOException {
  boolean supported = (value instanceof Put) || (value instanceof Delete);
  if (!supported) {
    throw new IOException("Pass a Delete or a Put");
  }
  mutator.mutate(value);
}
项目:ditb
文件:IntegrationTestBigLinkedList.java
/**
 * Runs the superclass setup, then loads the keys to search for from the job configuration and
 * logs how many were loaded.
 *
 * @param context the mapper context supplying the configuration
 * @throws IOException if the superclass setup or key loading fails; an interruption while
 *           loading keys is reported as an InterruptedIOException carrying the original
 *           InterruptedException as its cause
 */
@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
    throws IOException {
  super.setup(context);
  try {
    this.keysToFind = readKeysToSearch(context.getConfiguration());
    LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
  } catch (InterruptedException e) {
    // InterruptedIOException has no cause-taking constructor, so the cause is attached
    // explicitly to keep the original stack trace available to whoever handles this.
    InterruptedIOException interrupted = new InterruptedIOException(e.toString());
    interrupted.initCause(e);
    throw interrupted;
  }
}
项目:ditb
文件:MutationSerialization.java
/**
 * Converts the mutation to its protobuf form and writes it, length-delimited, to {@code out}.
 *
 * @param mutation the mutation to write; must be a Put or a Delete
 * @throws IllegalArgumentException if the mutation is neither a Put nor a Delete
 */
@Override
public void serialize(Mutation mutation) throws IOException {
  final boolean isPut = mutation instanceof Put;
  if (!isPut && !(mutation instanceof Delete)) {
    throw new IllegalArgumentException("Only Put and Delete are supported");
  }
  final MutationType type = isPut ? MutationType.PUT : MutationType.DELETE;
  ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out);
}
项目:ditb
文件:MultiTableOutputFormat.java
/**
 * Creates a MultiTableRecordWriter from an HBase configuration derived from the task's
 * configuration. Whether the writer uses the write-ahead log is read from WAL_PROPERTY,
 * defaulting to WAL_ON.
 */
@Override
public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
    throws IOException, InterruptedException {
  Configuration jobConf = context.getConfiguration();
  Configuration hbaseConf = HBaseConfiguration.create(jobConf);
  boolean useWal = jobConf.getBoolean(WAL_PROPERTY, WAL_ON);
  return new MultiTableRecordWriter(hbaseConf, useWal);
}
项目:ditb
文件:RegionServerCoprocessorHost.java
/**
 * Invokes {@code preMergeCommit} on the region server observers through
 * {@code execOperation}. When no coprocessors are loaded, {@code execOperation} is called
 * with a null operation instead.
 *
 * @param regionA first region of the merge
 * @param regionB second region of the merge
 * @param metaEntries meta mutations passed on to the observers
 * @return the result of {@code execOperation}
 */
public boolean preMergeCommit(final HRegion regionA, final HRegion regionB,
    final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException {
  final CoprocessorOperation operation;
  if (coprocessors.isEmpty()) {
    operation = null;
  } else {
    operation = new CoprocessorOperation() {
      @Override
      public void call(RegionServerObserver observer,
          ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
        observer.preMergeCommit(ctx, regionA, regionB, metaEntries);
      }
    };
  }
  return execOperation(operation);
}
项目:ditb
文件:RequestConverter.java
/**
 * Create a protocol buffer MultiRequest for conditioned row mutations. The request holds a
 * single region action, flagged atomic, with one Action per mutation, plus the condition built
 * from the row/family/qualifier/comparator/compareType arguments.
 *
 * @param regionName name of the region the request is addressed to
 * @param row row of the condition
 * @param family column family of the condition
 * @param qualifier column qualifier of the condition
 * @param comparator comparator of the condition
 * @param compareType comparison operator of the condition
 * @param rowMutations the mutations to send; only Put and Delete are accepted
 * @return a mutate request
 * @throws IOException a DoNotRetryIOException when a mutation is neither a Put nor a Delete
 */
public static ClientProtos.MultiRequest buildMutateRequest(
    final byte[] regionName, final byte[] row, final byte[] family,
    final byte [] qualifier, final ByteArrayComparable comparator,
    final CompareType compareType, final RowMutations rowMutations) throws IOException {
  RegionAction.Builder regionAction =
      getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
  regionAction.setAtomic(true);
  // Scratch builders, cleared and reused for every mutation.
  ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
  MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
  Condition condition = buildCondition(row, family, qualifier, comparator, compareType);

  for (Mutation mutation : rowMutations.getMutations()) {
    final MutationType kind;
    if (mutation instanceof Put) {
      kind = MutationType.PUT;
    } else if (mutation instanceof Delete) {
      kind = MutationType.DELETE;
    } else {
      throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
          mutation.getClass().getName());
    }
    mutationBuilder.clear();
    MutationProto converted = ProtobufUtil.toMutation(kind, mutation, mutationBuilder);
    actionBuilder.clear();
    actionBuilder.setMutation(converted);
    regionAction.addAction(actionBuilder.build());
  }

  return ClientProtos.MultiRequest.newBuilder()
      .addRegionAction(regionAction.build())
      .setCondition(condition)
      .build();
}
项目:ditb
文件:MultiRowMutationProcessor.java
/**
 * Runs the coprocessor preBatchMutate hook over all mutations of this processor (when the
 * region has a coprocessor host) and then copies into {@code walEdit} the cells of any WALEdit
 * a hook produced for a mutation whose status is still NOT_RUN.
 */
@Override
public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
  // TODO we should return back the status of this hook run to HRegion so that those Mutations
  // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore.
  final int count = mutations.size();
  OperationStatus[] opStatus = new OperationStatus[count];
  Arrays.fill(opStatus, OperationStatus.NOT_RUN);
  WALEdit[] walEditsFromCP = new WALEdit[count];

  RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
  if (coprocessorHost != null) {
    Mutation[] batch = mutations.toArray(new Mutation[count]);
    miniBatch = new MiniBatchOperationInProgress<Mutation>(
        batch, opStatus, walEditsFromCP, 0, count);
    coprocessorHost.preBatchMutate(miniBatch);
  }

  // Apply edits to a single WALEdit
  for (int i = 0; i < count; i++) {
    WALEdit fromHook = walEditsFromCP[i];
    // Any status other than NOT_RUN means the mutation already succeeded or failed in the CP
    // hook itself, so there is nothing to apply for it.
    if (opStatus[i] != OperationStatus.NOT_RUN || fromHook == null) {
      continue;
    }
    // Add the WALEdit created by CP hook
    for (Cell walCell : fromHook.getCells()) {
      walEdit.add(walCell);
    }
  }
}
项目:ditb
文件:ProtobufUtil.java
/**
 * Convert a MutationProto to the matching client Mutation. Only the APPEND, DELETE and PUT
 * types are handled here.
 *
 * @param proto the protocol buffer Mutate to convert
 * @return the converted Mutation
 * @throws IOException if the proto's mutate type is not one of the handled types
 */
public static Mutation toMutation(final MutationProto proto) throws IOException {
  final MutationType type = proto.getMutateType();
  final Mutation converted;
  if (type == MutationType.APPEND) {
    converted = toAppend(proto, null);
  } else if (type == MutationType.DELETE) {
    converted = toDelete(proto, null);
  } else if (type == MutationType.PUT) {
    converted = toPut(proto, null);
  } else {
    throw new IOException("Unknown mutation type " + type);
  }
  return converted;
}
项目:ditb
文件:MultiRowMutationProcessor.java
/**
 * Picks the durability to use for the whole batch: the one with the highest enum ordinal among
 * all mutations, or USE_DEFAULT when no mutation asks for anything with a higher ordinal.
 */
@Override
public Durability useDurability() {
  Durability highest = Durability.USE_DEFAULT;
  for (Mutation m : mutations) {
    Durability requested = m.getDurability();
    if (requested.ordinal() > highest.ordinal()) {
      highest = requested;
    }
  }
  return highest;
}
项目:ditb
文件:SplitTransactionImpl.java
/**
 * Appends to {@code metaEntries} a Put marking a copy of the parent region as offline and
 * split (with both daughters recorded on it) plus one Put per daughter carrying the server
 * location, then writes the whole list to the meta table. The list passed in is modified.
 *
 * @param hConnection connection used for the meta table write
 * @param parent the region being split
 * @param splitA first daughter region
 * @param splitB second daughter region
 * @param serverName server recorded as the daughters' location
 * @param metaEntries mutations to send along; the new Puts are appended to it
 * @param regionReplication replica count; empty locations are added for replicas 1 and up
 */
private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
    HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
    ServerName serverName, List<Mutation> metaEntries, int regionReplication)
    throws IOException {
  // Put for parent: a copy flagged offline and split, listing both daughters.
  HRegionInfo offlinedParent = new HRegionInfo(parent);
  offlinedParent.setOffline(true);
  offlinedParent.setSplit(true);
  Put parentPut = MetaTableAccessor.makePutFromRegionInfo(offlinedParent);
  MetaTableAccessor.addDaughtersToPut(parentPut, splitA, splitB);
  metaEntries.add(parentPut);

  // Puts for daughters. These are new regions, so openSeqNum = 1 is fine.
  Put daughterAPut = MetaTableAccessor.makePutFromRegionInfo(splitA);
  Put daughterBPut = MetaTableAccessor.makePutFromRegionInfo(splitB);
  addLocation(daughterAPut, serverName, 1);
  addLocation(daughterBPut, serverName, 1);
  metaEntries.add(daughterAPut);
  metaEntries.add(daughterBPut);

  // Add empty locations for region replicas of daughters so that number of replicas can be
  // cached whenever the primary region is looked up from meta
  int replicaId = 1;
  while (replicaId < regionReplication) {
    addEmptyLocation(daughterAPut, replicaId);
    addEmptyLocation(daughterBPut, replicaId);
    replicaId++;
  }

  MetaTableAccessor.mutateMetaTable(hConnection, metaEntries);
}
项目:ditb
文件:RegionMergeTransactionImpl.java
/**
 * Appends the merge mutations to {@code metaEntries} via {@code prepareMutationsForMerge} and
 * then writes the whole list to the meta table.
 *
 * @param hConnection connection used for the meta table write
 * @param mergedRegion the region produced by the merge
 * @param regionA first region being merged
 * @param regionB second region being merged
 * @param serverName server recorded as the merged region's location
 * @param metaEntries mutations to send along; the merge mutations are appended to it
 * @param regionReplication replica count passed on to {@code prepareMutationsForMerge}
 */
private void mergeRegionsAndPutMetaEntries(HConnection hConnection,
    HRegionInfo mergedRegion, HRegionInfo regionA, HRegionInfo regionB,
    ServerName serverName, List<Mutation> metaEntries,
    int regionReplication) throws IOException {
  this.prepareMutationsForMerge(
      mergedRegion, regionA, regionB, serverName, metaEntries, regionReplication);
  MetaTableAccessor.mutateMetaTable(hConnection, metaEntries);
}
项目:ditb
文件:RegionMergeTransactionImpl.java
/**
 * Appends to {@code mutations} the meta changes for a region merge: a Put for the merged
 * region (recording both source regions under the MERGEA/MERGEB qualifiers, plus its
 * location) followed by a Delete for each of the two source regions. All three share one
 * timestamp, the later of the local current time and {@code masterSystemTime}.
 *
 * @param mergedRegion the region produced by the merge
 * @param regionA first region being merged
 * @param regionB second region being merged
 * @param serverName server recorded as the merged region's location
 * @param mutations output list; the Put and the two Deletes are appended to it
 * @param regionReplication replica count; empty locations are added for replicas 1 and up
 */
public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
    HRegionInfo regionB, ServerName serverName, List<Mutation> mutations,
    int regionReplication) throws IOException {
  HRegionInfo mergedCopy = new HRegionInfo(mergedRegion);

  // use the maximum of what master passed us vs local time.
  final long timestamp = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);

  // Put for the merged region, recording both regions it was built from.
  Put mergedPut = MetaTableAccessor.makePutFromRegionInfo(mergedCopy, timestamp);
  mergedPut.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray());
  mergedPut.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray());
  mutations.add(mergedPut);

  // Deletes for merging regions
  Delete removeA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, timestamp);
  Delete removeB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, timestamp);
  mutations.add(removeA);
  mutations.add(removeB);

  // Add empty locations for region replicas of the merged region so that number of replicas
  // can be cached whenever the primary region is looked up from meta
  int replicaId = 1;
  while (replicaId < regionReplication) {
    addEmptyLocation(mergedPut, replicaId);
    replicaId++;
  }

  // The merged is a new region, openSeqNum = 1 is fine.
  addLocation(mergedPut, serverName, 1);
}
项目:ditb
文件:HRegion.java
/**
 * Appends a TTL tag built from the mutation's TTL to the given tag list, creating the list if
 * none was supplied. A TTL of Long.MAX_VALUE is treated as "no TTL" and leaves the input as-is.
 *
 * @param tagsOrNull existing tags, or null if there are none yet
 * @param mutation the mutation whose TTL is carried forward
 * @return {@code tagsOrNull} unchanged (possibly null) when the mutation has no TTL; otherwise
 *         the list with the TTL tag added
 */
private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull, final Mutation mutation) {
  final long ttl = mutation.getTTL();
  if (ttl == Long.MAX_VALUE) {
    return tagsOrNull;
  }
  // When the list has to be created here, the TTL tag is the only entry going into it, hence
  // the initial capacity of 1.
  final List<Tag> tags = (tagsOrNull != null) ? tagsOrNull : new ArrayList<Tag>(1);
  tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
  return tags;
}
项目:ditb
文件:HRegion.java
/**
 * Wraps the mutations and nonce information in a MutationBatch and delegates to the
 * batch-operation overload of {@code batchMutate}.
 */
@Override public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
    throws IOException {
  // As it stands, this is used for:
  // * batchMutate with single mutation - put/delete, separate or from
  // checkAndMutate.
  // * coprocessor calls (see ex. BulkDeleteEndpoint).
  // So nonces are not really ever used by HBase. They could be by coprocs,
  // and checkAnd...
  MutationBatch batch = new MutationBatch(mutations, nonceGroup, nonce);
  return batchMutate(batch);
}
项目:ditb
文件:HRegion.java
/**
 * Runs a single mutation through {@code batchMutate} and turns two of the possible status
 * codes into exceptions: SANITY_CHECK_FAILURE and BAD_FAMILY. Every other status code is
 * ignored.
 *
 * @param mutation the mutation to apply
 * @throws IOException a FailedSanityCheckException or NoSuchColumnFamilyException carrying the
 *           status's exception message, or whatever {@code batchMutate} itself throws
 */
private void doBatchMutate(Mutation mutation) throws IOException {
  // Currently this is only called for puts and deletes, so no nonces.
  OperationStatus[] statuses = this.batchMutate(new Mutation[] { mutation });
  OperationStatus only = statuses[0];
  if (only.getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
    throw new FailedSanityCheckException(only.getExceptionMsg());
  }
  if (only.getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
    throw new NoSuchColumnFamilyException(only.getExceptionMsg());
  }
}
项目:ditb
文件:HRegion.java
/**
 * Possibly rewrite incoming cell tags. When the mutation has a TTL (anything other than
 * Long.MAX_VALUE), every cell in {@code familyMap} is replaced in place by a new KeyValue
 * carrying the cell's existing tags plus a TTL tag.
 *
 * @param familyMap the cells to rewrite, keyed by family; its lists are modified in place
 * @param m the mutation supplying the TTL
 */
void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
  // Check if we have any work to do and early out otherwise
  // Update these checks as more logic is added here
  if (m.getTTL() == Long.MAX_VALUE) {
    return;
  }
  // From this point we know we have some work to do
  for (List<Cell> cells : familyMap.values()) {
    assert cells instanceof RandomAccess;
    final int count = cells.size();
    int idx = 0;
    while (idx < count) {
      Cell cell = cells.get(idx);
      List<Tag> tags = carryForwardTTLTag(Tag.carryForwardTags(null, cell), m);
      // Rewrite the cell with the updated set of tags
      KeyValue rewritten = new KeyValue(
          cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
          cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
          cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), tags);
      cells.set(idx, rewritten);
      idx++;
    }
  }
}
项目:ditb
文件:IntegrationTestLoadAndVerify.java
/**
 * Runs the superclass setup, then loads the keys to search for from the job configuration and
 * logs how many were loaded.
 *
 * @param context the mapper context supplying the configuration
 * @throws IOException if the superclass setup or key loading fails; an interruption while
 *           loading keys is reported as an InterruptedIOException carrying the original
 *           InterruptedException as its cause
 */
@Override
public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
    throws IOException {
  super.setup(context);
  try {
    this.keysToFind = readKeysToSearch(context.getConfiguration());
    LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
  } catch (InterruptedException e) {
    // InterruptedIOException has no cause-taking constructor, so the cause is attached
    // explicitly to keep the original stack trace available to whoever handles this.
    InterruptedIOException interrupted = new InterruptedIOException(e.toString());
    interrupted.initCause(e);
    throw interrupted;
  }
}
项目:ditb
文件:MetaTableAccessor.java
/**
 * Execute the passed <code>mutations</code> against <code>hbase:meta</code> table as one batch.
 * The meta table handle is closed afterwards whether or not the batch succeeds.
 *
 * @param connection connection we're using
 * @param mutations Puts and Deletes to execute on hbase:meta
 * @throws IOException if the batch fails; an interruption is reported as an
 *           InterruptedIOException with the InterruptedException as its cause
 */
public static void mutateMetaTable(final Connection connection,
    final List<Mutation> mutations)
    throws IOException {
  Table metaTable = getMetaHTable(connection);
  try {
    metaTable.batch(mutations);
  } catch (InterruptedException interrupted) {
    // initCause returns the throwable it was called on, so the cast back is safe.
    throw (InterruptedIOException)
        new InterruptedIOException(interrupted.getMessage()).initCause(interrupted);
  } finally {
    metaTable.close();
  }
}
项目:ditb
文件:RegionCoprocessorHost.java
/**
 * Invokes {@code prePrepareTimeStampForDeleteVersion} on the region observers through
 * {@code execOperation}. When no coprocessors are loaded, {@code execOperation} is called
 * with a null operation instead.
 *
 * @param mutation - the current mutation
 * @param kv - the current cell
 * @param byteNow - current timestamp in bytes
 * @param get - the get that could be used; note that this get does not specify the family
 *          and qualifier that should be used
 * @return true if default processing should be bypassed
 * @exception IOException passed through from {@code execOperation}
 */
public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
    final Cell kv, final byte[] byteNow, final Get get) throws IOException {
  final RegionOperation operation;
  if (coprocessors.isEmpty()) {
    operation = null;
  } else {
    operation = new RegionOperation() {
      @Override
      public void call(RegionObserver observer,
          ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
        observer.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
      }
    };
  }
  return execOperation(operation);
}
项目:ditb
文件:RegionCoprocessorHost.java
/**
 * Invokes {@code preBatchMutate} on the region observers through {@code execOperation}. When
 * no coprocessors are loaded, {@code execOperation} is called with a null operation instead.
 *
 * @param miniBatchOp the batch of mutations handed to each observer
 * @return true if default processing should be bypassed
 * @throws IOException passed through from {@code execOperation}
 */
public boolean preBatchMutate(
    final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
  final RegionOperation operation;
  if (coprocessors.isEmpty()) {
    operation = null;
  } else {
    operation = new RegionOperation() {
      @Override
      public void call(RegionObserver observer,
          ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
        observer.preBatchMutate(ctx, miniBatchOp);
      }
    };
  }
  return execOperation(operation);
}
项目:ditb
文件:RegionCoprocessorHost.java
/**
 * Invokes {@code postBatchMutate} on the region observers through {@code execOperation}. When
 * no coprocessors are loaded, {@code execOperation} is called with a null operation instead.
 * The value returned by {@code execOperation} is discarded.
 *
 * @param miniBatchOp the batch of mutations handed to each observer
 * @throws IOException passed through from {@code execOperation}
 */
public void postBatchMutate(
    final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
  final RegionOperation operation;
  if (coprocessors.isEmpty()) {
    operation = null;
  } else {
    operation = new RegionOperation() {
      @Override
      public void call(RegionObserver observer,
          ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException {
        observer.postBatchMutate(ctx, miniBatchOp);
      }
    };
  }
  execOperation(operation);
}