/**
 * Configure the reducer: open the _index and _masterindex files for writing.
 *
 * <p>Both files live in the task's work output directory. Any existing
 * _index/_masterindex files there are deleted first. The master index is
 * seeded with the archive version followed by " \n".
 *
 * @param conf the job configuration
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *     preparing the index files
 */
public void configure(JobConf conf) {
  this.conf = conf;
  tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
  masterIndex = new Path(tmpOutputDir, HarFileSystem.MASTER_INDEX_NAME);
  index = new Path(tmpOutputDir, HarFileSystem.INDEX_NAME);
  try {
    fs = masterIndex.getFileSystem(conf);
    if (fs.exists(masterIndex)) {
      fs.delete(masterIndex, false);
    }
    if (fs.exists(index)) {
      fs.delete(index, false);
    }
    indexStream = fs.create(index);
    outStream = fs.create(masterIndex);
    String version = VERSION + " \n";
    // Encode explicitly instead of depending on the platform default charset.
    // UnsupportedEncodingException is an IOException and is handled below.
    outStream.write(version.getBytes("UTF-8"));
  } catch (IOException e) {
    throw new RuntimeException(
        "Failed to create index files in " + tmpOutputDir, e);
  }
}
/**
 * Configure the reducer: open the _index and _masterindex files for writing.
 *
 * <p>Both files live in the task's work output directory. Any existing
 * _index/_masterindex files there are deleted first. The master index is
 * seeded with the archive version followed by " \n".
 *
 * @param conf the job configuration
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *     preparing the index files
 */
public void configure(JobConf conf) {
  this.conf = conf;
  outputDir = FileOutputFormat.getWorkOutputPath(this.conf);
  masterIndex = new Path(outputDir, HarFileSystem.MASTER_INDEX_NAME);
  index = new Path(outputDir, HarFileSystem.INDEX_NAME);
  try {
    fs = masterIndex.getFileSystem(conf);
    if (fs.exists(masterIndex)) {
      fs.delete(masterIndex, false);
    }
    if (fs.exists(index)) {
      fs.delete(index, false);
    }
    indexStream = fs.create(index);
    outStream = fs.create(masterIndex);
    String version = VERSION + " \n";
    // Encode explicitly instead of depending on the platform default charset.
    // UnsupportedEncodingException is an IOException and is handled below.
    outStream.write(version.getBytes("UTF-8"));
  } catch (IOException e) {
    throw new RuntimeException(
        "Failed to create index files in " + outputDir, e);
  }
}
/**
 * Configure the reducer: open the _index and _masterindex files for writing.
 *
 * <p>Both files live in the task's work output directory. Any existing
 * copies there are deleted first. The master index is seeded with
 * {@code HarFileSystem.VERSION} followed by " \n".
 *
 * @param conf the job configuration
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *     preparing the index files
 */
public void configure(JobConf conf) {
  this.conf = conf;
  tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
  masterIndex = new Path(tmpOutputDir, "_masterindex");
  index = new Path(tmpOutputDir, "_index");
  try {
    fs = masterIndex.getFileSystem(conf);
    if (fs.exists(masterIndex)) {
      fs.delete(masterIndex, false);
    }
    if (fs.exists(index)) {
      fs.delete(index, false);
    }
    indexStream = fs.create(index);
    outStream = fs.create(masterIndex);
    String version = HarFileSystem.VERSION + " \n";
    // Encode explicitly instead of depending on the platform default charset.
    // UnsupportedEncodingException is an IOException and is handled below.
    outStream.write(version.getBytes("UTF-8"));
  } catch (IOException e) {
    throw new RuntimeException(
        "Failed to create index files in " + tmpOutputDir, e);
  }
}
/**
 * Create an empty Har archive in the FileSystem fs at the Path p.
 *
 * <p>The archive directory is created, and a _masterindex file containing
 * only {@code HarFileSystem.VERSION} is written into it, along with an
 * empty _index file.
 *
 * @param fs the file system to create the Har archive in
 * @param p the path to create the Har archive at
 * @throws IOException in the event of error
 */
private static void createEmptyHarArchive(FileSystem fs, Path p)
    throws IOException {
  fs.mkdirs(p);
  OutputStream out = fs.create(new Path(p, "_masterindex"));
  try {
    // Explicit charset: the platform default must not influence the index format.
    out.write(Integer.toString(HarFileSystem.VERSION).getBytes("UTF-8"));
  } finally {
    // Close even if the write fails, so the stream is not leaked.
    out.close();
  }
  fs.create(new Path(p, "_index")).close();
}
public void map(LongWritable key, HarEntry value, OutputCollector<IntWritable, Text> out, Reporter reporter) throws IOException { Path relPath = new Path(value.path); int hash = HarFileSystem.getHarHash(relPath); String towrite = null; Path srcPath = realPath(relPath, rootPath); long startPos = partStream.getPos(); FileSystem srcFs = srcPath.getFileSystem(conf); FileStatus srcStatus = srcFs.getFileStatus(srcPath); String propStr = encodeProperties(srcStatus); if (value.isDir()) { towrite = encodeName(relPath.toString()) + " dir " + propStr + " 0 0 "; StringBuffer sbuff = new StringBuffer(); sbuff.append(towrite); for (String child: value.children) { sbuff.append(encodeName(child) + " "); } towrite = sbuff.toString(); //reading directories is also progress reporter.progress(); } else { FSDataInputStream input = srcFs.open(srcStatus.getPath()); reporter.setStatus("Copying file " + srcStatus.getPath() + " to archive."); copyData(srcStatus.getPath(), input, partStream, reporter); towrite = encodeName(relPath.toString()) + " file " + partname + " " + startPos + " " + srcStatus.getLen() + " " + propStr + " "; } out.collect(new IntWritable(hash), new Text(towrite)); }
@Test /* * Tests copying from archive file system to a local file system */ public void testCopyToLocal() throws Exception { final String fullHarPathStr = makeArchive(); // make path to copy the file to: final String tmpDir = System.getProperty("test.build.data","build/test/data") + "/work-dir/har-fs-tmp"; final Path tmpPath = new Path(tmpDir); final LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); localFs.delete(tmpPath, true); localFs.mkdirs(tmpPath); assertTrue(localFs.exists(tmpPath)); // Create fresh HarFs: final HarFileSystem harFileSystem = new HarFileSystem(fs); try { final URI harUri = new URI(fullHarPathStr); harFileSystem.initialize(harUri, fs.getConf()); final Path sourcePath = new Path(fullHarPathStr + Path.SEPARATOR + "a"); final Path targetPath = new Path(tmpPath, "straus"); // copy the Har file to a local file system: harFileSystem.copyToLocalFile(false, sourcePath, targetPath); FileStatus straus = localFs.getFileStatus(targetPath); // the file should contain just 1 character: assertEquals(1, straus.getLen()); } finally { harFileSystem.close(); localFs.delete(tmpPath, true); } }
public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter) throws IOException { String line = value.toString(); MapStat mstat = new MapStat(line); Path relPath = new Path(mstat.pathname); int hash = HarFileSystem.getHarHash(relPath); String towrite = null; Path srcPath = realPath(relPath, rootPath); long startPos = partStream.getPos(); if (mstat.isDir) { towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " "; StringBuffer sbuff = new StringBuffer(); sbuff.append(towrite); for (String child: mstat.children) { sbuff.append(child + " "); } towrite = sbuff.toString(); //reading directories is also progress reporter.progress(); } else { FileSystem srcFs = srcPath.getFileSystem(conf); FileStatus srcStatus = srcFs.getFileStatus(srcPath); FSDataInputStream input = srcFs.open(srcStatus.getPath()); reporter.setStatus("Copying file " + srcStatus.getPath() + " to archive."); copyData(srcStatus.getPath(), input, partStream, reporter); towrite = relPath.toString() + " file " + partname + " " + startPos + " " + srcStatus.getLen() + " "; } out.collect(new IntWritable(hash), new Text(towrite)); }
/**
 * Re-emits the input line unchanged. The output key is the Har hash of the
 * entry name, which is parsed from the line by {@code HarStatus}.
 */
public void map(LongWritable key, Text value,
    OutputCollector<IntWritable, Text> out,
    Reporter reporter) throws IOException {
  reporter.setStatus("Passing file " + value + " to archive.");
  reporter.progress();
  final String entryName = new HarStatus(value.toString()).getName();
  out.collect(new IntWritable(HarFileSystem.getHarHash(entryName)), value);
}
public void map(LongWritable key, HarEntry value, OutputCollector<IntWritable, Text> out, Reporter reporter) throws IOException { Path relativePath = new Path(value.path); int hash = HarFileSystem.getHarHash(relativePath.toString()); String towrite = null; Path srcPath = realPath(relativePath, rootPath); long startPos = partStream.getPos(); FileSystem srcFs = srcPath.getFileSystem(conf); HarProperties properties = value.getProperties(); String propStr = properties.serialize(); if (value.isDir()) { towrite = HarFileSystem.encode(relativePath.toString()) + " dir " + propStr + " 0 0 "; StringBuffer sbuff = new StringBuffer(); sbuff.append(towrite); for (String child: value.children) { sbuff.append(HarFileSystem.encode(child) + " "); } towrite = sbuff.toString(); //reading directories is also progress reporter.progress(); } else { FSDataInputStream input = srcFs.open(srcPath); reporter.setStatus("Copying file " + srcPath + " to archive."); copyData(srcPath, input, partStream, reporter); long len = partStream.getPos() - startPos; towrite = HarFileSystem.encode(relativePath.toString()) + " file " + partName + " " + startPos + " " + len + " " + propStr + " "; } out.collect(new IntWritable(hash), new Text(towrite)); }
/** * gets the parity blocks corresponding to file * returns the parity blocks in case of DFS * and the part blocks containing parity blocks * in case of HAR FS */ private static BlockLocation[] getParityBlocks(final Path filePath, final long blockSize, final long numStripes, final RaidInfo raidInfo) throws IOException { FileSystem parityFS = raidInfo.parityPair.getFileSystem(); // get parity file metadata FileStatus parityFileStatus = raidInfo.parityPair.getFileStatus(); long parityFileLength = parityFileStatus.getLen(); if (parityFileLength != numStripes * raidInfo.parityBlocksPerStripe * blockSize) { throw new IOException("expected parity file of length" + (numStripes * raidInfo.parityBlocksPerStripe * blockSize) + " but got parity file of length " + parityFileLength); } BlockLocation[] parityBlocks = parityFS.getFileBlockLocations(parityFileStatus, 0L, parityFileLength); if (parityFS instanceof DistributedFileSystem || parityFS instanceof DistributedRaidFileSystem) { long parityBlockSize = parityFileStatus.getBlockSize(); if (parityBlockSize != blockSize) { throw new IOException("file block size is " + blockSize + " but parity file block size is " + parityBlockSize); } } else if (parityFS instanceof HarFileSystem) { LOG.debug("HAR FS found"); } else { LOG.warn("parity file system is not of a supported type"); } return parityBlocks; }
public void map(LongWritable key, HarEntry value, OutputCollector<IntWritable, Text> out, Reporter reporter) throws IOException { Path relPath = new Path(value.path); int hash = HarFileSystem.getHarHash(relPath); String towrite = null; Path srcPath = realPath(relPath, rootPath); long startPos = partStream.getPos(); FileSystem srcFs = srcPath.getFileSystem(conf); FileStatus srcStatus = srcFs.getFileStatus(srcPath); String propStr = URLEncoder.encode( srcStatus.getModificationTime() + " " + srcStatus.getAccessTime() + " " + srcStatus.getPermission().toShort() + " " + URLEncoder.encode(srcStatus.getOwner(), "UTF-8") + " " + URLEncoder.encode(srcStatus.getGroup(), "UTF-8"), "UTF-8"); if (value.isDir()) { towrite = URLEncoder.encode(relPath.toString(),"UTF-8") + " dir " + propStr + " 0 0 "; StringBuffer sbuff = new StringBuffer(); sbuff.append(towrite); for (String child: value.children) { sbuff.append(URLEncoder.encode(child,"UTF-8") + " "); } towrite = sbuff.toString(); //reading directories is also progress reporter.progress(); } else { FSDataInputStream input = srcFs.open(srcStatus.getPath()); reporter.setStatus("Copying file " + srcStatus.getPath() + " to archive."); copyData(srcStatus.getPath(), input, partStream, reporter); towrite = URLEncoder.encode(relPath.toString(),"UTF-8") + " file " + partname + " " + startPos + " " + srcStatus.getLen() + " " + propStr + " "; } out.collect(new IntWritable(hash), new Text(towrite)); }
public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter) throws IOException { String line = value.toString(); MapStat mstat = new MapStat(line); Path srcPath = new Path(mstat.pathname); String towrite = null; Path relPath = makeRelative(srcPath); int hash = HarFileSystem.getHarHash(relPath); long startPos = partStream.getPos(); if (mstat.isDir) { towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " "; StringBuffer sbuff = new StringBuffer(); sbuff.append(towrite); for (String child: mstat.children) { sbuff.append(child + " "); } towrite = sbuff.toString(); //reading directories is also progress reporter.progress(); } else { FileSystem srcFs = srcPath.getFileSystem(conf); FileStatus srcStatus = srcFs.getFileStatus(srcPath); FSDataInputStream input = srcFs.open(srcStatus.getPath()); reporter.setStatus("Copying file " + srcStatus.getPath() + " to archive."); copyData(srcStatus.getPath(), input, partStream, reporter); towrite = relPath.toString() + " file " + partname + " " + startPos + " " + srcStatus.getLen() + " "; } out.collect(new IntWritable(hash), new Text(towrite)); }