Java class org.apache.hadoop.hdfs.protocol.LocatedBlock: example source code
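The snippets on this page are collected from the Hadoop code base. As a quick orientation, here is a minimal sketch of the most common way client code obtains LocatedBlock instances; it is illustrative only (the DistributedFileSystem instance, the printBlockLocations helper name, and the file path are assumptions, not taken from any of the projects below):

/**
 * Illustrative sketch only: list every block of a file and the datanodes
 * holding its replicas. Assumes "dfs" is an already opened
 * DistributedFileSystem and "src" names an existing file.
 */
static void printBlockLocations(DistributedFileSystem dfs, String src)
    throws IOException {
  LocatedBlocks lbs =
      dfs.getClient().getLocatedBlocks(src, 0, Long.MAX_VALUE);
  for (LocatedBlock lb : lbs.getLocatedBlocks()) {
    System.out.println("block " + lb.getBlock()
        + " offset=" + lb.getStartOffset()
        + " len=" + lb.getBlockSize()
        + (lb.isCorrupt() ? " (corrupt)" : ""));
    for (DatanodeInfo dn : lb.getLocations()) {
      System.out.println("  replica on " + dn.getXferAddr()
          + " rack " + dn.getNetworkLocation());
    }
  }
}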
Project: hadoop
File: TestDataNodeVolumeFailure.java
/**
* Access each block on the second DataNode in turn until an access fails...
* @param path path of the file whose blocks are accessed
* @param size size of the file in bytes
* @throws IOException
*/
private void triggerFailure(String path, long size) throws IOException {
NamenodeProtocols nn = cluster.getNameNodeRpc();
List<LocatedBlock> locatedBlocks =
nn.getBlockLocations(path, 0, size).getLocatedBlocks();
for (LocatedBlock lb : locatedBlocks) {
DatanodeInfo dinfo = lb.getLocations()[1];
ExtendedBlock b = lb.getBlock();
try {
accessBlock(dinfo, lb);
} catch (IOException e) {
System.out.println("Failure triggered, on block: " + b.getBlockId() +
"; corresponding volume should be removed by now");
break;
}
}
}
Project: hadoop
File: DFSOutputStream.java
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
String[] favoredNodes) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForAppend", src);
try {
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
progress, lastBlock, stat, checksum);
if (favoredNodes != null && favoredNodes.length != 0) {
out.streamer.setFavoredNodes(favoredNodes);
}
out.start();
return out;
} finally {
scope.close();
}
}
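newStreamForAppend above is reached from the public append path. For context, a minimal caller-side sketch (illustrative only; the FileSystem instance and the path are assumptions), mirroring what the test snippets further down this page do:

/** Illustrative sketch only: append data to an existing file and flush it. */
static void appendAndFlush(FileSystem fs, Path file) throws IOException {
  FSDataOutputStream out = fs.append(file);
  out.writeBytes("some more bytes");
  // hflush() pushes the appended data through the pipeline without closing the block.
  out.hflush();
  out.close();
}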
Project: hadoop
File: TestDFSClientRetries.java
private LocatedBlocks makeBadBlockList(LocatedBlocks goodBlockList) {
LocatedBlock goodLocatedBlock = goodBlockList.get(0);
LocatedBlock badLocatedBlock = new LocatedBlock(
goodLocatedBlock.getBlock(),
new DatanodeInfo[] {
DFSTestUtil.getDatanodeInfo("1.2.3.4", "bogus", 1234)
},
goodLocatedBlock.getStartOffset(),
false);
List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
badBlocks.add(badLocatedBlock);
return new LocatedBlocks(goodBlockList.getFileLength(), false,
badBlocks, null, true,
null);
}
Project: hadoop
File: TestNameNodeMetrics.java
/** Test to ensure metrics reflect missing blocks */
@Test
public void testMissingBlock() throws Exception {
// Create a file with a single block that has only one replica
Path file = getTestPath("testMissingBlocks");
createFile(file, 100, (short)1);
// Corrupt the only replica of the block to result in a missing block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
"STORAGE_ID", "TEST");
} finally {
cluster.getNamesystem().writeUnlock();
}
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("UnderReplicatedBlocks", 1L, rb);
assertGauge("MissingBlocks", 1L, rb);
assertGauge("MissingReplOneBlocks", 1L, rb);
fs.delete(file, true);
waitForDnMetricValue(NS_METRICS, "UnderReplicatedBlocks", 0L);
}
Project: hadoop
File: NNThroughputBenchmark.java
private ExtendedBlock addBlocks(String fileName, String clientName)
throws IOException {
ExtendedBlock prevBlock = null;
for(int jdx = 0; jdx < blocksPerFile; jdx++) {
LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null);
prevBlock = loc.getBlock();
for(DatanodeInfo dnInfo : loc.getLocations()) {
int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());
datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
ReceivedDeletedBlockInfo[] rdBlocks = { new ReceivedDeletedBlockInfo(
loc.getBlock().getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, null) };
StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
datanodes[dnIdx].storage.getStorageID(), rdBlocks) };
nameNodeProto.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
.getBlock().getBlockPoolId(), report);
}
}
return prevBlock;
}
Project: hadoop
File: DFSInputStream.java
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end,
final ByteBuffer bb,
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
final int hedgedReadId) {
final Span parentSpan = Trace.currentSpan();
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
byte[] buf = bb.array();
int offset = bb.position();
TraceScope scope =
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
try {
actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
corruptedBlockMap);
return bb;
} finally {
scope.close();
}
}
};
}
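The callable above is what DFSInputStream submits when it issues a hedged (speculative) read against a second datanode. Whether hedged reads are attempted at all is a client-side setting; the sketch below shows one way to enable them. The key names are assumptions based on the hedged-read feature (HDFS-5776) and should be checked against your Hadoop version:

/** Illustrative sketch only: enable hedged reads on the client. */
static FileSystem openWithHedgedReads(Configuration conf) throws IOException {
  // A thread pool size of 0 (the default) disables hedged reads entirely.
  conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
  // Hedge a read if the first datanode has not responded within 500 ms.
  conf.setLong("dfs.client.hedged.read.threshold.millis", 500);
  return FileSystem.get(conf);
}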
Project: hadoop
File: DFSInputStream.java
/**
* DFSInputStream reports checksum failures.
* Case I : the client has tried multiple data nodes and at least one of the
* attempts has succeeded. We report the other failures as corrupted blocks
* to the namenode.
* Case II: the client has tried all data nodes and every attempt failed. We
* only report if the total number of replicas is 1; otherwise we do not
* report, since the failure may be due to the client itself being unable
* to read.
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes that hold replicas of the block
*/
private void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
int dataNodeCount) {
if (corruptedBlockMap.isEmpty()) {
return;
}
Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap
.entrySet().iterator();
Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next();
ExtendedBlock blk = entry.getKey();
Set<DatanodeInfo> dnSet = entry.getValue();
if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0))
|| ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) {
DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()];
int i = 0;
for (DatanodeInfo dn:dnSet) {
locs[i++] = dn;
}
LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) };
dfsClient.reportChecksumFailure(src, lblocks);
}
corruptedBlockMap.clear();
}
Project: hadoop
File: DFSClient.java
/**
* Connect to the given datanode's data transfer port, and return
* the resulting IOStreamPair. This includes encryption wrapping, etc.
*/
private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
LocatedBlock lb) throws IOException {
boolean success = false;
Socket sock = null;
try {
sock = socketFactory.createSocket();
String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
sock.setSoTimeout(timeout);
OutputStream unbufOut = NetUtils.getOutputStream(sock);
InputStream unbufIn = NetUtils.getInputStream(sock);
IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
lb.getBlockToken(), dn);
success = true;
return ret;
} finally {
if (!success) {
IOUtils.closeSocket(sock);
}
}
}
Project: hadoop
File: BlockStorageLocationUtil.java
/**
* Helper method to combine a list of {@link LocatedBlock} with associated
* {@link VolumeId} information to form a list of
* {@link BlockStorageLocation}.
*/
static BlockStorageLocation[] convertToVolumeBlockLocations(
List<LocatedBlock> blocks,
Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
// Construct the final return value of BlockStorageLocation[]
BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
List<BlockStorageLocation> volumeBlockLocs =
new ArrayList<BlockStorageLocation>(locations.length);
for (int i = 0; i < locations.length; i++) {
LocatedBlock locBlock = blocks.get(i);
List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
BlockStorageLocation bsLoc = new BlockStorageLocation(locations[i],
volumeIds.toArray(new VolumeId[0]));
volumeBlockLocs.add(bsLoc);
}
return volumeBlockLocs.toArray(new BlockStorageLocation[] {});
}
Project: hadoop
File: FSNamesystem.java
/**
* The client would like to obtain an additional block for the indicated
* filename (which is being written to). Return the located block plus a
* set of machines. The first datanode in that list is where the client
* should write its data; subsequent datanodes must be provided in the
* connection to the first datanode.
*
* Make sure the previous blocks have been reported by datanodes and
* are replicated. If this is a retried request, the previously allocated
* block is returned instead of allocating a new one.
*/
LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
ExtendedBlock previous, Set<Node> excludedNodes,
List<String> favoredNodes) throws IOException {
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
DatanodeStorageInfo targets[] = getNewBlockTargets(src, fileId,
clientName, previous, excludedNodes, favoredNodes, onRetryBlock);
if (targets == null) {
assert onRetryBlock[0] != null : "Retry block is null";
// This is a retry. Just return the last block.
return onRetryBlock[0];
}
LocatedBlock newBlock = storeAllocatedBlock(
src, fileId, clientName, previous, targets);
return newBlock;
}
Project: hadoop
File: NameNodeRpcServer.java
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
String[] favoredNodes)
throws IOException {
checkNNStartup();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
+ " fileId=" + fileId + " for " + clientName);
}
Set<Node> excludedNodesSet = null;
if (excludedNodes != null) {
excludedNodesSet = new HashSet<Node>(excludedNodes.length);
for (Node node : excludedNodes) {
excludedNodesSet.add(node);
}
}
List<String> favoredNodesList = (favoredNodes == null) ? null
: Arrays.asList(favoredNodes);
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodesSet, favoredNodesList);
if (locatedBlock != null)
metrics.incrAddBlockOps();
return locatedBlock;
}
Project: hadoop
File: TestBalancerWithMultipleNameNodes.java
private static ExtendedBlock[][] generateBlocks(Suite s, long size
) throws IOException, InterruptedException, TimeoutException {
final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
for(int n = 0; n < s.clients.length; n++) {
final long fileLen = size/s.replication;
createFile(s, n, fileLen);
final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
FILE_NAME, 0, fileLen).getLocatedBlocks();
final int numOfBlocks = locatedBlocks.size();
blocks[n] = new ExtendedBlock[numOfBlocks];
for(int i = 0; i < numOfBlocks; i++) {
final ExtendedBlock b = locatedBlocks.get(i).getBlock();
blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
b.getNumBytes(), b.getGenerationStamp());
}
}
return blocks;
}
Project: hadoop
File: TestDefaultBlockPlacementPolicy.java
private void testPlacement(String clientMachine,
String clientRack) throws IOException {
// Write 5 files and check the block placement each time
for (int i = 0; i < 5; i++) {
String src = "/test-" + i;
// Create the file with client machine
HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false);
LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
null, null, fileStatus.getFileId(), null);
assertEquals("Block should be allocated sufficient locations",
REPLICATION_FACTOR, locatedBlock.getLocations().length);
if (clientRack != null) {
assertEquals("First datanode should be rack local", clientRack,
locatedBlock.getLocations()[0].getNetworkLocation());
}
nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
src, clientMachine);
}
}
Project: hadoop
File: ClientDatanodeProtocolTranslatorPB.java
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
}
// Since we're creating a new UserGroupInformation here, we know that no
// future RPC proxies will be able to re-use the same connection. And
// usages of this proxy tend to be one-off calls.
//
// This is a temporary fix: callers should really achieve this by using
// RPC.stopProxy() on the resulting object, but this is currently not
// working in trunk. See the discussion on HDFS-1965.
Configuration confWithNoIpcIdle = new Configuration(conf);
confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
UserGroupInformation ticket = UserGroupInformation
.createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
ticket.addToken(locatedBlock.getBlockToken());
return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle,
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
Project: hadoop
File: ClientNamenodeProtocolServerSideTranslatorPB.java
@Override
public AddBlockResponseProto addBlock(RpcController controller,
AddBlockRequestProto req) throws ServiceException {
try {
List<DatanodeInfoProto> excl = req.getExcludeNodesList();
List<String> favor = req.getFavoredNodesList();
LocatedBlock result = server.addBlock(
req.getSrc(),
req.getClientName(),
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
(excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
.toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
(favor == null || favor.size() == 0) ? null : favor
.toArray(new String[favor.size()]));
return AddBlockResponseProto.newBuilder()
.setBlock(PBHelper.convert(result)).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
Project: hadoop
File: ClientNamenodeProtocolTranslatorPB.java
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
String[] favoredNodes)
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException {
AddBlockRequestProto.Builder req = AddBlockRequestProto.newBuilder()
.setSrc(src).setClientName(clientName).setFileId(fileId);
if (previous != null)
req.setPrevious(PBHelper.convert(previous));
if (excludeNodes != null)
req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
if (favoredNodes != null) {
req.addAllFavoredNodes(Arrays.asList(favoredNodes));
}
try {
return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
Project: hadoop
File: ClientNamenodeProtocolTranslatorPB.java
@Override
public LocatedBlock getAdditionalDatanode(String src, long fileId,
ExtendedBlock blk, DatanodeInfo[] existings, String[] existingStorageIDs,
DatanodeInfo[] excludes,
int numAdditionalNodes, String clientName) throws AccessControlException,
FileNotFoundException, SafeModeException, UnresolvedLinkException,
IOException {
GetAdditionalDatanodeRequestProto req = GetAdditionalDatanodeRequestProto
.newBuilder()
.setSrc(src)
.setFileId(fileId)
.setBlk(PBHelper.convert(blk))
.addAllExistings(PBHelper.convert(existings))
.addAllExistingStorageUuids(Arrays.asList(existingStorageIDs))
.addAllExcludes(PBHelper.convert(excludes))
.setNumAdditionalNodes(numAdditionalNodes)
.setClientName(clientName)
.build();
try {
return PBHelper.convert(rpcProxy.getAdditionalDatanode(null, req)
.getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
Project: hadoop
File: TestAddBlockRetry.java
@Test
public void testAddBlockRetryShouldReturnBlockWithLocations()
throws Exception {
final String src = "/testAddBlockRetryShouldReturnBlockWithLocations";
NamenodeProtocols nameNodeRpc = cluster.getNameNodeRpc();
// create file
nameNodeRpc.create(src, FsPermission.getFileDefault(), "clientName",
new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)), true,
(short) 3, 1024, null);
// start first addBlock()
LOG.info("Starting first addBlock for " + src);
LocatedBlock lb1 = nameNodeRpc.addBlock(src, "clientName", null, null,
INodeId.GRANDFATHER_INODE_ID, null);
assertTrue("Block locations should be present",
lb1.getLocations().length > 0);
cluster.restartNameNode();
nameNodeRpc = cluster.getNameNodeRpc();
LocatedBlock lb2 = nameNodeRpc.addBlock(src, "clientName", null, null,
INodeId.GRANDFATHER_INODE_ID, null);
assertEquals("Blocks are not equal", lb1.getBlock(), lb2.getBlock());
assertTrue("Wrong locations with retry", lb2.getLocations().length > 0);
}
Project: hadoop
File: TestStorageMover.java
private void verifyFile(final Path parent, final HdfsFileStatus status,
final Byte expectedPolicyId) throws Exception {
HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
byte policyId = fileStatus.getStoragePolicy();
BlockStoragePolicy policy = policies.getPolicy(policyId);
if (expectedPolicyId != null) {
Assert.assertEquals((byte)expectedPolicyId, policy.getId());
}
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
for(LocatedBlock lb : fileStatus.getBlockLocations().getLocatedBlocks()) {
final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
lb.getStorageTypes());
Assert.assertTrue(fileStatus.getFullName(parent.toString())
+ " with policy " + policy + " has non-empty overlap: " + diff
+ ", the corresponding block is " + lb.getBlock().getLocalBlock(),
diff.removeOverlap(true));
}
}
Project: hadoop
File: TestStorageMover.java
private Replication getOrVerifyReplication(Path file, Replication expected)
throws IOException {
final List<LocatedBlock> lbs = dfs.getClient().getLocatedBlocks(
file.toString(), 0).getLocatedBlocks();
Assert.assertEquals(1, lbs.size());
LocatedBlock lb = lbs.get(0);
StringBuilder types = new StringBuilder();
final Replication r = new Replication();
for(StorageType t : lb.getStorageTypes()) {
types.append(t).append(", ");
if (t == StorageType.DISK) {
r.disk++;
} else if (t == StorageType.ARCHIVE) {
r.archive++;
} else {
Assert.fail("Unexpected storage type " + t);
}
}
if (expected != null) {
final String s = "file = " + file + "\n types = [" + types + "]";
Assert.assertEquals(s, expected, r);
}
return r;
}
Project: hadoop
File: TestRetryCacheWithHA.java
@Override
void prepare() throws Exception {
final Path filePath = new Path(file);
DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
// append to the file and leave the last block under construction
out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND),
null, null);
byte[] appendContent = new byte[100];
new Random().nextBytes(appendContent);
out.write(appendContent);
((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
LocatedBlocks blks = dfs.getClient()
.getLocatedBlocks(file, BlockSize + 1);
assertEquals(1, blks.getLocatedBlocks().size());
nodes = blks.get(0).getLocations();
oldBlock = blks.get(0).getBlock();
LocatedBlock newLbk = client.getNamenode().updateBlockForPipeline(
oldBlock, client.getClientName());
newBlock = new ExtendedBlock(oldBlock.getBlockPoolId(),
oldBlock.getBlockId(), oldBlock.getNumBytes(),
newLbk.getBlock().getGenerationStamp());
}
Project: hadoop-oss
File: NuCypherExtUtilClient.java
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
connectToDnViaHostname, locatedBlock);
}
Project: hadoop
File: TestDirectoryScanner.java
@Test (timeout=300000)
public void testRetainBlockOnPersistentStorage() throws Exception {
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
.numDataNodes(1)
.build();
try {
cluster.waitActive();
DataNode dataNode = cluster.getDataNodes().get(0);
bpid = cluster.getNamesystem().getBlockPoolId();
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
client = cluster.getFileSystem().getClient();
scanner = new DirectoryScanner(dataNode, fds, CONF);
scanner.setRetainDiffs(true);
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
// Add a file with 1 block
List<LocatedBlock> blocks =
createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
// Ensure no difference between volumeMap and disk.
scan(1, 0, 0, 0, 0, 0);
// Make a copy of the block on RAM_DISK and ensure that it is
// picked up by the scanner.
duplicateBlock(blocks.get(0).getBlock().getBlockId());
scan(2, 1, 0, 0, 0, 0, 1);
verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
scan(1, 0, 0, 0, 0, 0);
} finally {
if (scanner != null) {
scanner.shutdown();
scanner = null;
}
cluster.shutdown();
cluster = null;
}
}
Project: hadoop
File: TestInterDatanodeProtocol.java
public static LocatedBlock getLastLocatedBlock(
ClientProtocol namenode, String src) throws IOException {
//get block info for the last block
LocatedBlocks locations = namenode.getBlockLocations(src, 0, Long.MAX_VALUE);
List<LocatedBlock> blocks = locations.getLocatedBlocks();
DataNode.LOG.info("blocks.size()=" + blocks.size());
assertTrue(blocks.size() > 0);
return blocks.get(blocks.size() - 1);
}
Project: hadoop
File: DFSUtil.java
/**
* Convert a List<LocatedBlock> to BlockLocation[]
* @param blocks A List<LocatedBlock> to be converted
* @return converted array of BlockLocation
*/
public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) {
if (blocks == null) {
return new BlockLocation[0];
}
int nrBlocks = blocks.size();
BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
if (nrBlocks == 0) {
return blkLocations;
}
int idx = 0;
for (LocatedBlock blk : blocks) {
assert idx < nrBlocks : "Incorrect index";
DatanodeInfo[] locations = blk.getLocations();
String[] hosts = new String[locations.length];
String[] xferAddrs = new String[locations.length];
String[] racks = new String[locations.length];
for (int hCnt = 0; hCnt < locations.length; hCnt++) {
hosts[hCnt] = locations[hCnt].getHostName();
xferAddrs[hCnt] = locations[hCnt].getXferAddr();
NodeBase node = new NodeBase(xferAddrs[hCnt],
locations[hCnt].getNetworkLocation());
racks[hCnt] = node.toString();
}
DatanodeInfo[] cachedLocations = blk.getCachedLocations();
String[] cachedHosts = new String[cachedLocations.length];
for (int i=0; i<cachedLocations.length; i++) {
cachedHosts[i] = cachedLocations[i].getHostName();
}
blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
racks,
blk.getStartOffset(),
blk.getBlockSize(),
blk.isCorrupt());
idx++;
}
return blkLocations;
}
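A small usage sketch of the conversion above (illustrative only; the DistributedFileSystem instance, the printHosts helper name, and the path are assumptions): fetch a file's located blocks through the DFSClient and turn them into the BlockLocation[] view that FileSystem callers normally see.

/** Illustrative sketch only: print the host list for each block of a file. */
static void printHosts(DistributedFileSystem dfs, String src) throws IOException {
  List<LocatedBlock> blocks =
      dfs.getClient().getLocatedBlocks(src, 0, Long.MAX_VALUE).getLocatedBlocks();
  // Offsets, lengths, hosts and racks are kept; block IDs and access tokens are not.
  BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
  for (BlockLocation loc : locations) {
    System.out.println(loc.getOffset() + "+" + loc.getLength()
        + " on " + Arrays.toString(loc.getHosts()));
  }
}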
Project: hadoop
File: TestPBHelper.java
@Test
public void testConvertLocatedBlockArray() {
LocatedBlock [] lbl = new LocatedBlock[3];
for (int i=0;i<3;i++) {
lbl[i] = createLocatedBlock();
}
LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlock(lbl);
LocatedBlock [] lbl2 = PBHelper.convertLocatedBlock(lbpl);
assertEquals(lbl.length, lbl2.length);
for (int i=0;i<lbl.length;i++) {
compare(lbl[i], lbl2[i]);
}
}
Project: hadoop
File: DFSInputStream.java
/**
* Get block at the specified position.
* Fetch it from the namenode if not cached.
*
* @param offset block corresponding to this offset in file is returned
* @return located block
* @throws IOException
*/
private LocatedBlock getBlockAt(long offset) throws IOException {
synchronized(infoLock) {
assert (locatedBlocks != null) : "locatedBlocks is null";
final LocatedBlock blk;
//check offset
if (offset < 0 || offset >= getFileLength()) {
throw new IOException("offset < 0 || offset >= getFileLength(), offset="
+ offset
+ ", locatedBlocks=" + locatedBlocks);
}
else if (offset >= locatedBlocks.getFileLength()) {
// offset to the portion of the last block,
// which is not known to the name-node yet;
// getting the last block
blk = locatedBlocks.getLastLocatedBlock();
}
else {
// search cached blocks first
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
// fetch more blocks
final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
assert (newBlocks != null) : "Could not find target position " + offset;
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
blk = locatedBlocks.get(targetBlockIdx);
}
return blk;
}
}
Project: hadoop
File: DFSInputStream.java
/**
* Get blocks in the specified range.
* Fetch them from the namenode if not cached. This function
* does not let a read request go beyond the EOF.
* @param offset starting offset in file
* @param length length of data
* @return consecutive segment of located blocks
* @throws IOException
*/
private List<LocatedBlock> getBlockRange(long offset,
long length) throws IOException {
// getFileLength(): returns total file length
// locatedBlocks.getFileLength(): returns length of completed blocks
if (offset >= getFileLength()) {
throw new IOException("Offset: " + offset +
" exceeds file length: " + getFileLength());
}
synchronized(infoLock) {
final List<LocatedBlock> blocks;
final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
if (readOffsetWithinCompleteBlk) {
//get the blocks of finalized (completed) block range
blocks = getFinalizedBlockRange(offset,
Math.min(length, lengthOfCompleteBlk - offset));
} else {
blocks = new ArrayList<LocatedBlock>(1);
}
// get the blocks from incomplete block range
if (readLengthPastCompleteBlk) {
blocks.add(locatedBlocks.getLastLocatedBlock());
}
return blocks;
}
}
Project: hadoop
File: DFSInputStream.java
/**
* Get the best node from which to stream the data.
* @param block LocatedBlock, containing nodes in priority order.
* @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node.
* @throws IOException
*/
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
DatanodeInfo chosenNode = null;
StorageType storageType = null;
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i];
// Storage types are ordered to correspond with nodes, so use the same
// index to get storage type.
if (storageTypes != null && i < storageTypes.length) {
storageType = storageTypes[i];
}
break;
}
}
}
if (chosenNode == null) {
throw new IOException("No live nodes contain block " + block.getBlock() +
" after checking nodes = " + Arrays.toString(nodes) +
", ignoredNodes = " + ignoredNodes);
}
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr, storageType);
}
Project: hadoop
File: TestPipelines.java
/**
* Creates and closes a file of a certain length.
* Calls append() so that the next write() operation adds to the end of it.
* After the write() invocation, calls hflush() to make sure the data has
* gone through the pipeline, then checks the state of the last block's
* replica, which is expected to be in the RBW state.
*
* @throws IOException in case of an error
*/
@Test
public void pipeline_01() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + METHOD_NAME);
}
Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath, FILE_SIZE, REPL_FACTOR, rand.nextLong());
if(LOG.isDebugEnabled()) {
LOG.debug("Invoking append but doing nothing otherwise...");
}
FSDataOutputStream ofs = fs.append(filePath);
ofs.writeBytes("Some more stuff to write");
((DFSOutputStream) ofs.getWrappedStream()).hflush();
List<LocatedBlock> lb = cluster.getNameNodeRpc().getBlockLocations(
filePath.toString(), FILE_SIZE - 1, FILE_SIZE).getLocatedBlocks();
String bpid = cluster.getNamesystem().getBlockPoolId();
for (DataNode dn : cluster.getDataNodes()) {
Replica r = DataNodeTestUtils.fetchReplicaInfo(dn, bpid, lb.get(0)
.getBlock().getBlockId());
assertTrue("Replica on DN " + dn + " shouldn't be null", r != null);
assertEquals("Should be RBW replica on " + dn
+ " after sequence of calls append()/write()/hflush()",
HdfsServerConstants.ReplicaState.RBW, r.getState());
}
ofs.close();
}
Project: hadoop
File: TestPBHelper.java
private void compare(LocatedBlock expected, LocatedBlock actual) {
assertEquals(expected.getBlock(), actual.getBlock());
compare(expected.getBlockToken(), actual.getBlockToken());
assertEquals(expected.getStartOffset(), actual.getStartOffset());
assertEquals(expected.isCorrupt(), actual.isCorrupt());
DatanodeInfo [] ei = expected.getLocations();
DatanodeInfo [] ai = actual.getLocations();
assertEquals(ei.length, ai.length);
for (int i = 0; i < ei.length ; i++) {
compare(ei[i], ai[i]);
}
}
Project: hadoop
File: TestFileCreation.java
private void assertBlocks(BlockManager bm, LocatedBlocks lbs,
boolean exist) {
for (LocatedBlock locatedBlock : lbs.getLocatedBlocks()) {
if (exist) {
assertTrue(bm.getStoredBlock(locatedBlock.getBlock().
getLocalBlock()) != null);
} else {
assertTrue(bm.getStoredBlock(locatedBlock.getBlock().
getLocalBlock()) == null);
}
}
}
Project: hadoop
File: DFSClient.java
void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
try {
reportBadBlocks(lblocks);
} catch (IOException ie) {
LOG.info("Found corruption while reading " + file
+ ". Error repairing corrupt blocks. Bad blocks remain.", ie);
}
}
Project: hadoop
File: TestBalancerWithNodeGroup.java
private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) {
Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>();
for (LocatedBlock blk : blks) {
for (DatanodeInfo di : blk.getLocations()) {
if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) {
ret.add(blk.getBlock());
break;
}
}
}
return ret;
}
Project: hadoop
File: DataTransferTestUtil.java
/** Initialize the pipeline. */
@Override
public synchronized Pipeline initPipeline(LocatedBlock lb) {
final Pipeline pl = new Pipeline(lb);
if (pipelines.contains(pl)) {
throw new IllegalStateException("thepipeline != null");
}
pipelines.add(pl);
return pl;
}
Project: hadoop
File: TestReplication.java
private void waitForBlockReplication(String filename,
ClientProtocol namenode,
int expected, long maxWaitSec)
throws IOException {
long start = Time.monotonicNow();
//wait for all the blocks to be replicated;
LOG.info("Checking for block replication for " + filename);
while (true) {
boolean replOk = true;
LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
Long.MAX_VALUE);
for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
iter.hasNext();) {
LocatedBlock block = iter.next();
int actual = block.getLocations().length;
if ( actual < expected ) {
LOG.info("Not enough replicas for " + block.getBlock()
+ " yet. Expecting " + expected + ", got " + actual + ".");
replOk = false;
break;
}
}
if (replOk) {
return;
}
if (maxWaitSec > 0 &&
(Time.monotonicNow() - start) > (maxWaitSec * 1000)) {
throw new IOException("Timedout while waiting for all blocks to " +
" be replicated for " + filename);
}
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {}
}
}
Project: hadoop
File: TestDFSClientRetries.java
/** Test that a timeout occurs when the DN does not respond to an RPC.
* Start up a server and ask it to sleep for n seconds. Make an
* RPC to the server, set rpcTimeout to less than n, and ensure
* that a SocketTimeoutException is obtained.
*/
@Test
public void testClientDNProtocolTimeout() throws IOException {
final Server server = new TestServer(1, true);
server.start();
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
DatanodeID fakeDnId = DFSTestUtil.getLocalDatanodeID(addr.getPort());
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L));
LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]);
ClientDatanodeProtocol proxy = null;
try {
proxy = DFSUtil.createClientDatanodeProtocolProxy(
fakeDnId, conf, 500, false, fakeBlock);
proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1));
fail ("Did not get expected exception: SocketTimeoutException");
} catch (SocketTimeoutException e) {
LOG.info("Got the expected Exception: SocketTimeoutException");
} finally {
if (proxy != null) {
RPC.stopProxy(proxy);
}
server.stop();
}
}
Project: hadoop
File: TestDirectoryScanner.java
/** create a file with a length of <code>fileLen</code> */
private List<LocatedBlock> createFile(String fileNamePrefix,
long fileLen,
boolean isLazyPersist) throws IOException {
FileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/" + fileNamePrefix + ".dat");
DFSTestUtil.createFile(
fs, filePath, isLazyPersist, 1024, fileLen,
BLOCK_LENGTH, (short) 1, r.nextLong(), false);
return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
}
Project: hadoop
File: BlockReportTestBase.java
private Block findBlock(Path path, long size) throws IOException {
Block ret;
List<LocatedBlock> lbs =
cluster.getNameNodeRpc()
.getBlockLocations(path.toString(),
FILE_START, size).getLocatedBlocks();
LocatedBlock lb = lbs.get(lbs.size() - 1);
// Get block from the first DN
ret = cluster.getDataNodes().get(DN_N0).
data.getStoredBlock(lb.getBlock()
.getBlockPoolId(), lb.getBlock().getBlockId());
return ret;
}
Project: hadoop
File: TestINodeFile.java
private static void checkEquals(LocatedBlocks l1, LocatedBlocks l2) {
List<LocatedBlock> list1 = l1.getLocatedBlocks();
List<LocatedBlock> list2 = l2.getLocatedBlocks();
assertEquals(list1.size(), list2.size());
for (int i = 0; i < list1.size(); i++) {
LocatedBlock b1 = list1.get(i);
LocatedBlock b2 = list2.get(i);
assertEquals(b1.getBlock(), b2.getBlock());
assertEquals(b1.getBlockSize(), b2.getBlockSize());
}
}