/**
 * Recover blocks that were being written when the datanode
 * was earlier shut down. These blocks get re-inserted into
 * ongoingCreates. Also, send a blockReceived message to the NN
 * for each of these blocks because these are not part of a
 * block report.
 */
private void recoverBlocksBeingWritten(File bbw) throws IOException {
  FSDir fsd = new FSDir(namespaceId, bbw, this.volume);
  LightWeightHashSet<BlockAndFile> blockSet = new LightWeightHashSet<BlockAndFile>();
  fsd.getBlockAndFileInfo(blockSet);
  for (BlockAndFile b : blockSet) {
    File f = b.pathfile;  // full path name of block file
    lock.writeLock().lock();
    try {
      volumeMap.add(namespaceId, b.block,
          new DatanodeBlockInfo(volume, f, DatanodeBlockInfo.UNFINALIZED));
      volumeMap.addOngoingCreates(namespaceId, b.block,
          ActiveFile.createStartupRecoveryFile(f));
    } finally {
      lock.writeLock().unlock();
    }
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("recoverBlocksBeingWritten for block " + b.block
          + " namespaceId: " + namespaceId);
    }
  }
}
/** Return the block file for the given ID */
public File findBlockFile(int namespaceId, long blockId) {
  lock.readLock().lock();
  try {
    final Block eb = new Block(blockId);
    File blockfile = null;
    ActiveFile activefile = volumeMap.getOngoingCreates(namespaceId, eb);
    if (activefile != null) {
      blockfile = activefile.file;
    }
    if (blockfile == null) {
      blockfile = getFile(namespaceId, eb);
    }
    if (blockfile == null) {
      if (DataNode.LOG.isDebugEnabled()) {
        DataNode.LOG.debug("volumeMap=" + volumeMap);
      }
    }
    return blockfile;
  } finally {
    lock.readLock().unlock();
  }
}

/**
 * Return a list of active writer threads for the given block.
 * @return null if there are no such threads or the file is
 *         not being created
 */
private ArrayList<Thread> getActiveThreads(int namespaceId, Block block) {
  lock.writeLock().lock();
  try {
    // check ongoing create threads
    final ActiveFile activefile = volumeMap.getOngoingCreates(namespaceId, block);
    if (activefile != null && !activefile.threads.isEmpty()) {
      // remove dead threads
      for (Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
        final Thread t = i.next();
        if (!t.isAlive()) {
          i.remove();
        }
      }
      // return living threads
      if (!activefile.threads.isEmpty()) {
        return new ArrayList<Thread>(activefile.threads);
      }
    }
  } finally {
    lock.writeLock().unlock();
  }
  return null;
}

/**
 * Remove the temporary block file (if any)
 */
public void unfinalizeBlock(int namespaceId, Block b) throws IOException {
  lock.writeLock().lock();
  try {
    // remove the block from in-memory data structure
    ActiveFile activefile = volumeMap.removeOngoingCreates(namespaceId, b);
    if (activefile == null) {
      return;
    }
    volumeMap.remove(namespaceId, b);

    // delete the on-disk temp file
    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
      DataNode.LOG.warn("Block " + b + " unfinalized and removed.");
    }
  } finally {
    lock.writeLock().unlock();
  }
}
ActiveFile getOngoingCreates(int namespaceId, Block block) {
  checkBlock(block);
  NamespaceMap nm = getNamespaceMap(namespaceId);
  if (nm == null) {
    return null;
  }
  return nm.getOngoingCreates(block);
}

ActiveFile removeOngoingCreates(int namespaceId, Block block) {
  checkBlock(block);
  NamespaceMap nm = getNamespaceMap(namespaceId);
  if (nm == null) {
    return null;
  }
  return nm.removeOngoingCreates(block);
}

ActiveFile addOngoingCreates(int namespaceId, Block block, ActiveFile af) {
  checkBlock(block);
  NamespaceMap nm = getNamespaceMap(namespaceId);
  if (nm == null) {
    return null;
  }
  return nm.addOngoingCreates(block, af);
}

NamespaceMap(int namespaceId) {
  numBucket = NUM_BUCKETS;
  this.namespaceId = namespaceId;
  ongoingCreates = new ConcurrentHashMap<Block, ActiveFile>();
  blockBuckets = new BlockBucket[numBucket];
  for (int i = 0; i < numBucket; i++) {
    blockBuckets[i] = new BlockBucket(i);
  }
}
ActiveFile(File f, List<Thread> list, long expectedSize) throws IOException {
  this(f, false, expectedSize);
  if (list != null) {
    threads.addAll(list);
  }
  threads.add(Thread.currentThread());
}

private ActiveFile(File f, boolean recovery, long expectedSize)
    throws IOException {
  file = f;
  long fileLength = f.length();
  if (expectedSize != UNKNOWN_SIZE && fileLength != expectedSize) {
    throw new IOException("File " + f + " on disk size " + fileLength
        + " doesn't match expected size " + expectedSize);
  }
  bytesAcked = bytesOnDisk = fileLength;
  wasRecoveredOnStartup = recovery;
}

@Override
public long getOnDiskLength(int namespaceId, Block b) throws IOException {
  ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);
  if (activeFile != null) {
    return activeFile.getBytesOnDisk();
  } else {
    return getFinalizedBlockLength(namespaceId, b);
  }
}

@Override
public long getVisibleLength(int namespaceId, Block b) throws IOException {
  ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);
  if (activeFile != null) {
    return activeFile.getBytesAcked();
  } else {
    return getFinalizedBlockLength(namespaceId, b);
  }
}
/**
 * Complete the block write.
 */
public void finalizeBlockInternal(int namespaceId, Block b, boolean reFinalizeOk)
    throws IOException {
  lock.writeLock().lock();
  try {
    DatanodeBlockInfo replicaInfo = volumeMap.get(namespaceId, b);
    ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);
    if (activeFile == null) {
      if (reFinalizeOk) {
        return;
      } else {
        throw new IOException("Block " + b + " is already finalized.");
      }
    }
    File f = activeFile.file;
    if (f == null || !f.exists()) {
      throw new IOException("No temporary file " + f + " for block " + b);
    }
    FSVolume v = replicaInfo.getVolume();
    if (v == null) {
      throw new IOException("No volume for temporary file " + f
          + " for block " + b);
    }

    File dest = v.addBlock(namespaceId, b, f);
    volumeMap.add(namespaceId, b,
        new DatanodeBlockInfo(v, dest, activeFile.getBytesOnDisk()));
    volumeMap.removeOngoingCreates(namespaceId, b);
  } finally {
    lock.writeLock().unlock();
  }
}
private boolean isBlockFinalizedInternal(int namespaceId, Block b,
                                         boolean validate) {
  DatanodeBlockInfo blockInfo = volumeMap.get(namespaceId, b);

  // For the validate case we skip this null check to avoid duplicating code
  // while preserving the old behavior. Although this looks like a bug, it
  // will be fixed in a separate patch.
  if (!validate && blockInfo == null) {
    return false; // block is not finalized
  }
  FSVolume v = blockInfo.getVolume();
  if (v == null) {
    DataNode.LOG.warn("No volume for block " + b);
    return false; // block is not finalized
  }
  ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, b);
  if (activeFile != null) {
    if (validate) {
      File f = activeFile.file;
      if (f == null || !f.exists()) {
        // we should never get into this position
        DataNode.LOG.warn("No temporary file " + f + " for block " + b);
      }
    }
    return false; // block is not finalized
  }
  return true; // block is finalized
}
synchronized void initNamespace(int namespaceId) {
  Map<Block, DatanodeBlockInfo> m = namespaceMap.get(namespaceId);
  if (m != null) {
    return;
  }
  m = new HashMap<Block, DatanodeBlockInfo>();
  namespaceMap.put(namespaceId, m);
  Map<Block, ActiveFile> oc = new HashMap<Block, ActiveFile>();
  ongoingCreates.put(namespaceId, oc);
}

ActiveFile getOngoingCreates(int namespaceId, Block block) {
  checkBlock(block);
  synchronized (this) {
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    return m != null ? m.get(block) : null;
  }
}

ActiveFile removeOngoingCreates(int namespaceId, Block block) {
  checkBlock(block);
  synchronized (this) {
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    return m != null ? m.remove(block) : null;
  }
}

ActiveFile addOngoingCreates(int namespaceId, Block block, ActiveFile af) {
  checkBlock(block);
  synchronized (this) {
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    return m.put(block, af);
  }
}
/**
 * If there is an ActiveFile object for the block, replace it in the map
 * with a copy of itself. This ensures that visible-length updates applied
 * to the old object have no effect on the entry in the local map, so the
 * BlockReceiver can update the visible length directly without holding
 * the lock.
 *
 * @param namespaceId
 * @param block
 * @throws CloneNotSupportedException
 */
void copyOngoingCreates(int namespaceId, Block block)
    throws CloneNotSupportedException {
  checkBlock(block);
  synchronized (this) {
    Map<Block, ActiveFile> m = ongoingCreates.get(namespaceId);
    ActiveFile af = m.get(block);
    if (af == null) {
      return;
    }
    m.put(block, af.getClone());
  }
}
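The clone-and-replace trick described in the javadoc above can be illustrated with a small standalone sketch. The ActiveFileStub class and the map below are simplified stand-ins rather than types from this codebase; the point is only that once the map entry is swapped for a frozen clone, lock-free updates made through the old reference never show up in the map.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: ActiveFileStub stands in for ActiveFile, and the
// plain HashMap stands in for the ongoingCreates map guarded by the lock.
class CopyOnWriteSketch {
  static class ActiveFileStub implements Cloneable {
    volatile long bytesAcked;

    ActiveFileStub getClone() throws CloneNotSupportedException {
      return (ActiveFileStub) super.clone();
    }
  }

  public static void main(String[] args) throws CloneNotSupportedException {
    Map<String, ActiveFileStub> map = new HashMap<String, ActiveFileStub>();
    ActiveFileStub writerCopy = new ActiveFileStub();
    map.put("blk_1", writerCopy);

    // Replace the map entry with a clone, as copyOngoingCreates does.
    map.put("blk_1", writerCopy.getClone());

    // The writer keeps updating its own reference without taking any lock...
    writerCopy.bytesAcked = 4096;

    // ...and the entry held in the map is unaffected.
    System.out.println(map.get("blk_1").bytesAcked); // prints 0
  }
}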
ActiveFile getOngoingCreates(Block block) {
  return ongoingCreates.get(block);
}

ActiveFile removeOngoingCreates(Block block) {
  return ongoingCreates.remove(block);
}

ActiveFile addOngoingCreates(Block block, ActiveFile af) {
  return ongoingCreates.put(block, af);
}
private void fakeBeingCreated(Block b) throws IOException {
  ongoingCreates.put(b,
      new ActiveFile(blockFile(b), new ArrayList<Thread>()));
}
ActiveFile(File f, List<Thread> list) throws IOException {
  this(f, list, UNKNOWN_SIZE);
}

public ActiveFile getClone() throws CloneNotSupportedException {
  return (ActiveFile) super.clone();
}
@Override
public BlockRecoveryInfo startBlockRecovery(int namespaceId, long blockId)
    throws IOException {
  Block stored = getStoredBlock(namespaceId, blockId, true);

  if (stored == null) {
    return null;
  }

  // It's important that this loop not be synchronized - otherwise
  // this will deadlock against the thread it's joining against!
  while (true) {
    DataNode.LOG.debug("Interrupting active writer threads for block " + stored);
    List<Thread> activeThreads = getActiveThreads(namespaceId, stored);
    if (activeThreads == null) break;
    if (interruptAndJoinThreads(activeThreads)) break;
  }

  lock.readLock().lock();
  try {
    // now that writers are stopped, re-fetch the block's meta info
    stored = getStoredBlock(namespaceId, blockId, true);

    if (stored == null) {
      return null;
    }

    ActiveFile activeFile = volumeMap.getOngoingCreates(namespaceId, stored);
    boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;

    BlockRecoveryInfo info = new BlockRecoveryInfo(stored, isRecovery);
    if (DataNode.LOG.isDebugEnabled()) {
      DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored
          + " length " + stored.getNumBytes() + " genstamp "
          + stored.getGenerationStamp());
    }

    // paranoia! verify that the contents of the stored block
    // matches the block file on disk.
    validateBlockMetadata(namespaceId, stored);
    return info;
  } finally {
    lock.readLock().unlock();
  }
}
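The interruptAndJoinThreads helper called in the retry loop above is not part of this excerpt. As a rough sketch of the kind of logic that call site implies (interrupt every writer, join each with a bounded wait, and report whether they all exited so the loop can re-check for newly registered writers), one might write something like the following; the class name, timeout, and error handling here are assumptions, not the actual implementation.

import java.util.List;

// Hypothetical sketch only: interrupts each writer thread and waits briefly
// for it to exit. Returns true if every thread terminated, so the caller's
// retry loop can re-check for any writers that appeared in the meantime.
final class ThreadStopSketch {
  private ThreadStopSketch() {}

  static boolean interruptAndJoinThreads(List<Thread> threads) {
    for (Thread t : threads) {
      t.interrupt();
    }
    for (Thread t : threads) {
      try {
        t.join(10000); // bounded wait; the real timeout is an assumption
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
      if (t.isAlive()) {
        return false; // at least one writer did not exit in time
      }
    }
    return true;
  }
}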
VolumeMap(int numNamespaces) {
  namespaceMap =
      new HashMap<Integer, Map<Block, DatanodeBlockInfo>>(numNamespaces);
  ongoingCreates =
      new HashMap<Integer, Map<Block, ActiveFile>>(numNamespaces);
}
/**
 * If there is an ActiveFile object for the block, replace it in the map
 * with a copy of itself. This ensures that visible-length updates applied
 * to the old object have no effect on the entry in the local map, so the
 * BlockReceiver can update the visible length directly without holding
 * the lock.
 *
 * @param block
 * @throws CloneNotSupportedException
 */
void copyOngoingCreates(Block block) throws CloneNotSupportedException {
  ActiveFile af = ongoingCreates.get(block);
  if (af == null) {
    return;
  }
  ongoingCreates.put(block, af.getClone());
}
/**
 * Adds a file to the ongoingCreates data structure to indicate that we are
 * creating a file.
 *
 * @param dstNamespaceId
 *          namespace id for dstBlock
 * @param dstBlock
 *          the block that we are going to create
 * @param dstVol
 *          the volume on which the file is to be created
 * @return the temporary file for the block
 * @throws IOException
 */
private File addToOngoingCreates(int dstNamespaceId, Block dstBlock,
    FSVolume dstVol) throws IOException {
  List<Thread> threads = null;
  // We do not want to create a BBW, hence treat this as a replication
  // request.
  File dstBlockFile = createTmpFile(dstNamespaceId, dstVol, dstBlock, true);
  volumeMap.addOngoingCreates(dstNamespaceId, dstBlock,
      new ActiveFile(dstBlockFile, threads));
  return dstBlockFile;
}
/**
 * Create an ActiveFile from a file on disk during DataNode startup.
 * This factory method exists just to make the purpose of this
 * constructor clear at the call site.
 * @throws IOException
 */
public static ActiveFile createStartupRecoveryFile(File f) throws IOException {
  return new ActiveFile(f, true, UNKNOWN_SIZE);
}