Java 类org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException 实例源码

项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Check if a block is valid.
 *
 * <p>The checks run in this order: the replica must be present in
 * {@code volumeMap}, it must be in the requested state (when one is
 * requested), its block file must exist, and the length returned by
 * {@code getLength(b)} must be at least {@code minLength}.
 *
 * @param b           The block to check.
 * @param minLength   The minimum length that the block must have.  May be 0.
 * @param state       If this is null, it is ignored.  If it is non-null, we
 *                        will check that the replica has this state.
 *
 * @throws ReplicaNotFoundException          If the replica is not found
 * @throws UnexpectedReplicaStateException   If a state was requested and the
 *                                             replica is not in that state.
 * @throws FileNotFoundException             If the block file does not exist.
 * @throws EOFException                      If the replica length is too short.
 * @throws IOException                       May be thrown from the methods called.
 */
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
    throws ReplicaNotFoundException, UnexpectedReplicaStateException,
    FileNotFoundException, EOFException, IOException {
  final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
      b.getLocalBlock());
  if (replicaInfo == null) {
    throw new ReplicaNotFoundException(b);
  }
  // A null state means the caller accepts any replica state, so the
  // comparison is only made when a specific state was requested. Without
  // this guard a null state would be rejected for every replica.
  if (state != null && replicaInfo.getState() != state) {
    throw new UnexpectedReplicaStateException(b, state);
  }
  if (!replicaInfo.getBlockFile().exists()) {
    throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
  }
  long onDiskLength = getLength(b);
  if (onDiskLength < minLength) {
    throw new EOFException(b + "'s on-disk length " + onDiskLength
        + " is shorter than minLength " + minLength);
  }
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Resolves the local data-file and meta-file paths for a block.
 *
 * <p>The replica is looked up in {@code volumeMap} by block pool id and
 * block id while holding this object's monitor. When the stored replica has
 * a newer generation stamp than the caller's block, the caller's block is
 * updated in place to that stamp; an older stored stamp is rejected.
 *
 * @param block the block to resolve; its generation stamp may be raised
 * @return the block together with the absolute paths of its data and meta
 *         files
 * @throws ReplicaNotFoundException if {@code volumeMap} has no replica for
 *                                  the block
 * @throws IOException if the replica's generation stamp is older than the
 *                     block's, or if thrown by the methods called
 */
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
    throws IOException {
  synchronized (this) {
    final Replica stored =
        volumeMap.get(block.getBlockPoolId(), block.getBlockId());
    if (stored == null) {
      throw new ReplicaNotFoundException(block);
    }
    // A stored stamp older than the requested one cannot be served.
    if (stored.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException(
          "Replica generation stamp < block generation stamp, block="
          + block + ", replica=" + stored);
    }
    // A newer stored stamp wins: bring the caller's block up to date.
    if (stored.getGenerationStamp() > block.getGenerationStamp()) {
      block.setGenerationStamp(stored.getGenerationStamp());
    }
  }

  final File blockData = getBlockFile(block);
  final File blockMeta =
      FsDatasetUtil.getMetaFile(blockData, block.getGenerationStamp());
  return new BlockLocalPathInfo(block,
      blockData.getAbsolutePath(), blockMeta.getAbsolutePath());
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Check if a block is valid.
 *
 * <p>The checks run in this order: the replica must be present in
 * {@code volumeMap}, it must be in the requested state (when one is
 * requested), its block file must exist, and the length returned by
 * {@code getLength(b)} must be at least {@code minLength}.
 *
 * @param b           The block to check.
 * @param minLength   The minimum length that the block must have.  May be 0.
 * @param state       If this is null, it is ignored.  If it is non-null, we
 *                        will check that the replica has this state.
 *
 * @throws ReplicaNotFoundException          If the replica is not found
 * @throws UnexpectedReplicaStateException   If a state was requested and the
 *                                             replica is not in that state.
 * @throws FileNotFoundException             If the block file does not exist.
 * @throws EOFException                      If the replica length is too short.
 * @throws IOException                       May be thrown from the methods called.
 */
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
    throws ReplicaNotFoundException, UnexpectedReplicaStateException,
    FileNotFoundException, EOFException, IOException {
  final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
      b.getLocalBlock());
  if (replicaInfo == null) {
    throw new ReplicaNotFoundException(b);
  }
  // A null state means the caller accepts any replica state, so the
  // comparison is only made when a specific state was requested. Without
  // this guard a null state would be rejected for every replica.
  if (state != null && replicaInfo.getState() != state) {
    throw new UnexpectedReplicaStateException(b, state);
  }
  if (!replicaInfo.getBlockFile().exists()) {
    throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
  }
  long onDiskLength = getLength(b);
  if (onDiskLength < minLength) {
    throw new EOFException(b + "'s on-disk length " + onDiskLength
        + " is shorter than minLength " + minLength);
  }
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Resolves the local data-file and meta-file paths for a block.
 *
 * <p>The replica is looked up in {@code volumeMap} by block pool id and
 * block id while holding this object's monitor. When the stored replica has
 * a newer generation stamp than the caller's block, the caller's block is
 * updated in place to that stamp; an older stored stamp is rejected.
 *
 * @param block the block to resolve; its generation stamp may be raised
 * @return the block together with the absolute paths of its data and meta
 *         files
 * @throws ReplicaNotFoundException if {@code volumeMap} has no replica for
 *                                  the block
 * @throws IOException if the replica's generation stamp is older than the
 *                     block's, or if thrown by the methods called
 */
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
    throws IOException {
  synchronized (this) {
    final Replica stored =
        volumeMap.get(block.getBlockPoolId(), block.getBlockId());
    if (stored == null) {
      throw new ReplicaNotFoundException(block);
    }
    // A stored stamp older than the requested one cannot be served.
    if (stored.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException(
          "Replica generation stamp < block generation stamp, block="
          + block + ", replica=" + stored);
    }
    // A newer stored stamp wins: bring the caller's block up to date.
    if (stored.getGenerationStamp() > block.getGenerationStamp()) {
      block.setGenerationStamp(stored.getGenerationStamp());
    }
  }

  final File blockData = getBlockFile(block);
  final File blockMeta =
      FsDatasetUtil.getMetaFile(blockData, block.getGenerationStamp());
  return new BlockLocalPathInfo(block,
      blockData.getAbsolutePath(), blockMeta.getAbsolutePath());
}
项目:aliyun-oss-hadoop-fs    文件:MiniDFSCluster.java   
/**
 * Corrupts or deletes the data of the given block's replica on every
 * datanode returned by {@code getDataNodes()}.
 *
 * @param block the block whose replicas are targeted
 * @param deleteBlockFile if true the replica data is deleted; otherwise it
 *                        is corrupted
 * @return the number of datanodes on which the replica was altered
 * @throws IOException if thrown by the methods called; a
 *                     {@link ReplicaNotFoundException} is swallowed and the
 *                     datanode is simply not counted
 */
private int corruptBlockOnDataNodesHelper(ExtendedBlock block,
    boolean deleteBlockFile) throws IOException {
  int affected = 0;
  for (DataNode node : getDataNodes()) {
    try {
      final MaterializedReplica target =
          getFsDatasetTestUtils(node).getMaterializedReplica(block);
      if (!deleteBlockFile) {
        target.corruptData();
      } else {
        target.deleteData();
      }
      affected += 1;
    } catch (ReplicaNotFoundException ignored) {
      // Deliberately skipped: the replica was not found for this datanode,
      // so there is nothing to alter and nothing to count.
    }
  }
  return affected;
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImplTestUtils.java   
/**
 * Return a materialized replica from the FsDatasetImpl.
 *
 * <p>The block file is obtained from {@code dataset} by block pool id and
 * block id; the meta file is derived from that block file and the block's
 * generation stamp.
 *
 * @param block the block to materialize
 * @return a replica wrapping the block file and its meta file
 * @throws ReplicaNotFoundException if looking up the block file fails with
 *                                  an {@link IOException}; the failure is
 *                                  logged before this is thrown
 */
@Override
public MaterializedReplica getMaterializedReplica(ExtendedBlock block)
    throws ReplicaNotFoundException {
  final File dataFile;
  try {
    dataFile =
        dataset.getBlockFile(block.getBlockPoolId(), block.getBlockId());
  } catch (IOException e) {
    LOG.error("Block file for " + block + " does not existed:", e);
    throw new ReplicaNotFoundException(block);
  }
  return new FsDatasetImplMaterializedReplica(
      dataFile,
      FsDatasetUtil.getMetaFile(dataFile, block.getGenerationStamp()));
}
项目:big-c    文件:FsDatasetImpl.java   
/**
 * Check if a block is valid.
 *
 * <p>The checks run in this order: the replica must be present in
 * {@code volumeMap}, it must be in the requested state (when one is
 * requested), its block file must exist, and the length returned by
 * {@code getLength(b)} must be at least {@code minLength}.
 *
 * @param b           The block to check.
 * @param minLength   The minimum length that the block must have.  May be 0.
 * @param state       If this is null, it is ignored.  If it is non-null, we
 *                        will check that the replica has this state.
 *
 * @throws ReplicaNotFoundException          If the replica is not found
 * @throws UnexpectedReplicaStateException   If a state was requested and the
 *                                             replica is not in that state.
 * @throws FileNotFoundException             If the block file does not exist.
 * @throws EOFException                      If the replica length is too short.
 * @throws IOException                       May be thrown from the methods called.
 */
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
    throws ReplicaNotFoundException, UnexpectedReplicaStateException,
    FileNotFoundException, EOFException, IOException {
  final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
      b.getLocalBlock());
  if (replicaInfo == null) {
    throw new ReplicaNotFoundException(b);
  }
  // A null state means the caller accepts any replica state, so the
  // comparison is only made when a specific state was requested. Without
  // this guard a null state would be rejected for every replica.
  if (state != null && replicaInfo.getState() != state) {
    throw new UnexpectedReplicaStateException(b, state);
  }
  if (!replicaInfo.getBlockFile().exists()) {
    throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
  }
  long onDiskLength = getLength(b);
  if (onDiskLength < minLength) {
    throw new EOFException(b + "'s on-disk length " + onDiskLength
        + " is shorter than minLength " + minLength);
  }
}
项目:big-c    文件:FsDatasetImpl.java   
/**
 * Resolves the local data-file and meta-file paths for a block.
 *
 * <p>The replica is looked up in {@code volumeMap} by block pool id and
 * block id while holding this object's monitor. When the stored replica has
 * a newer generation stamp than the caller's block, the caller's block is
 * updated in place to that stamp; an older stored stamp is rejected.
 *
 * @param block the block to resolve; its generation stamp may be raised
 * @return the block together with the absolute paths of its data and meta
 *         files
 * @throws ReplicaNotFoundException if {@code volumeMap} has no replica for
 *                                  the block
 * @throws IOException if the replica's generation stamp is older than the
 *                     block's, or if thrown by the methods called
 */
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
    throws IOException {
  synchronized (this) {
    final Replica stored =
        volumeMap.get(block.getBlockPoolId(), block.getBlockId());
    if (stored == null) {
      throw new ReplicaNotFoundException(block);
    }
    // A stored stamp older than the requested one cannot be served.
    if (stored.getGenerationStamp() < block.getGenerationStamp()) {
      throw new IOException(
          "Replica generation stamp < block generation stamp, block="
          + block + ", replica=" + stored);
    }
    // A newer stored stamp wins: bring the caller's block up to date.
    if (stored.getGenerationStamp() > block.getGenerationStamp()) {
      block.setGenerationStamp(stored.getGenerationStamp());
    }
  }

  final File blockData = getBlockFile(block);
  final File blockMeta =
      FsDatasetUtil.getMetaFile(blockData, block.getGenerationStamp());
  return new BlockLocalPathInfo(block,
      blockData.getAbsolutePath(), blockMeta.getAbsolutePath());
}
项目:hadoop-2.6.0-cdh5.4.3    文件:FsDatasetImpl.java   
/**
 * Check if a block is valid.
 *
 * <p>The checks run in this order: the replica must be present in
 * {@code volumeMap}, it must be in the requested state (when one is
 * requested), its block file must exist, and the length returned by
 * {@code getLength(b)} must be at least {@code minLength}.
 *
 * @param b           The block to check.
 * @param minLength   The minimum length that the block must have.  May be 0.
 * @param state       If this is null, it is ignored.  If it is non-null, we
 *                        will check that the replica has this state.
 *
 * @throws ReplicaNotFoundException          If the replica is not found
 * @throws UnexpectedReplicaStateException   If a state was requested and the
 *                                             replica is not in that state.
 * @throws FileNotFoundException             If the block file does not exist.
 * @throws EOFException                      If the replica length is too short.
 * @throws IOException                       May be thrown from the methods called.
 */
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
    throws ReplicaNotFoundException, UnexpectedReplicaStateException,
    FileNotFoundException, EOFException, IOException {
  final ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
      b.getLocalBlock());
  if (replicaInfo == null) {
    throw new ReplicaNotFoundException(b);
  }
  // A null state means the caller accepts any replica state, so the
  // comparison is only made when a specific state was requested. Without
  // this guard a null state would be rejected for every replica.
  if (state != null && replicaInfo.getState() != state) {
    throw new UnexpectedReplicaStateException(b, state);
  }
  if (!replicaInfo.getBlockFile().exists()) {
    throw new FileNotFoundException(replicaInfo.getBlockFile().getPath());
  }
  long onDiskLength = getLength(b);
  if (onDiskLength < minLength) {
    throw new EOFException(b + "'s on-disk length " + onDiskLength
        + " is shorter than minLength " + minLength);
  }
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Looks up the replica stored in volumeMap for the given block. The lookup
 * key is the block's pool Id together with its local block.
 *
 * <p>NOTE(review): the previous comment stated that the generation stamp
 * must match as well; that depends on how {@code volumeMap.get} treats the
 * local block and is not visible here - confirm.
 *
 * @param b extended block
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Looks up a replica in volumeMap by block pool Id and block Id only; no
 * generation stamp is passed to the lookup.
 *
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return a handler holding the replica being written and the volume
 *         reference obtained for it
 * @throws ReplicaNotFoundException if the replica is not found or is not in
 *                                  the FINALIZED state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaHandler append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  final FsVolumeReference volumeRef = current.getVolume().obtainReference();
  final ReplicaBeingWritten reopened;
  try {
    reopened = append(b.getBlockPoolId(), (FinalizedReplica) current, newGS,
        b.getNumBytes());
  } catch (IOException e) {
    // The volume reference must not outlive a failed append.
    IOUtils.cleanup(null, volumeRef);
    throw e;
  }
  return new ReplicaHandler(reopened, volumeRef);
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Looks up the replica stored in volumeMap for the given block. The lookup
 * key is the block's pool Id together with its local block.
 *
 * <p>NOTE(review): the previous comment stated that the generation stamp
 * must match as well; that depends on how {@code volumeMap.get} treats the
 * local block and is not visible here - confirm.
 *
 * @param b extended block
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Looks up a replica in volumeMap by block pool Id and block Id only; no
 * generation stamp is passed to the lookup.
 *
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return a handler holding the replica being written and the volume
 *         reference obtained for it
 * @throws ReplicaNotFoundException if the replica is not found or is not in
 *                                  the FINALIZED state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaHandler append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  final FsVolumeReference volumeRef = current.getVolume().obtainReference();
  final ReplicaBeingWritten reopened;
  try {
    reopened = append(b.getBlockPoolId(), (FinalizedReplica) current, newGS,
        b.getNumBytes());
  } catch (IOException e) {
    // The volume reference must not outlive a failed append.
    IOUtils.cleanup(null, volumeRef);
    throw e;
  }
  return new ReplicaHandler(reopened, volumeRef);
}
项目:big-c    文件:FsDatasetImpl.java   
/**
 * Looks up the replica stored in volumeMap for the given block. The lookup
 * key is the block's pool Id together with its local block.
 *
 * <p>NOTE(review): the previous comment stated that the generation stamp
 * must match as well; that depends on how {@code volumeMap.get} treats the
 * local block and is not visible here - confirm.
 *
 * @param b extended block
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:big-c    文件:FsDatasetImpl.java   
/**
 * Looks up a replica in volumeMap by block pool Id and block Id only; no
 * generation stamp is passed to the lookup.
 *
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:big-c    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return a handler holding the replica being written and the volume
 *         reference obtained for it
 * @throws ReplicaNotFoundException if the replica is not found or is not in
 *                                  the FINALIZED state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaHandler append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  final FsVolumeReference volumeRef = current.getVolume().obtainReference();
  final ReplicaBeingWritten reopened;
  try {
    reopened = append(b.getBlockPoolId(), (FinalizedReplica) current, newGS,
        b.getNumBytes());
  } catch (IOException e) {
    // The volume reference must not outlive a failed append.
    IOUtils.cleanup(null, volumeRef);
    throw e;
  }
  return new ReplicaHandler(reopened, volumeRef);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:FsDatasetImpl.java   
/**
 * Looks up the replica stored in volumeMap for the given block. The lookup
 * key is the block's pool Id together with its local block.
 *
 * <p>NOTE(review): the previous comment stated that the generation stamp
 * must match as well; that depends on how {@code volumeMap.get} treats the
 * local block and is not visible here - confirm.
 *
 * @param b extended block
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:FsDatasetImpl.java   
/**
 * Looks up a replica in volumeMap by block pool Id and block Id only; no
 * generation stamp is passed to the lookup.
 *
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return a handler holding the replica being written and the volume
 *         reference obtained for it
 * @throws ReplicaNotFoundException if the replica is not found or is not in
 *                                  the FINALIZED state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaHandler append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  final FsVolumeReference volumeRef = current.getVolume().obtainReference();
  final ReplicaBeingWritten reopened;
  try {
    reopened = append(b.getBlockPoolId(), (FinalizedReplica) current, newGS,
        b.getNumBytes());
  } catch (IOException e) {
    // The volume reference must not outlive a failed append.
    IOUtils.cleanup(null, volumeRef);
    throw e;
  }
  return new ReplicaHandler(reopened, volumeRef);
}
项目:hadoop-plus    文件:FsDatasetImpl.java   
/**
 * Looks up the replica stored in volumeMap for the given block. The lookup
 * key is the block's pool Id together with its local block.
 *
 * <p>NOTE(review): the previous comment stated that the generation stamp
 * must match as well; that depends on how {@code volumeMap.get} treats the
 * local block and is not visible here - confirm.
 *
 * @param b extended block
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:hadoop-plus    文件:FsDatasetImpl.java   
/**
 * Looks up a replica in volumeMap by block pool Id and block Id only; no
 * generation stamp is passed to the lookup.
 *
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:hadoop-plus    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return the replica produced by the internal append on the finalized
 *         replica
 * @throws ReplicaNotFoundException if the replica is not found or is not in
 *                                  the FINALIZED state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaInPipeline append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  // All checks passed: delegate to the overload that takes the finalized
  // replica itself.
  final FinalizedReplica finalized = (FinalizedReplica) current;
  return append(b.getBlockPoolId(), finalized, newGS, b.getNumBytes());
}
项目:FlexMap    文件:FsDatasetImpl.java   
/**
 * Looks up the replica stored in volumeMap for the given block. The lookup
 * key is the block's pool Id together with its local block.
 *
 * <p>NOTE(review): the previous comment stated that the generation stamp
 * must match as well; that depends on how {@code volumeMap.get} treats the
 * local block and is not visible here - confirm.
 *
 * @param b extended block
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:FlexMap    文件:FsDatasetImpl.java   
/**
 * Looks up a replica in volumeMap by block pool Id and block Id only; no
 * generation stamp is passed to the lookup.
 *
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:FlexMap    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return the replica produced by the internal append on the finalized
 *         replica
 * @throws ReplicaNotFoundException if the replica is not found or is not in
 *                                  the FINALIZED state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaInPipeline append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  // All checks passed: delegate to the overload that takes the finalized
  // replica itself.
  final FinalizedReplica finalized = (FinalizedReplica) current;
  return append(b.getBlockPoolId(), finalized, newGS, b.getNumBytes());
}
项目:hops    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return the replica produced by the internal append on the finalized
 *         replica
 * @throws ReplicaNotFoundException if the replica is not in the FINALIZED
 *                                  state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaInPipeline append(ExtendedBlock b, long newGS,
    long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  // All checks passed: delegate to the overload that takes the finalized
  // replica itself.
  final FinalizedReplica finalized = (FinalizedReplica) current;
  return append(b.getBlockPoolId(), finalized, newGS, b.getNumBytes());
}
项目:hadoop-TCP    文件:FsDatasetImpl.java   
/**
 * Looks up the replica stored in volumeMap for the given block. The lookup
 * key is the block's pool Id together with its local block.
 *
 * <p>NOTE(review): the previous comment stated that the generation stamp
 * must match as well; that depends on how {@code volumeMap.get} treats the
 * local block and is not visible here - confirm.
 *
 * @param b extended block
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:hadoop-TCP    文件:FsDatasetImpl.java   
/**
 * Looks up a replica in volumeMap by block pool Id and block Id only; no
 * generation stamp is passed to the lookup.
 *
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null
 * @throws ReplicaNotFoundException if the lookup in volumeMap yields no entry
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:hadoop-TCP    文件:FsDatasetImpl.java   
/**
 * Reopens a finalized replica for writing. Runs while holding this object's
 * monitor.
 *
 * @param b the block to append to
 * @param newGS the new generation stamp; rejected when it is less than the
 *              block's current generation stamp
 * @param expectedBlockLen the length the stored replica must currently have
 * @return the replica produced by the internal append on the finalized
 *         replica
 * @throws ReplicaNotFoundException if the replica is not found or is not in
 *                                  the FINALIZED state
 * @throws IOException if {@code newGS} is less than the block's generation
 *                     stamp, if the replica length differs from
 *                     {@code expectedBlockLen}, or if thrown by the methods
 *                     called
 */
@Override  // FsDatasetSpi
public synchronized ReplicaInPipeline append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was finalized because the
  // datanode processed every packet but some acks never reached the client,
  // which then re-opens the connection and resends those packets; or an
  // "append" to this block is taking place.

  // Validate the requested generation stamp first.
  if (b.getGenerationStamp() > newGS) {
    throw new IOException("The new generation stamp " + newGS +
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo current = getReplicaInfo(b);
  LOG.info("Appending to " + current);
  if (ReplicaState.FINALIZED != current.getState()) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }
  if (expectedBlockLen != current.getNumBytes()) {
    throw new IOException("Corrupted replica " + current +
        " with a length of " + current.getNumBytes() +
        " expected length is " + expectedBlockLen);
  }

  // All checks passed: delegate to the overload that takes the finalized
  // replica itself.
  final FinalizedReplica finalized = (FinalizedReplica) current;
  return append(b.getBlockPoolId(), finalized, newGS, b.getNumBytes());
}
项目:hardfs    文件:FsDatasetImpl.java   
/**
 * Fetch the replica that volumeMap holds for the given block. To find a
 * block, block pool Id, block Id and generation stamp must match.
 * @param b extended block
 * @return the meta replica information; never null, a missing entry is
 *         reported by exception instead
 * @throws ReplicaNotFoundException if volumeMap returns no entry for the
 *                        block pool Id / local block pair
 */
ReplicaInfo getReplicaInfo(ExtendedBlock b)
    throws ReplicaNotFoundException {
  final ReplicaInfo found =
      volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
  if (found != null) {
    return found;
  }
  // Nothing registered for this block: surface it as "non existent".
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
}
项目:hardfs    文件:FsDatasetImpl.java   
/**
 * Fetch the replica that volumeMap holds for a block pool Id / block Id
 * pair. The generation stamp plays no part in this lookup.
 * @param bpid block pool Id
 * @param blkid block Id
 * @return the meta replica information; never null, a missing entry is
 *         reported by exception instead
 * @throws ReplicaNotFoundException if volumeMap returns no entry for the
 *                        given pair
 */
private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    throws ReplicaNotFoundException {
  final ReplicaInfo found = volumeMap.get(bpid, blkid);
  if (found != null) {
    return found;
  }
  // Nothing registered under this id: surface it as "non existent".
  throw new ReplicaNotFoundException(
      ReplicaNotFoundException.NON_EXISTENT_REPLICA + bpid + ":" + blkid);
}
项目:hardfs    文件:FsDatasetImpl.java   
/**
 * Validate a finalized replica and hand it to the append overload that
 * takes a FinalizedReplica.
 *
 * @param b block to append to
 * @param newGS generation stamp for the append; a value smaller than the
 *              block's current generation stamp is rejected (an equal value
 *              passes the check)
 * @param expectedBlockLen length the stored replica must currently have
 * @return whatever the delegated append call returns
 * @throws ReplicaNotFoundException if the replica is not in FINALIZED state
 *                        (or is propagated from the replica lookup)
 * @throws IOException if newGS is too small or the replica length differs
 *                        from expectedBlockLen
 */
@Override  // FsDatasetSpi
public synchronized ReplicaInPipeline append(ExtendedBlock b,
    long newGS, long expectedBlockLen) throws IOException {
  // Two situations lead here. Either the block was already finalized
  // because the Datanode processed every packet, but the client missed
  // some acks and is now reconnecting to resend those packets; or a
  // genuine "append" to this block is starting.

  // Parameter sanity: the generation stamp must not move backwards.
  if (newGS < b.getGenerationStamp()) {
    throw new IOException("The new generation stamp " + newGS + 
        " should be greater than the replica " + b + "'s generation stamp");
  }

  final ReplicaInfo target = getReplicaInfo(b);
  LOG.info("Appending to " + target);

  final boolean finalized = target.getState() == ReplicaState.FINALIZED;
  if (!finalized) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
  }

  final boolean lengthMatches = target.getNumBytes() == expectedBlockLen;
  if (!lengthMatches) {
    throw new IOException("Corrupted replica " + target + 
        " with a length of " + target.getNumBytes() + 
        " expected length is " + expectedBlockLen);
  }

  // The state check above makes this cast safe.
  final FinalizedReplica finalizedReplica = (FinalizedReplica) target;
  return append(b.getBlockPoolId(), finalizedReplica, newGS,
      b.getNumBytes());
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Copy a finalized replica's block and meta files onto a volume of the
 * requested storage type, finalize the copy there, and retire the original.
 *
 * @param block block whose replica should be moved
 * @param targetStorageType storage type the replica should end up on
 * @return the old replicaInfo, i.e. the replica as it was looked up before
 *         the move
 * @throws ReplicaNotFoundException if the replica is not FINALIZED
 * @throws ReplicaAlreadyExistsException if the replica already lives on a
 *         volume of targetStorageType
 * @throws IOException if the stored length differs from the block's length,
 *         if the replica is on transient storage, or if a called helper fails
 */
@Override
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
    StorageType targetStorageType) throws IOException {
  final ReplicaInfo source = getReplicaInfo(block);

  // Preconditions, checked in this order so the first failure wins.
  if (source.getState() != ReplicaState.FINALIZED) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
  }
  if (source.getNumBytes() != block.getNumBytes()) {
    throw new IOException("Corrupted replica " + source
        + " with a length of " + source.getNumBytes()
        + " expected length is " + block.getNumBytes());
  }
  if (source.getVolume().getStorageType() == targetStorageType) {
    throw new ReplicaAlreadyExistsException("Replica " + source
        + " already exists on storage " + targetStorageType);
  }
  if (source.isOnTransientStorage()) {
    // Moving blocks off RAM_DISK is left to the LazyPersist mechanism.
    throw new IOException("Replica " + source
        + " cannot be moved from storageType : "
        + source.getVolume().getStorageType());
  }

  // The volume reference is released automatically when the try block exits.
  try (FsVolumeReference targetRef = volumes.getNextVolume(
      targetStorageType, block.getNumBytes())) {
    final String bpid = block.getBlockPoolId();
    final File srcBlockFile = source.getBlockFile();
    final File srcMetaFile = source.getMetaFile();
    final FsVolumeImpl destVolume = (FsVolumeImpl) targetRef.getVolume();

    // Stage the copies in the destination volume's temp directory.
    // NOTE(review): indexes 0 and 1 of the result are used below as
    // "file whose parent is the replica dir" and "file giving the length";
    // confirm the ordering against copyBlockFiles.
    final File[] copied = copyBlockFiles(block.getBlockId(),
        block.getGenerationStamp(), srcMetaFile, srcBlockFile,
        destVolume.getTmpDir(bpid),
        source.isOnTransientStorage());

    ReplicaInfo moved = new ReplicaInPipeline(
        source.getBlockId(), source.getGenerationStamp(),
        destVolume, copied[0].getParentFile(), 0);
    moved.setNumBytes(copied[1].length());

    // Promote the staged copy to a finalized replica.
    moved = finalizeReplica(bpid, moved);

    removeOldReplica(source, moved, srcBlockFile, srcMetaFile,
        srcBlockFile.length(), srcMetaFile.length(), bpid);
  }

  // Callers receive the pre-move replica.
  return source;
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Verify that a replica may take part in a recovery and return it.
 * The replica is looked up by block pool Id and block Id, then checked for
 * state, generation stamp and length, in that order. For an RBW replica the
 * previous writer is stopped and the current thread becomes the writer
 * before the length counters are compared.
 *
 * @param b block being recovered
 * @param newGS upper bound (inclusive) for the replica's generation stamp
 * @param expectedBlockLen length the replica must have
 * @return the validated replica
 * @throws ReplicaNotFoundException if the replica is neither FINALIZED nor
 *         RBW, or its generation stamp is outside
 *         [b.getGenerationStamp(), newGS]
 * @throws ReplicaAlreadyExistsException if an RBW replica's bytesRcvd,
 *         bytesOnDisk and bytesAcked are not all equal
 * @throws IOException if the replica length differs from expectedBlockLen,
 *         or a called helper fails
 */
private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
    long expectedBlockLen) throws IOException {
  ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

  // check state: only FINALIZED and RBW replicas are accepted
  if (replicaInfo.getState() != ReplicaState.FINALIZED &&
      replicaInfo.getState() != ReplicaState.RBW) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
  }

  // check generation stamp: must lie within [b's GS, newGS]
  long replicaGenerationStamp = replicaInfo.getGenerationStamp();
  if (replicaGenerationStamp < b.getGenerationStamp() ||
      replicaGenerationStamp > newGS) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
        + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
        newGS + "].");
  }

  // stop the previous writer before checking a replica's length
  long replicaLen = replicaInfo.getNumBytes();
  if (replicaInfo.getState() == ReplicaState.RBW) {
    ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
    // kill the previous writer and take over as the writer
    rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
    rbw.setWriter(Thread.currentThread());
    // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
    if (replicaLen != rbw.getBytesOnDisk() 
        || replicaLen != rbw.getBytesAcked()) {
      // A separating space keeps the replica description and the counters
      // from running together in the message.
      throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 
          " bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 
          rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
          ") are not the same.");
    }
  }

  // check block length
  if (replicaLen != expectedBlockLen) {
    throw new IOException("Corrupted replica " + replicaInfo + 
        " with a length of " + replicaLen + 
        " expected length is " + expectedBlockLen);
  }

  return replicaInfo;
}
项目:hadoop    文件:FsDatasetImpl.java   
/**
 * Recover an RBW replica so that writing can resume under a new generation
 * stamp. The replica is looked up by block pool Id and block Id; the
 * previous writer is stopped and the calling thread becomes the writer;
 * generation stamp and length bounds are validated; any bytes received
 * beyond the acked length are truncated; finally the replica's generation
 * stamp is bumped to newGS.
 *
 * @param b block to recover
 * @param newGS generation stamp to assign; also the inclusive upper bound
 *              for the replica's current generation stamp
 * @param minBytesRcvd lower bound that the replica's acked bytes must reach
 * @param maxBytesRcvd upper bound that the replica's received bytes must
 *                     not exceed
 * @return a handler wrapping the recovered replica and a reference to its
 *         volume
 * @throws ReplicaNotFoundException if the replica is not in RBW state, its
 *         generation stamp is outside [b.getGenerationStamp(), newGS], or
 *         its lengths are outside the given bounds
 * @throws IOException if truncation or the generation stamp bump fails
 */
@Override // FsDatasetSpi
public synchronized ReplicaHandler recoverRbw(
    ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
    throws IOException {
  LOG.info("Recover RBW replica " + b);

  ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

  // check the replica's state
  if (replicaInfo.getState() != ReplicaState.RBW) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
  }
  // Safe after the state check above.
  ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;

  LOG.info("Recovering " + rbw);

  // Stop the previous writer, then register the current thread as writer.
  // This happens before the generation stamp and length checks below.
  rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
  rbw.setWriter(Thread.currentThread());

  // check generation stamp: must lie within [b's GS, newGS]
  long replicaGenerationStamp = rbw.getGenerationStamp();
  if (replicaGenerationStamp < b.getGenerationStamp() ||
      replicaGenerationStamp > newGS) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
        ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
        newGS + "].");
  }

  // check replica length: acked bytes must reach minBytesRcvd and received
  // bytes must not exceed maxBytesRcvd
  long bytesAcked = rbw.getBytesAcked();
  long numBytes = rbw.getNumBytes();
  if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
    throw new ReplicaNotFoundException("Unmatched length replica " + 
        replicaInfo + ": BytesAcked = " + bytesAcked + 
        " BytesRcvd = " + numBytes + " are not in the range of [" + 
        minBytesRcvd + ", " + maxBytesRcvd + "].");
  }

  // Take a reference on the volume; on success it is handed to the
  // returned ReplicaHandler.
  FsVolumeReference ref = rbw.getVolume().obtainReference();
  try {
    // Truncate the potentially corrupt portion.
    // If the source was client and the last node in the pipeline was lost,
    // any corrupt data written after the acked length can go unnoticed.
    if (numBytes > bytesAcked) {
      final File replicafile = rbw.getBlockFile();
      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
      // Keep the in-memory lengths in step with the truncated files.
      rbw.setNumBytes(bytesAcked);
      rbw.setLastChecksumAndDataLen(bytesAcked, null);
    }

    // bump the replica's generation stamp to newGS
    bumpReplicaGS(rbw, newGS);
  } catch (IOException e) {
    // Release the volume reference before propagating the failure.
    // NOTE(review): only IOException is handled here; an unchecked exception
    // from the try body would skip this cleanup - confirm that is intended.
    IOUtils.cleanup(null, ref);
    throw e;
  }
  return new ReplicaHandler(rbw, ref);
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Copy a finalized replica's block and meta files onto a volume of the
 * requested storage type, finalize the copy there, and retire the original.
 *
 * @param block block whose replica should be moved
 * @param targetStorageType storage type the replica should end up on
 * @return the old replicaInfo, i.e. the replica as it was looked up before
 *         the move
 * @throws ReplicaNotFoundException if the replica is not FINALIZED
 * @throws ReplicaAlreadyExistsException if the replica already lives on a
 *         volume of targetStorageType
 * @throws IOException if the stored length differs from the block's length,
 *         if the replica is on transient storage, or if a called helper fails
 */
@Override
public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
    StorageType targetStorageType) throws IOException {
  final ReplicaInfo source = getReplicaInfo(block);

  // Preconditions, checked in this order so the first failure wins.
  if (source.getState() != ReplicaState.FINALIZED) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
  }
  if (source.getNumBytes() != block.getNumBytes()) {
    throw new IOException("Corrupted replica " + source
        + " with a length of " + source.getNumBytes()
        + " expected length is " + block.getNumBytes());
  }
  if (source.getVolume().getStorageType() == targetStorageType) {
    throw new ReplicaAlreadyExistsException("Replica " + source
        + " already exists on storage " + targetStorageType);
  }
  if (source.isOnTransientStorage()) {
    // Moving blocks off RAM_DISK is left to the LazyPersist mechanism.
    throw new IOException("Replica " + source
        + " cannot be moved from storageType : "
        + source.getVolume().getStorageType());
  }

  // The volume reference is released automatically when the try block exits.
  try (FsVolumeReference targetRef = volumes.getNextVolume(
      targetStorageType, block.getNumBytes())) {
    final String bpid = block.getBlockPoolId();
    final File srcBlockFile = source.getBlockFile();
    final File srcMetaFile = source.getMetaFile();
    final FsVolumeImpl destVolume = (FsVolumeImpl) targetRef.getVolume();

    // Stage the copies in the destination volume's temp directory.
    // NOTE(review): indexes 0 and 1 of the result are used below as
    // "file whose parent is the replica dir" and "file giving the length";
    // confirm the ordering against copyBlockFiles.
    final File[] copied = copyBlockFiles(block.getBlockId(),
        block.getGenerationStamp(), srcMetaFile, srcBlockFile,
        destVolume.getTmpDir(bpid),
        source.isOnTransientStorage(), smallBufferSize, conf);

    ReplicaInfo moved = new ReplicaInPipeline(
        source.getBlockId(), source.getGenerationStamp(),
        destVolume, copied[0].getParentFile(), 0);
    moved.setNumBytes(copied[1].length());

    // Promote the staged copy to a finalized replica.
    moved = finalizeReplica(bpid, moved);

    removeOldReplica(source, moved, srcBlockFile, srcMetaFile,
        srcBlockFile.length(), srcMetaFile.length(), bpid);
  }

  // Callers receive the pre-move replica.
  return source;
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Verify that a replica may take part in a recovery and return it.
 * The replica is looked up by block pool Id and block Id, then checked for
 * state, generation stamp and length, in that order. For an RBW replica the
 * previous writer is stopped and the current thread becomes the writer
 * before the length counters are compared.
 *
 * @param b block being recovered
 * @param newGS upper bound (inclusive) for the replica's generation stamp
 * @param expectedBlockLen length the replica must have
 * @return the validated replica
 * @throws ReplicaNotFoundException if the replica is neither FINALIZED nor
 *         RBW, or its generation stamp is outside
 *         [b.getGenerationStamp(), newGS]
 * @throws ReplicaAlreadyExistsException if an RBW replica's bytesRcvd,
 *         bytesOnDisk and bytesAcked are not all equal
 * @throws IOException if the replica length differs from expectedBlockLen,
 *         or a called helper fails
 */
private ReplicaInfo recoverCheck(ExtendedBlock b, long newGS, 
    long expectedBlockLen) throws IOException {
  ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

  // check state: only FINALIZED and RBW replicas are accepted
  if (replicaInfo.getState() != ReplicaState.FINALIZED &&
      replicaInfo.getState() != ReplicaState.RBW) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
  }

  // check generation stamp: must lie within [b's GS, newGS]
  long replicaGenerationStamp = replicaInfo.getGenerationStamp();
  if (replicaGenerationStamp < b.getGenerationStamp() ||
      replicaGenerationStamp > newGS) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
        + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
        newGS + "].");
  }

  // stop the previous writer before checking a replica's length
  long replicaLen = replicaInfo.getNumBytes();
  if (replicaInfo.getState() == ReplicaState.RBW) {
    ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
    // kill the previous writer and take over as the writer
    rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
    rbw.setWriter(Thread.currentThread());
    // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
    if (replicaLen != rbw.getBytesOnDisk() 
        || replicaLen != rbw.getBytesAcked()) {
      // A separating space keeps the replica description and the counters
      // from running together in the message.
      throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 
          " bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 
          rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
          ") are not the same.");
    }
  }

  // check block length
  if (replicaLen != expectedBlockLen) {
    throw new IOException("Corrupted replica " + replicaInfo + 
        " with a length of " + replicaLen + 
        " expected length is " + expectedBlockLen);
  }

  return replicaInfo;
}
项目:aliyun-oss-hadoop-fs    文件:FsDatasetImpl.java   
/**
 * Recover an RBW replica so that writing can resume under a new generation
 * stamp. The replica is looked up by block pool Id and block Id; the
 * previous writer is stopped and the calling thread becomes the writer;
 * generation stamp and length bounds are validated; any bytes received
 * beyond the acked length are truncated; finally the replica's generation
 * stamp is bumped to newGS.
 *
 * @param b block to recover
 * @param newGS generation stamp to assign; also the inclusive upper bound
 *              for the replica's current generation stamp
 * @param minBytesRcvd lower bound that the replica's acked bytes must reach
 * @param maxBytesRcvd upper bound that the replica's received bytes must
 *                     not exceed
 * @return a handler wrapping the recovered replica and a reference to its
 *         volume
 * @throws ReplicaNotFoundException if the replica is not in RBW state, its
 *         generation stamp is outside [b.getGenerationStamp(), newGS], or
 *         its lengths are outside the given bounds
 * @throws IOException if truncation or the generation stamp bump fails
 */
@Override // FsDatasetSpi
public synchronized ReplicaHandler recoverRbw(
    ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
    throws IOException {
  LOG.info("Recover RBW replica " + b);

  ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());

  // check the replica's state
  if (replicaInfo.getState() != ReplicaState.RBW) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
  }
  // Safe after the state check above.
  ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;

  LOG.info("Recovering " + rbw);

  // Stop the previous writer, then register the current thread as writer.
  // This happens before the generation stamp and length checks below.
  rbw.stopWriter(datanode.getDnConf().getXceiverStopTimeout());
  rbw.setWriter(Thread.currentThread());

  // check generation stamp: must lie within [b's GS, newGS]
  long replicaGenerationStamp = rbw.getGenerationStamp();
  if (replicaGenerationStamp < b.getGenerationStamp() ||
      replicaGenerationStamp > newGS) {
    throw new ReplicaNotFoundException(
        ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
        ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
        newGS + "].");
  }

  // check replica length: acked bytes must reach minBytesRcvd and received
  // bytes must not exceed maxBytesRcvd
  long bytesAcked = rbw.getBytesAcked();
  long numBytes = rbw.getNumBytes();
  if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
    throw new ReplicaNotFoundException("Unmatched length replica " + 
        replicaInfo + ": BytesAcked = " + bytesAcked + 
        " BytesRcvd = " + numBytes + " are not in the range of [" + 
        minBytesRcvd + ", " + maxBytesRcvd + "].");
  }

  // Take a reference on the volume; on success it is handed to the
  // returned ReplicaHandler.
  FsVolumeReference ref = rbw.getVolume().obtainReference();
  try {
    // Truncate the potentially corrupt portion.
    // If the source was client and the last node in the pipeline was lost,
    // any corrupt data written after the acked length can go unnoticed.
    if (numBytes > bytesAcked) {
      final File replicafile = rbw.getBlockFile();
      truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
      // Keep the in-memory lengths in step with the truncated files.
      rbw.setNumBytes(bytesAcked);
      rbw.setLastChecksumAndDataLen(bytesAcked, null);
    }

    // bump the replica's generation stamp to newGS
    bumpReplicaGS(rbw, newGS);
  } catch (IOException e) {
    // Release the volume reference before propagating the failure.
    // NOTE(review): only IOException is handled here; an unchecked exception
    // from the try body would skip this cleanup - confirm that is intended.
    IOUtils.cleanup(null, ref);
    throw e;
  }
  return new ReplicaHandler(rbw, ref);
}