/**
 * Reads the entire contents of {@code file} into memory and records the
 * resulting bytes in {@code blocks}, keyed by the id of {@code block}.
 *
 * @param block the block whose id is used as the key
 * @param file  the local file holding the block's contents
 * @throws IOException if the file cannot be opened, read or closed
 */
@Override
public void storeBlock(Block block, File file) throws IOException {
  final ByteArrayOutputStream contents = new ByteArrayOutputStream();
  final byte[] chunk = new byte[8192];
  BufferedInputStream source = null;
  try {
    source = new BufferedInputStream(new FileInputStream(file));
    // read() reports end of stream with a negative count; anything else is data.
    for (int n = source.read(chunk); n >= 0; n = source.read(chunk)) {
      contents.write(chunk, 0, n);
    }
  } finally {
    if (source != null) {
      source.close();
    }
  }
  blocks.put(block.getId(), contents.toByteArray());
}
/**
 * Rebuilds an {@link INode} from its serialized form.
 * <p>
 * The stream is read as: one byte used as an index into
 * {@code INode.FILE_TYPES}; for the {@code FILE} type, an {@code int} block
 * count followed by a ({@code long} id, {@code long} length) pair per block.
 * <p>
 * A non-null stream is always closed before this method returns, whether it
 * succeeds or throws. Previously the stream was left open when reading failed
 * or when the type was not recognised.
 *
 * @param in the stream to read from; may be {@code null}
 * @return {@code null} if {@code in} is {@code null},
 *         {@code INode.DIRECTORY_INODE} for a directory, or a new
 *         {@code INode} carrying the decoded blocks for a file
 * @throws IOException if reading from or closing the stream fails
 * @throws IllegalArgumentException if the type byte does not select a
 *         supported file type
 */
public static INode deserialize(InputStream in) throws IOException {
  if (in == null) {
    return null;
  }
  try {
    DataInputStream dataIn = new DataInputStream(in);
    // readByte() is signed, so guard both ends before indexing the table.
    int typeIndex = dataIn.readByte();
    if (typeIndex < 0 || typeIndex >= INode.FILE_TYPES.length) {
      throw new IllegalArgumentException("Cannot deserialize inode.");
    }
    FileType fileType = INode.FILE_TYPES[typeIndex];
    switch (fileType) {
    case DIRECTORY:
      return INode.DIRECTORY_INODE;
    case FILE:
      int numBlocks = dataIn.readInt();
      Block[] blocks = new Block[numBlocks];
      for (int i = 0; i < numBlocks; i++) {
        long id = dataIn.readLong();
        long length = dataIn.readLong();
        blocks[i] = new Block(id, length);
      }
      return new INode(fileType, blocks);
    default:
      throw new IllegalArgumentException("Cannot deserialize inode.");
    }
  } finally {
    // Runs on every exit path so the caller's stream is never leaked.
    in.close();
  }
}
/**
 * Deletes the file or directory at {@code path}.
 * <p>
 * For a file, the inode is removed first and then each of its blocks. For a
 * directory, every listed child is deleted through a recursive call before
 * the directory's own inode is removed. If deleting a child returns
 * {@code false} the method stops immediately, leaving children that were
 * already deleted gone and the directory inode in place.
 *
 * @param path      the path to delete; resolved with {@code makeAbsolute}
 * @param recursive whether a non-empty directory may be deleted
 * @return {@code true} on success; {@code false} if no inode exists at the
 *         path, if listing the directory raises
 *         {@code FileNotFoundException}, or if deleting a child fails
 * @throws IOException if the directory is not empty and {@code recursive} is
 *         {@code false}, or if the underlying store fails
 */
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
  final Path target = makeAbsolute(path);
  final INode node = store.retrieveINode(target);
  if (node == null) {
    return false;
  }

  if (!node.isFile()) {
    final FileStatus[] children;
    try {
      children = listStatus(target);
    } catch (FileNotFoundException missing) {
      return false;
    }
    if ((children.length != 0) && (!recursive)) {
      throw new IOException("Directory " + path.toString() + " is not empty.");
    }
    for (int i = 0; i < children.length; i++) {
      boolean childDeleted = delete(children[i].getPath(), recursive);
      if (!childDeleted) {
        return false;
      }
    }
    store.deleteINode(target);
    return true;
  }

  // Plain file: drop the inode, then release the blocks it referenced.
  store.deleteINode(target);
  for (Block fileBlock : node.getBlocks()) {
    store.deleteBlock(fileBlock);
  }
  return true;
}
/**
 * Computes the total length of an inode by summing the lengths of its blocks.
 *
 * @param inode the inode to measure
 * @return {@code 0} when {@code inode.isDirectory()} is true, otherwise the
 *         sum of {@code getLength()} over every block of the inode
 */
private static long findLength(INode inode) {
  if (inode.isDirectory()) {
    return 0;
  }
  long total = 0L;
  for (Block part : inode.getBlocks()) {
    total = total + part.getLength();
  }
  return total;
}
private synchronized void endBlock() throws IOException { // // Done with local copy // backupStream.close(); // // Send it to S3 // // TODO: Use passed in Progressable to report progress. nextBlockOutputStream(); store.storeBlock(nextBlock, backupFile); Block[] arr = new Block[blocks.size()]; arr = blocks.toArray(arr); store.storeINode(path, new INode(INode.FILE_TYPES[1], arr)); // // Delete local backup, start new one // boolean b = backupFile.delete(); if (!b) { LOG.warn("Ignoring failed delete"); } backupFile = newBackupFile(); backupStream = new FileOutputStream(backupFile); bytesWrittenToBlock = 0; }
/**
 * Allocates the next block for this stream.
 * <p>
 * Ids are drawn from {@code r.nextLong()} until {@code store.blockExists}
 * reports the id as unused. The new block is created with the current value
 * of {@code bytesWrittenToBlock} as its length, stored in {@code nextBlock},
 * appended to {@code blocks}, and the counter is reset to zero.
 *
 * @throws IOException if the store fails while checking for an existing block
 */
private synchronized void nextBlockOutputStream() throws IOException {
  long candidateId;
  do {
    candidateId = r.nextLong();
  } while (store.blockExists(candidateId));

  nextBlock = new Block(candidateId, bytesWrittenToBlock);
  blocks.add(nextBlock);
  bytesWrittenToBlock = 0;
}
/**
 * Writes the stored bytes of {@code block}, starting at
 * {@code byteRangeStart}, to a newly created temporary file.
 * <p>
 * Both arguments are validated before the temporary file is created, so a
 * rejected request leaves no file behind. The offset is checked as a
 * {@code long} so that a value beyond {@code int} range is rejected rather
 * than wrapped by the narrowing cast.
 *
 * @param block          the block to fetch; looked up by its id in
 *                       {@code blocks}
 * @param byteRangeStart offset of the first byte to write; must lie between
 *                       0 and the block's stored length, inclusive
 * @return the temporary file containing the requested bytes
 * @throws IOException if no data is stored for the block, or if the file
 *         cannot be created or written
 * @throws IndexOutOfBoundsException if {@code byteRangeStart} is negative or
 *         exceeds the stored length
 */
@Override
public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
  byte[] data = blocks.get(block.getId());
  if (data == null) {
    throw new IOException("Block " + block.getId() + " not found.");
  }
  if (byteRangeStart < 0 || byteRangeStart > data.length) {
    throw new IndexOutOfBoundsException("byteRangeStart " + byteRangeStart
        + " is outside block of length " + data.length);
  }
  // Safe to narrow: the check above bounds the offset by data.length.
  int offset = (int) byteRangeStart;

  File file = createTempFile();
  BufferedOutputStream out = null;
  try {
    out = new BufferedOutputStream(new FileOutputStream(file));
    out.write(data, offset, data.length - offset);
  } finally {
    if (out != null) {
      out.close();
    }
  }
  return file;
}
/**
 * Creates an inode of the given type with the given blocks.
 * <p>
 * {@code fileType} is assigned before {@code isDirectory()} is consulted
 * (presumably that method reads the field — confirm against its definition).
 *
 * @param fileType the type of the inode
 * @param blocks   the blocks of the inode; must be {@code null} when the
 *                 inode is a directory
 * @throws IllegalArgumentException if the inode is a directory and
 *         {@code blocks} is not {@code null}
 */
public INode(FileType fileType, Block[] blocks) {
  this.fileType = fileType;
  if (isDirectory()) {
    if (blocks != null) {
      throw new IllegalArgumentException("A directory cannot contain blocks.");
    }
  }
  this.blocks = blocks;
}
/**
 * Reports the block size of an inode, taken as the length of its first block.
 *
 * @param inode the inode to inspect
 * @return the length of the first block, or {@code 0} when the inode has no
 *         block array or an empty one (the empty case previously failed with
 *         {@code ArrayIndexOutOfBoundsException})
 */
private static long findBlocksize(INode inode) {
  final Block[] ret = inode.getBlocks();
  if (ret == null || ret.length == 0) {
    return 0L;
  }
  return ret[0].getLength();
}
/**
 * Removes the entry stored in {@code blocks} under the id of {@code block}.
 *
 * @param block the block whose id identifies the entry to remove
 * @throws IOException declared by the signature; the visible body performs
 *         no I/O itself
 */
@Override public void deleteBlock(Block block) throws IOException { blocks.remove(block.getId()); }
/**
 * Returns the blocks held by this object.
 * <p>
 * The internal array reference is returned directly, without copying, so
 * changes made by the caller to the array are visible here.
 *
 * @return the {@code blocks} field as-is (NOTE(review): may be {@code null},
 *         e.g. for directories — confirm against the constructor)
 */
public Block[] getBlocks() { return blocks; }