Example source code for the Java class org.apache.hadoop.hdfs.protocol.ExtendedBlock
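ExtendedBlock identifies an HDFS block by its block pool ID together with the usual block fields (block ID, length in bytes, and generation stamp); the snippets below show how the Hadoop code base itself uses it. As a quick orientation, here is a minimal, self-contained sketch of constructing an ExtendedBlock and reading its fields; the pool ID and numeric values are made up for illustration:

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

public class ExtendedBlockExample {
  public static void main(String[] args) {
    // block pool id, block id, length in bytes, generation stamp (illustrative values)
    ExtendedBlock blk = new ExtendedBlock("BP-example-1", 12345L, 1024L, 1001L);
    System.out.println("pool=" + blk.getBlockPoolId()
        + ", id=" + blk.getBlockId()
        + ", len=" + blk.getNumBytes()
        + ", genStamp=" + blk.getGenerationStamp());
    // the wrapped org.apache.hadoop.hdfs.protocol.Block is also available
    System.out.println("local block: " + blk.getLocalBlock());
  }
}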
Project: hadoop
File: TestDataNodeVolumeFailure.java
/**
* Access each block of the file through its second DataNode until an access fails,
* which indicates that the corresponding volume has failed.
* @param path path of the file whose blocks are accessed
* @param size size of the file
* @throws IOException
*/
private void triggerFailure(String path, long size) throws IOException {
NamenodeProtocols nn = cluster.getNameNodeRpc();
List<LocatedBlock> locatedBlocks =
nn.getBlockLocations(path, 0, size).getLocatedBlocks();
for (LocatedBlock lb : locatedBlocks) {
DatanodeInfo dinfo = lb.getLocations()[1];
ExtendedBlock b = lb.getBlock();
try {
accessBlock(dinfo, lb);
} catch (IOException e) {
System.out.println("Failure triggered, on block: " + b.getBlockId() +
"; corresponding volume should be removed by now");
break;
}
}
}
Project: hadoop
File: TestPBHelper.java
private LocatedBlock createLocatedBlock() {
DatanodeInfo[] dnInfos = {
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h1",
AdminStates.DECOMMISSION_INPROGRESS),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
AdminStates.DECOMMISSIONED),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
AdminStates.NORMAL),
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4",
AdminStates.NORMAL),
};
String[] storageIDs = {"s1", "s2", "s3", "s4"};
StorageType[] media = {
StorageType.DISK,
StorageType.SSD,
StorageType.DISK,
StorageType.RAM_DISK
};
LocatedBlock lb = new LocatedBlock(
new ExtendedBlock("bp12", 12345, 10, 53),
dnInfos, storageIDs, media, 5, false, new DatanodeInfo[]{});
lb.setBlockToken(new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service")));
return lb;
}
Project: hadoop
File: DFSInputStream.java
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end,
final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
final Span parentSpan = Trace.currentSpan();
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
byte[] buf = bb.array();
int offset = bb.position();
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
corruptedBlockMap);
return bb;
} finally {
scope.close();
}
}
};
}
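getFromOneDataNode wraps a single-datanode positional read in a Callable so that DFSInputStream can race a second, "hedged" read against a slow first one and take whichever finishes first. Below is a rough sketch of that racing pattern using a standard java.util.concurrent CompletionService; it is illustrative only, and the two Callable parameters are assumed stand-ins for values such as those returned by getFromOneDataNode, not DFSInputStream's actual fields:

// Sketch only; assumes java.util.concurrent imports and two Callable<ByteBuffer> reads.
ByteBuffer hedgedRead(Callable<ByteBuffer> primary, Callable<ByteBuffer> hedge)
    throws Exception {
  ExecutorService pool = Executors.newFixedThreadPool(2);
  try {
    CompletionService<ByteBuffer> cs = new ExecutorCompletionService<ByteBuffer>(pool);
    cs.submit(primary);
    cs.submit(hedge);
    return cs.take().get(); // whichever datanode answers first wins
  } finally {
    pool.shutdownNow(); // attempt to cancel the slower read
  }
}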
Project: hadoop
File: DFSInputStream.java
/**
* DFSInputStream reports checksum failures.
* Case I: the client has tried multiple data nodes and at least one of the
* attempts has succeeded. We report the other failures as corrupted blocks to
* the namenode.
* Case II: the client has tried all data nodes, but all attempts failed. We
* only report if the total number of replicas is 1. We do not report
* otherwise, since the failure may be due to the client itself being unable to read.
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes that hold replicas of the block
*/
private void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
return;
}
Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
.entrySet().iterator();
Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
ExtendedBlock blk = entry.getKey();
Set<DatanodeInfo> dnSet = entry.getValue();
if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
|| ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
int i = 0;
for (DatanodeInfo dn:dnSet) {
locs[i++] = dn;
}
LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
dfsClient.reportChecksumFailure(src, lblocks);
}
corruptedBlockMap.clear();
}
Project: hadoop
File: ClientNamenodeProtocolTranslatorPB.java
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes)
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException {
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
.setSrc(src).setClientName(clientName).setFileId(fileId);
if (previous != null)
req.setPrevious(PBHelper.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
if (favoredNodes != null) {
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
try {
return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
Project: hadoop
File: BlockTokenSecretManager.java
/** Check if access should be allowed. userID is not checked if null */
public void checkAccess(Token<BlockTokenIdentifier> token, String userId,
ExtendedBlock block, AccessMode mode) throws InvalidToken {
BlockTokenIdentifier id = new BlockTokenIdentifier();
try {
id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier())));
} catch (IOException e) {
throw new InvalidToken(
"Unable to de-serialize block token identifier for user=" + userId
+ ", block=" + block + ", access mode=" + mode);
}
checkAccess(id, userId, block, mode);
if (!Arrays.equals(retrievePassword(id), token.getPassword())) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't have the correct token password");
}
}
Project: hadoop
File: BlockReaderLocalLegacy.java
LocalDatanodeInfo() {
final int cacheSize = 10000;
final float hashTableLoadFactor = 0.75f;
int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
cache = Collections
.synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
hashTableCapacity, hashTableLoadFactor, true) {
private static final long serialVersionUID = 1;
@Override
protected boolean removeEldestEntry(
Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
return size() > cacheSize;
}
});
}
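The constructor above builds a bounded LRU cache of BlockLocalPathInfo objects keyed by ExtendedBlock: an access-ordered LinkedHashMap whose removeEldestEntry override evicts the least recently used entry once the size exceeds cacheSize. The same JDK idiom in isolation, as an illustrative generic sketch (the class and names are made up, not Hadoop code):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative generic LRU cache built the same way as LocalDatanodeInfo's cache.
class LruCache<K, V> {
  private final Map<K, V> map;

  LruCache(final int maxEntries) {
    this.map = Collections.synchronizedMap(
        new LinkedHashMap<K, V>(maxEntries + 1, 0.75f, true) { // true = access order
          @Override
          protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries; // evict the least recently used entry
          }
        });
  }

  V get(K key) { return map.get(key); }
  void put(K key, V value) { map.put(key, value); }
}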
Project: hadoop
File: TestWriteToReplica.java
@Test
public void testAppend() throws Exception {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
// set up replicasMap
String bpid = cluster.getNamesystem().getBlockPoolId();
ExtendedBlock[] blocks = setup(bpid, dataSet);
// test append
testAppend(bpid, dataSet, blocks);
} finally {
cluster.shutdown();
}
}
Project: hadoop
File: FsDatasetImpl.java
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
synchronized(this) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"Replica generation stamp < block generation stamp, block="
+ block + ", replica=" + replica);
} else if (replica.getGenerationStamp() > block.getGenerationStamp()) {
block.setGenerationStamp(replica.getGenerationStamp());
}
}
File datafile = getBlockFile(block);
File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
BlockLocalPathInfo info = new BlockLocalPathInfo(block,
datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
}
Project: hadoop
File: TestDataTransferProtocol.java
private void writeZeroLengthPacket(ExtendedBlock block, String description)
throws IOException {
PacketHeader hdr = new PacketHeader(
8, // size of packet
block.getNumBytes(), // OffsetInBlock
100, // sequencenumber
true, // lastPacketInBlock
0, // chunk length
false); // sync block
hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
//ok finally write a block with 0 len
sendResponse(Status.SUCCESS, "", null, recvOut);
new PipelineAck(100, new int[] {
PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status.SUCCESS)
}).write(recvOut);
sendRecvData(description, false);
}
Project: hadoop
File: TestBalancerWithMultipleNameNodes.java
private static ExtendedBlock[][] generateBlocks(Suite s, long size
) throws IOException, InterruptedException, TimeoutException {
final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
for(int n = 0; n < s.clients.length; n++) {
final long fileLen = size/s.replication;
createFile(s, n, fileLen);
final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
FILE_NAME, 0, fileLen).getLocatedBlocks();
final int numOfBlocks = locatedBlocks.size();
blocks[n] = new ExtendedBlock[numOfBlocks];
for(int i = 0; i < numOfBlocks; i++) {
final ExtendedBlock b = locatedBlocks.get(i).getBlock();
blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
b.getNumBytes(), b.getGenerationStamp());
}
}
return blocks;
}
Project: hadoop
File: TestBlockRecovery.java
/** Sync two replicas */
private void testSyncReplicas(ReplicaRecoveryInfo replica1,
ReplicaRecoveryInfo replica2,
InterDatanodeProtocol dn1,
InterDatanodeProtocol dn2,
long expectLen) throws IOException {
DatanodeInfo[] locs = new DatanodeInfo[]{
mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
RecoveringBlock rBlock = new RecoveringBlock(block,
locs, RECOVERY_ID);
ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
BlockRecord record1 = new BlockRecord(
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn1, replica1);
BlockRecord record2 = new BlockRecord(
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234), dn2, replica2);
syncList.add(record1);
syncList.add(record2);
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong(), anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong(), anyLong())).thenReturn("storage2");
dn.syncBlock(rBlock, syncList);
}
Project: hadoop
File: FSNamesystem.java
/**
* The client would like to obtain an additional block for the indicated
* filename (which is being written to). Return an array that consists
* of the block, plus a set of machines. The first entry in this list should
* be where the client writes data. Subsequent items in the list must
* be provided in the connection to the first datanode.
*
* Make sure the previous blocks have been reported by datanodes and
* are replicated. Will return an empty two-element array if we want the
* client to "try again later".
*/
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
ExtendedBlock previous, Set<Node> excludedNodes,
List<String> favoredNodes) throws IOException {
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
if (targets == null) {
assert onRetryBlock[0] != null : "Retry block is null";
// This is a retry. Just return the last block.
return onRetryBlock[0];
}
LocatedBlock newBlock = storeAllocatedBlock(
src, fileId, clientName, previous, targets);
return newBlock;
}
Project: hadoop
File: TestBPOfferService.java
private ReceivedDeletedBlockInfo[] waitForBlockReceived(
final ExtendedBlock fakeBlock,
final DatanodeProtocolClientSideTranslatorPB mockNN) throws Exception {
final String fakeBlockPoolId = fakeBlock.getBlockPoolId();
final ArgumentCaptor<StorageReceivedDeletedBlocks[]> captor =
ArgumentCaptor.forClass(StorageReceivedDeletedBlocks[].class);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
Mockito.verify(mockNN).blockReceivedAndDeleted(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.eq(fakeBlockPoolId),
captor.capture());
return true;
} catch (Throwable t) {
return false;
}
}
}, 100, 10000);
return captor.getValue()[0].getBlocks();
}
Project: hadoop
File: SimulatedFSDataset.java
@Override
public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
throws IOException {
final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
if (map == null) {
throw new IOException("Block pool not found, temporary=" + temporary);
}
final BInfo r = map.get(temporary.getLocalBlock());
if (r == null) {
throw new IOException("Block not found, temporary=" + temporary);
} else if (r.isFinalized()) {
throw new IOException("Replica already finalized, temporary="
+ temporary + ", r=" + r);
}
return r;
}
Project: hadoop
File: BlockManager.java
/**
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
* @param storageID if known, null otherwise.
* @param reason a textual reason why the block should be marked corrupt,
* for logging purposes
*/
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn, String storageID, String reason) throws IOException {
assert namesystem.hasWriteLock();
final BlockInfoContiguous storedBlock = getStoredBlock(blk.getLocalBlock());
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
// thread of Datanode reports bad block before Block reports are sent
// by the Datanode on startup
blockLog.info("BLOCK* findAndMarkBlockAsCorrupt: {} not found", blk);
return;
}
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot mark " + blk
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
+ ") does not exist");
}
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
storageID == null ? null : node.getStorageInfo(storageID),
node);
}
Project: hadoop
File: TestCommitBlockSynchronization.java
@Test
public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget()
throws IOException {
INodeFile file = mockFileUnderConstruction();
Block block = new Block(blockId, length, genStamp);
FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
DatanodeID[] newTargets = new DatanodeID[]{
new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0, 0)};
ExtendedBlock lastBlock = new ExtendedBlock();
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true,
false, newTargets, null);
// Repeat the call to make sure the retry also succeeds
namesystemSpy.commitBlockSynchronization(
lastBlock, genStamp, length, true, false, newTargets, null);
}
Project: hadoop
File: BlockScanner.java
/**
* Mark a block as "suspect."
*
* This means that we should try to rescan it soon. Note that the
* VolumeScanner keeps a list of recently suspicious blocks, which
* it uses to avoid rescanning the same block over and over in a short
* time frame.
*
* @param storageId The ID of the storage where the block replica
* is being stored.
* @param block The block's ID and block pool id.
*/
synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
if (!isEnabled()) {
LOG.info("Not scanning suspicious block {} on {}, because the block " +
"scanner is disabled.", block, storageId);
return;
}
VolumeScanner scanner = scanners.get(storageId);
if (scanner == null) {
// This could happen if the volume is in the process of being removed.
// The removal process shuts down the VolumeScanner, but the volume
// object stays around as long as there are references to it (which
// should not be that long.)
LOG.info("Not scanning suspicious block {} on {}, because there is no " +
"volume scanner for that storageId.", block, storageId);
return;
}
scanner.markSuspectBlock(block);
}
Project: hadoop
File: BlockReportTestBase.java
/**
* Test writes a file and closes it.
* The block report is generated with an extra block.
* The block report is forced and the number of pending-deletion
* blocks is checked.
*
* @throws IOException in case of an error
*/
@Test(timeout=300000)
public void blockReport_04() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath,
FILE_SIZE, REPL_FACTOR, rand.nextLong());
DataNode dn = cluster.getDataNodes().get(DN_N0);
// all blocks belong to the same file, hence same BP
String poolId = cluster.getNamesystem().getBlockPoolId();
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
sendBlockReports(dnR, poolId, reports);
printStats();
assertThat("Wrong number of corrupt blocks",
cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
assertThat("Wrong number of PendingDeletion blocks",
cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
}
Project: hadoop
File: TestDNFencing.java
private void waitForTrueReplication(final MiniDFSCluster cluster,
final ExtendedBlock block, final int waitFor) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
return getTrueReplication(cluster, block) == waitFor;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}, 500, 10000);
}
Project: hadoop
File: TestBlockToken.java
private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm,
ExtendedBlock block,
EnumSet<BlockTokenSecretManager.AccessMode> accessModes)
throws IOException {
Token<BlockTokenIdentifier> token = sm.generateToken(block, accessModes);
BlockTokenIdentifier id = sm.createIdentifier();
id.readFields(new DataInputStream(new ByteArrayInputStream(token
.getIdentifier())));
return id;
}
Project: hadoop
File: SimulatedFSDataset.java
@Override // FsDatasetSpi
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newBlockId,
long newlength) {
// Caller does not care about the exact Storage UUID returned.
return datanodeUuid;
}
Project: hadoop
File: DFSInputStream.java
/**
* Add a corrupted block replica to the map.
*/
private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
Set<DatanodeInfo> dnSet = null;
if (corruptedBlockMap.containsKey(blk)) {
dnSet = corruptedBlockMap.get(blk);
} else {
dnSet = new HashSet<DatanodeInfo>();
}
if (!dnSet.contains(node)) {
dnSet.add(node);
corruptedBlockMap.put(blk, dnSet);
}
}
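On Java 8 and later, the same bookkeeping can be written more compactly with Map.computeIfAbsent. The following is only a hypothetical equivalent of the method above, not the actual Hadoop implementation:

private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
  // create the replica set for this block on first use, then record the datanode
  corruptedBlockMap.computeIfAbsent(blk, k -> new HashSet<DatanodeInfo>()).add(node);
}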
Project: hadoop
File: TestFsck.java
public void removeBlocks(MiniDFSCluster cluster)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
for (int corruptIdx : blocksToCorrupt) {
// Corrupt a block by deleting it
ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
name, blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
for (int i = 0; i < numDataNodes; i++) {
File blockFile = cluster.getBlockFile(i, block);
if(blockFile != null && blockFile.exists()) {
assertTrue(blockFile.delete());
}
}
}
}
Project: hadoop
File: FsDatasetImpl.java
/**
* Complete the block write!
*/
@Override // FsDatasetSpi
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
}
ReplicaInfo replicaInfo = getReplicaInfo(b);
if (replicaInfo.getState() == ReplicaState.FINALIZED) {
// this is legal, when recovery happens on a file that has
// been opened for append but never modified
return;
}
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
Project: hadoop
File: MiniDFSCluster.java
/**
* Get the block data file for a block from a given datanode.
* @param dnIndex index of the datanode to get block files for
* @param block block for which the corresponding file is needed
* @return the block file, or null if it is not found in either storage directory
*/
public File getBlockFile(int dnIndex, ExtendedBlock block) {
// Check for block file in the two storage directories of the datanode
for (int i = 0; i <= 1; i++) {
File storageDir = getStorageDir(dnIndex, i);
File blockFile = getBlockFile(storageDir, block);
if (blockFile.exists()) {
return blockFile;
}
}
return null;
}
Project: hadoop
File: BlockTokenSecretManager.java
/** Generate a block token for a specified user */
public Token<BlockTokenIdentifier> generateToken(String userId,
ExtendedBlock block, EnumSet<AccessMode> modes) throws IOException {
BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
.getBlockPoolId(), block.getBlockId(), modes);
return new Token<BlockTokenIdentifier>(id, this);
}
Project: hadoop
File: BlockTokenSecretManager.java
/**
* Check if access should be allowed. userID is not checked if null. This
* method doesn't check if token password is correct. It should be used only
* when token password has already been verified (e.g., in the RPC layer).
*/
public void checkAccess(BlockTokenIdentifier id, String userId,
ExtendedBlock block, AccessMode mode) throws InvalidToken {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking access for user=" + userId + ", block=" + block
+ ", access mode=" + mode + " using " + id.toString());
}
if (userId != null && !userId.equals(id.getUserId())) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't belong to user " + userId);
}
if (!id.getBlockPoolId().equals(block.getBlockPoolId())) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't apply to block " + block);
}
if (id.getBlockId() != block.getBlockId()) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't apply to block " + block);
}
if (isExpired(id.getExpiryDate())) {
throw new InvalidToken("Block token with " + id.toString()
+ " is expired.");
}
if (!id.getAccessModes().contains(mode)) {
throw new InvalidToken("Block token with " + id.toString()
+ " doesn't have " + mode + " permission");
}
}
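Read together, generateToken and checkAccess form the block-token round trip: a token is issued for a specific user, block pool, block ID, and set of access modes, and is later validated against the block a datanode is asked to serve. Below is a hedged usage sketch based only on the signatures shown above; the secret manager sm, the ExtendedBlock block, and the user name are assumed to be set up elsewhere:

// Sketch only: `sm` is an initialized BlockTokenSecretManager, `block` an ExtendedBlock.
Token<BlockTokenIdentifier> token =
    sm.generateToken("alice", block, EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
try {
  // Throws InvalidToken if the user, block pool, block id, access mode, or expiry does not match.
  sm.checkAccess(token, "alice", block, BlockTokenSecretManager.AccessMode.READ);
} catch (InvalidToken e) {
  // the token was rejected; deny the request
}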
Project: hadoop
File: TestBlockRecovery.java
/**
* BlockRecoveryFI_11. A replica's recovery id does not match the new GS.
*
* @throws IOException in case of an error
*/
@Test
public void testNotMatchedReplicaID() throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
StorageType.DEFAULT, block, false).getReplica();
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
} catch (IOException e) {
// assert on the expected failure reason
assertTrue(e.getMessage().startsWith("Cannot recover "));
}
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
anyBoolean(), any(DatanodeID[].class), any(String[].class));
} finally {
streams.close();
}
}
Project: hadoop
File: DataNode.java
/**
* After a block becomes finalized, the datanode increments its metrics counter,
* notifies the namenode, and adds the block to the block scanner.
* @param block block to close
* @param delHint hint on which excess block to delete
* @param storageUuid UUID of the storage where the block is stored
*/
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
metrics.incrBlocksWritten();
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
} else {
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
}
Project: hadoop
File: TestProcessCorruptBlocks.java
/**
* A corrupt replica has to be removed once the number of valid replicas
* matches the replication factor of the file. Here that condition is
* exercised by reducing the replication factor.
* The test strategy:
*   Bring up a cluster with 3 DataNodes
*   Create a file with replication factor 3
*   Corrupt one replica of a block of the file
*   Verify that there are still 2 good replicas and 1 corrupt replica
*     (the corrupt replica is not removed yet, since the number of good
*      replicas (2) is less than the replication factor (3))
*   Set the replication factor to 2
*   Verify that the corrupt replica is removed
*     (the corrupt replica can now be removed, since the number of good
*      replicas (2) equals the replication factor (2))
*/
@Test
public void testWhenDecreasingReplication() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, Integer.toString(2));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
try {
final Path fileName = new Path("/foo1");
DFSTestUtil.createFile(fs, fileName, 2, (short) 3, 0L);
DFSTestUtil.waitReplication(fs, fileName, (short) 3);
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
corruptBlock(cluster, fs, fileName, 0, block);
DFSTestUtil.waitReplication(fs, fileName, (short) 2);
assertEquals(2, countReplicas(namesystem, block).liveReplicas());
assertEquals(1, countReplicas(namesystem, block).corruptReplicas());
namesystem.setReplication(fileName.toString(), (short) 2);
// wait for 3 seconds so that all block reports are processed.
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
}
assertEquals(2, countReplicas(namesystem, block).liveReplicas());
assertEquals(0, countReplicas(namesystem, block).corruptReplicas());
} finally {
cluster.shutdown();
}
}
Project: hadoop
File: DataNode.java
private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();
LOG.info(who + " calls recoverBlock(" + block
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
+ ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
}
Project: hadoop
File: FsDatasetImpl.java
/**
* Sets the offset in the meta file so that the
* last checksum will be overwritten.
*/
@Override // FsDatasetSpi
public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams,
int checksumSize) throws IOException {
FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
FileChannel channel = file.getChannel();
long oldPos = channel.position();
long newPos = oldPos - checksumSize;
if (LOG.isDebugEnabled()) {
LOG.debug("Changing meta file offset of block " + b + " from " +
oldPos + " to " + newPos);
}
channel.position(newPos);
}
Project: hadoop
File: TestCachingStrategy.java
@Test(timeout=120000)
public void testFadviseAfterWriteThenRead() throws Exception {
// start a cluster
LOG.info("testFadviseAfterWriteThenRead");
tracker.clear();
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, true);
// verify that we dropped everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = cluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
// read file
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, true);
// verify that we dropped everything from the cache.
Assert.assertNotNull(stats);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
Project: hadoop
File: DataNode.java
/**
* Transfer a replica to the datanode targets.
* @param b the block to transfer.
* The corresponding replica must be an RBW or a finalized replica.
* Its GS and numBytes will be set to
* the stored GS and the visible length.
* @param targets targets to transfer the block to
* @param targetStorageTypes storage types of the targets
* @param client client name
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
final String client) throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
//get replica information
synchronized(data) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {
throw new IOException(b + " not found in datanode.");
}
storedGS = storedBlock.getGenerationStamp();
if (storedGS < b.getGenerationStamp()) {
throw new IOException(storedGS
+ " = storedGS < b.getGenerationStamp(), b=" + b);
}
// Update the genstamp with storedGS
b.setGenerationStamp(storedGS);
if (data.isValidRbw(b)) {
stage = BlockConstructionStage.TRANSFER_RBW;
} else if (data.isValidBlock(b)) {
stage = BlockConstructionStage.TRANSFER_FINALIZED;
} else {
final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
}
visible = data.getReplicaVisibleLength(b);
}
//set visible length
b.setNumBytes(visible);
if (targets.length > 0) {
new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
}
Project: hadoop
File: FSNamesystem.java
/**
* Update a pipeline for a block under construction.
*
* @param clientName the name of the client
* @param oldBlock the old block
* @param newBlock a new block with a new generation stamp and length
* @param newNodes datanodes in the pipeline
* @param newStorageIDs storage IDs for the new datanodes
* @throws IOException if any error occurs
*/
void updatePipeline(
String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
throws IOException {
checkOperation(OperationCategory.WRITE);
LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
+ ", newGS=" + newBlock.getGenerationStamp()
+ ", newLength=" + newBlock.getNumBytes()
+ ", newNodes=" + Arrays.asList(newNodes)
+ ", client=" + clientName
+ ")");
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Pipeline not updated");
assert newBlock.getBlockId() == oldBlock.getBlockId() : newBlock + " and "
+ oldBlock + " have different block identifiers";
updatePipelineInternal(clientName, oldBlock, newBlock, newNodes,
newStorageIDs, logRetryCache);
} finally {
writeUnlock();
}
getEditLog().logSync();
LOG.info("updatePipeline(" + oldBlock.getLocalBlock() + " => "
+ newBlock.getLocalBlock() + ") success");
}
Project: hadoop
File: FsDatasetImpl.java
@Override // FsDatasetSpi
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"replica.getGenerationStamp() < block.getGenerationStamp(), block="
+ block + ", replica=" + replica);
}
return replica.getVisibleLength();
}
Project: hadoop
File: TestClientReportBadBlock.java
/**
* Corrupt a block on a data node. Replace the block file content with the
* byte sequence 0, 1, ..., BLOCK_SIZE - 1.
*
* @param block
* the ExtendedBlock to be corrupted
* @param dn
* the data node where the block needs to be corrupted
* @throws FileNotFoundException
* @throws IOException
*/
private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
throws FileNotFoundException, IOException {
final File f = DataNodeTestUtils.getBlockFile(
dn, block.getBlockPoolId(), block.getLocalBlock());
final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
final byte[] bytes = new byte[(int) BLOCK_SIZE];
for (int i = 0; i < BLOCK_SIZE; i++) {
bytes[i] = (byte) (i);
}
raFile.write(bytes);
raFile.close();
}
Project: hadoop
File: TestBlockScanner.java
@Test(timeout=120000)
public void testCorruptBlockHandling() throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
final TestContext ctx = new TestContext(conf, 1);
final int NUM_EXPECTED_BLOCKS = 5;
final int CORRUPT_INDEX = 3;
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4);
ExtendedBlock badBlock = ctx.getFileBlock(0, CORRUPT_INDEX);
ctx.cluster.corruptBlockOnDataNodes(badBlock);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
synchronized (info) {
info.shouldRun = true;
info.notify();
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
return info.blocksScanned == NUM_EXPECTED_BLOCKS;
}
}
}, 3, 30000);
synchronized (info) {
assertTrue(info.badBlocks.contains(badBlock));
for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
if (i != CORRUPT_INDEX) {
ExtendedBlock block = ctx.getFileBlock(0, i);
assertTrue(info.goodBlocks.contains(block));
}
}
}
ctx.close();
}
Project: hadoop
File: FsDatasetImpl.java
/**
* Return the File associated with a block, without first
* checking that it exists. This should be used when the
* next operation is going to open the file for read anyway,
* and thus the exists check is redundant.
*
* @param touch if true then update the last access timestamp of the
* block. Currently used for blocks on transient storage.
*/
private File getBlockFileNoExistsCheck(ExtendedBlock b,
boolean touch)
throws IOException {
final File f;
synchronized(this) {
f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
}
if (f == null) {
throw new IOException("Block " + b + " is not valid");
}
return f;
}