/** * Convert a CachedBlockList into a DatanodeCommand with a list of blocks. * * @param list The {@link CachedBlocksList}. This function * clears the list. * @param datanode The datanode. * @param action The action to perform in the command. * @param poolId The block pool id. * @return A DatanodeCommand to be sent back to the DN, or null if * there is nothing to be done. */ private DatanodeCommand getCacheCommand(CachedBlocksList list, DatanodeDescriptor datanode, int action, String poolId) { int length = list.size(); if (length == 0) { return null; } // Read the existing cache commands. long[] blockIds = new long[length]; int i = 0; for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext(); ) { CachedBlock cachedBlock = iter.next(); blockIds[i++] = cachedBlock.getBlockId(); } return new BlockIdCommand(action, poolId, blockIds); }
private void processCacheReportImpl(final DatanodeDescriptor datanode, final List<Long> blockIds) { CachedBlocksList cached = datanode.getCached(); cached.clear(); CachedBlocksList cachedList = datanode.getCached(); CachedBlocksList pendingCachedList = datanode.getPendingCached(); for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) { long blockId = iter.next(); LOG.trace("Cache report from datanode {} has block {}", datanode, blockId); CachedBlock cachedBlock = new CachedBlock(blockId, (short)0, false); CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock); // Add the block ID from the cache report to the cachedBlocks map // if it's not already there. if (prevCachedBlock != null) { cachedBlock = prevCachedBlock; } else { cachedBlocks.put(cachedBlock); LOG.trace("Added block {} to cachedBlocks", cachedBlock); } // Add the block to the datanode's implicit cached block list // if it's not already there. Similarly, remove it from the pending // cached block list if it exists there. if (!cachedBlock.isPresent(cachedList)) { cachedList.add(cachedBlock); LOG.trace("Added block {} to CACHED list.", cachedBlock); } if (cachedBlock.isPresent(pendingCachedList)) { pendingCachedList.remove(cachedBlock); LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock); } } }
/**
 * Check whether this CachedBlock is on the given list.
 *
 * Every third slot of {@code triplets}, starting at index 0, is read as
 * a CachedBlocksList and compared by reference with the argument.
 *
 * @param cachedBlocksList  The list to look for.
 * @return                  true if one of those slots holds exactly this
 *                          list object; false otherwise.
 */
public boolean isPresent(CachedBlocksList cachedBlocksList) {
  int slot = 0;
  while (slot < triplets.length) {
    final CachedBlocksList owner = (CachedBlocksList) triplets[slot];
    if (owner == cachedBlocksList) {
      return true;
    }
    slot += 3;
  }
  return false;
}
/**
 * Test helper: asserts that the list starts out with nothing to iterate,
 * then adds the given blocks in array order, asserting that each add
 * returns true.
 *
 * @param list    The list under test.
 * @param blocks  The blocks to add.
 */
private void testAddElementsToList(CachedBlocksList list,
    CachedBlock[] blocks) {
  final boolean startsEmpty = !list.iterator().hasNext();
  Assert.assertTrue("expected list to start off empty.", startsEmpty);
  for (int idx = 0; idx < blocks.length; idx++) {
    final boolean added = list.add(blocks[idx]);
    Assert.assertTrue(added);
  }
}
/**
 * Test helper: verifies the list contents against {@code blocks}, then
 * empties the list using one of two strategies chosen by {@code r}.
 *
 * First, each element the list yields must equal the block at the same
 * position in {@code blocks}.  Then either every element is removed
 * through the list's iterator, or the blocks are removed one by one in
 * an order drawn from {@code r}.  Finally the list must have nothing
 * left to iterate.
 *
 * @param r       Source of randomness for picking the strategy and the
 *                removal order.
 * @param list    The list under test.
 * @param blocks  The blocks expected to be on the list, in order.
 */
private void testRemoveElementsFromList(Random r,
    CachedBlocksList list, CachedBlock[] blocks) {
  // Check iteration order against the expected blocks.
  int position = 0;
  final Iterator<CachedBlock> checker = list.iterator();
  while (checker.hasNext()) {
    Assert.assertEquals(blocks[position], checker.next());
    position++;
  }

  if (r.nextBoolean()) {
    LOG.info("Removing via iterator");
    final Iterator<CachedBlock> remover = list.iterator();
    while (remover.hasNext()) {
      remover.next();
      remover.remove();
    }
  } else {
    LOG.info("Removing in pseudo-random order");
    // Slots are nulled out as their block is removed, so a repeated
    // draw of the same index can be recognised and skipped.
    final CachedBlock[] pending = Arrays.copyOf(blocks, blocks.length);
    int removedCount = 0;
    while (removedCount < pending.length) {
      final int pick = r.nextInt(pending.length);
      final CachedBlock victim = pending[pick];
      if (victim == null) {
        // Already removed on an earlier draw; draw again.
        continue;
      }
      Assert.assertTrue(list.remove(victim));
      pending[pick] = null;
      removedCount++;
    }
  }

  final boolean drained = !list.iterator().hasNext();
  Assert.assertTrue("expected list to be empty after everything " +
      "was removed.", drained);
}
/** * Convert a CachedBlockList into a DatanodeCommand with a list of blocks. * * @param list The {@link CachedBlocksList}. This function * clears the list. * @param action The action to perform in the command. * @param poolId The block pool id. * @return A DatanodeCommand to be sent back to the DN, or null if * there is nothing to be done. */ private DatanodeCommand getCacheCommand(CachedBlocksList list, int action, String poolId) { int length = list.size(); if (length == 0) { return null; } // Read the existing cache commands. long[] blockIds = new long[length]; int i = 0; for (CachedBlock cachedBlock : list) { blockIds[i++] = cachedBlock.getBlockId(); } return new BlockIdCommand(action, poolId, blockIds); }
private void processCacheReportImpl(final DatanodeDescriptor datanode, final List<Long> blockIds) { CachedBlocksList cached = datanode.getCached(); cached.clear(); CachedBlocksList cachedList = datanode.getCached(); CachedBlocksList pendingCachedList = datanode.getPendingCached(); for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) { long blockId = iter.next(); CachedBlock cachedBlock = new CachedBlock(blockId, (short)0, false); CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock); // Add the block ID from the cache report to the cachedBlocks map // if it's not already there. if (prevCachedBlock != null) { cachedBlock = prevCachedBlock; } else { cachedBlocks.put(cachedBlock); } // Add the block to the datanode's implicit cached block list // if it's not already there. Similarly, remove it from the pending // cached block list if it exists there. if (!cachedBlock.isPresent(cachedList)) { cachedList.add(cachedBlock); } if (cachedBlock.isPresent(pendingCachedList)) { pendingCachedList.remove(cachedBlock); } } }
/**
 * Collect the datanodes on which this block is cached, planned to be
 * cached, or planned to be uncached.
 *
 * Every third slot of {@code triplets}, starting at index 0, is read as
 * a CachedBlocksList; the datanode of each list that passes the type
 * filter is appended to the result.
 *
 * @param type  If null, no filtering is done and every list contributes
 *              its datanode.  If non-null, only lists of this type
 *              contribute.
 *              See {@link DatanodeDescriptor.CachedBlocksList.Type}
 *              for a description of all the lists.
 *
 * @return      A newly created list of datanodes.  Modifying it does not
 *              alter the state of the CachedBlock.
 */
public List<DatanodeDescriptor> getDatanodes(Type type) {
  final List<DatanodeDescriptor> result =
      new LinkedList<DatanodeDescriptor>();
  int slot = 0;
  while (slot < triplets.length) {
    final CachedBlocksList owner = (CachedBlocksList) triplets[slot];
    slot += 3;
    // Skip lists of the wrong type, but only when a filter was given.
    if ((type != null) && (owner.getType() != type)) {
      continue;
    }
    result.add(owner.getDatanode());
  }
  return result;
}
/**
 * Get a list of the datanodes which this block is cached,
 * planned to be cached, or planned to be uncached on.
 *
 * Every third slot of {@code triplets}, starting at index 0, is read as
 * a CachedBlocksList; the datanode of each list that passes the type
 * filter is appended to the result.
 *
 * @param type  If null, this parameter is ignored.
 *              If it is non-null, we match only datanodes which
 *              have it on this list.
 *              See {@link DatanodeDescriptor.CachedBlocksList.Type}
 *              for a description of all the lists.
 *
 * @return      The list of datanodes.  Modifying this list does not
 *              alter the state of the CachedBlock.
 */
public List<DatanodeDescriptor> getDatanodes(Type type) {
  List<DatanodeDescriptor> nodes = new LinkedList<DatanodeDescriptor>();
  for (int i = 0; i < triplets.length; i += 3) {
    CachedBlocksList list = (CachedBlocksList) triplets[i];
    // A null type means "no filter": every list contributes its datanode.
    if ((type == null) || (list.getType() == type)) {
      nodes.add(list.getDatanode());
    }
  }
  return nodes;
}