/** * Read the next key and return the value-stream. * * @param key * @return the valueStream if there are more keys or null otherwise. * @throws IOException */ public DataInputStream next(LogKey key) throws IOException { if (!this.atBeginning) { this.scanner.advance(); } else { this.atBeginning = false; } if (this.scanner.atEnd()) { return null; } TFile.Reader.Scanner.Entry entry = this.scanner.entry(); key.readFields(entry.getKeyStream()); // Skip META keys if (RESERVED_KEYS.containsKey(key.toString())) { return next(key); } DataInputStream valueStream = entry.getValueStream(); return valueStream; }
/**
 * Returns the owner of the application by scanning for the
 * APPLICATION_OWNER_KEY entry.
 *
 * @return the application owner, or null if no owner entry exists.
 * @throws IOException if the underlying TFile cannot be read.
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = null;
  try {
    ownerScanner = reader.createScanner();
    LogKey key = new LogKey();
    String ownerKeyName = APPLICATION_OWNER_KEY.toString();
    for (; !ownerScanner.atEnd(); ownerScanner.advance()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(ownerKeyName)) {
        return entry.getValueStream().readUTF();
      }
    }
    return null;
  } finally {
    // Release the scanner on every path, including early return.
    IOUtils.cleanup(LOG, ownerScanner);
  }
}
/**
 * Writes {@code testSize} generated key/value pairs to a TFile.
 *
 * @param file destination path on HDFS.
 * @param cname compression codec name (e.g. TFile.COMPRESSION_NONE).
 * @throws Exception if writing fails.
 */
public void writeTFile(Path file, String cname) throws Exception {
  FSDataOutputStream fos = hdfs.create(file);
  try {
    TFile.Writer writer = new TFile.Writer(fos, blockSize, cname,
        "jclass:" + BytesWritable.Comparator.class.getName(),
        new Configuration());
    try {
      for (int i = 0; i < testSize; i++) {
        String k = getKey(i);
        String v = getValue();
        writer.append(k.getBytes(), v.getBytes());
      }
    } finally {
      // Close the writer even if append() throws, so block metadata and the
      // underlying stream are not left dangling.
      writer.close();
    }
  } finally {
    fos.close();
  }
}
/**
 * Writes an uncompressed TFile and reports the write duration and the space
 * consumed on HDFS.
 */
@Test
public void testTFileWrite() throws Exception {
  Path path = Testfile.TFILE.filepath();
  String pairCount = String.format("%,d", testSize);
  logger.info("Writing {} with {} key/value pairs", path, pairCount);
  startTimer();
  writeTFile(path, TFile.COMPRESSION_NONE);
  logger.info("Duration: {}", stopTimer(Testfile.TFILE, "WRITE"));
  Assert.assertTrue(hdfs.exists(path));
  ContentSummary summary = hdfs.getContentSummary(path);
  logger.debug("Space consumed: {} bytes in {} files",
      String.format("%,d", summary.getSpaceConsumed()),
      String.format("%,d", summary.getFileCount()));
}
/**
 * Writes a gzip-compressed TFile and reports the write duration and the
 * space consumed on HDFS.
 */
@Test
public void testTFileWriteGZ() throws Exception {
  Path path = Testfile.TFILE_GZ.filepath();
  String pairCount = String.format("%,d", testSize);
  logger.info("Writing {} with {} key/value pairs", path, pairCount);
  startTimer();
  writeTFile(path, TFile.COMPRESSION_GZ);
  logger.info("Duration: {}", stopTimer(Testfile.TFILE_GZ, "WRITE"));
  Assert.assertTrue(hdfs.exists(path));
  ContentSummary summary = hdfs.getContentSummary(path);
  logger.debug("Space consumed: {} bytes in {} files",
      String.format("%,d", summary.getSpaceConsumed()),
      String.format("%,d", summary.getFileCount()));
}
/**
 * Measures sequential, keyed-sequential, and random read performance on an
 * uncompressed TFile.
 */
@Test
public void testTFileRead() throws Exception {
  Path path = Testfile.TFILE.filepath();
  logger.info("Reading {} with {} key/value pairs", path,
      String.format("%,d", testSize));
  writeTFile(path, TFile.COMPRESSION_NONE);

  startTimer();
  readTFileSeq(path);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE, "READ-SEQ"));

  startTimer();
  readTFileSeqId(path);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE, "READ-SEQ-ID"));

  startTimer();
  readTFileRandom(path);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}",
      stopTimer(Testfile.TFILE, "READ-RAND"));
}
/**
 * Measures sequential, keyed-sequential, and random read performance on a
 * gzip-compressed TFile.
 */
@Test
public void testTFileReadGZ() throws Exception {
  Path path = Testfile.TFILE_GZ.filepath();
  logger.info("Reading {} with {} key/value pairs", path,
      String.format("%,d", testSize));
  writeTFile(path, TFile.COMPRESSION_GZ);

  startTimer();
  readTFileSeq(path);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE_GZ, "READ-SEQ"));

  startTimer();
  readTFileSeqId(path);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}",
      stopTimer(Testfile.TFILE_GZ, "READ-SEQ-ID"));

  startTimer();
  readTFileRandom(path);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}",
      stopTimer(Testfile.TFILE_GZ, "READ-RAND"));
}
private void readTFileRandom(Path file) throws IOException { Random random = new Random(); FSDataInputStream in = hdfs.open(file); long size = hdfs.getContentSummary(file).getLength(); TFile.Reader reader = new TFile.Reader(in, size, new Configuration()); Scanner scanner = reader.createScanner(); scanner.rewind(); for (int i = 0; i < testSize; i++) { // scanner.rewind(); scanner.seekTo(getKey(random.nextInt(testSize)).getBytes()); // Entry en = scanner.entry(); // en.get(new BytesWritable(new byte[en.getKeyLength()]), new BytesWritable(new byte[en.getValueLength()])); } reader.close(); }
/**
 * Reads all entries of the given TFile by seeking to each key in sequential
 * order, materializing every key and value.
 *
 * @param file the TFile to read.
 * @throws IOException if the file cannot be read.
 */
private void readTFileSeqId(Path file) throws IOException {
  FSDataInputStream in = hdfs.open(file);
  long size = hdfs.getContentSummary(file).getLength();
  TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
  try {
    Scanner scanner = reader.createScanner();
    scanner.rewind();
    for (int i = 0; i < testSize; i++) {
      scanner.seekTo(getKey(i).getBytes());
      Entry en = scanner.entry();
      en.get(new BytesWritable(new byte[en.getKeyLength()]),
          new BytesWritable(new byte[en.getValueLength()]));
    }
  } finally {
    // Close both the reader and the raw stream even on exception;
    // the original leaked them if a read failed.
    reader.close();
    in.close();
  }
}
/**
 * Scans the given TFile sequentially with scanner.advance(), materializing
 * every key and value.
 *
 * @param file the TFile to read.
 * @throws IOException if the file cannot be read.
 */
private void readTFileSeq(Path file) throws IOException {
  FSDataInputStream in = hdfs.open(file);
  long size = hdfs.getContentSummary(file).getLength();
  TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
  try {
    Scanner scanner = reader.createScanner();
    scanner.rewind();
    // Check atEnd() BEFORE touching entry(): the original do/while called
    // scanner.entry() first, which breaks on an empty file.
    while (!scanner.atEnd()) {
      Entry en = scanner.entry();
      en.get(new BytesWritable(new byte[en.getKeyLength()]),
          new BytesWritable(new byte[en.getValueLength()]));
      scanner.advance();
    }
  } finally {
    // Close both the reader and the raw stream even on exception;
    // the original leaked them if a read failed.
    reader.close();
    in.close();
  }
}
/**
 * Measures sequential, keyed-sequential, and random read performance on an
 * uncompressed DTFile.
 */
@Test
public void testDTFileRead() throws Exception {
  Path file = Testfile.DTFILE.filepath();
  logger.info("Reading {} with {} key/value pairs", file,
      String.format("%,d", testSize));
  writeTFile(file, TFile.COMPRESSION_NONE);

  startTimer();
  readDTFileSeq(file);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}",
      stopTimer(Testfile.DTFILE, "READ-SEQ"));

  startTimer();
  // FIX: this section previously called readDTFileSeq() a second time, so
  // the READ-SEQ-ID measurement never exercised seekTo(key). Use the keyed
  // variant, matching testDTFileReadGZ() and testTFileRead().
  readDTFileSeqId(file);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}",
      stopTimer(Testfile.DTFILE, "READ-SEQ-ID"));

  startTimer();
  readDTFileRandom(file);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}",
      stopTimer(Testfile.DTFILE, "READ-RAND"));
}
/**
 * Measures sequential, keyed-sequential, and random read performance on a
 * gzip-compressed DTFile.
 */
@Test
public void testDTFileReadGZ() throws Exception {
  Path path = Testfile.DTFILE_GZ.filepath();
  logger.info("Reading {} with {} key/value pairs", path,
      String.format("%,d", testSize));
  writeTFile(path, TFile.COMPRESSION_GZ);

  startTimer();
  readDTFileSeq(path);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}",
      stopTimer(Testfile.DTFILE_GZ, "READ-SEQ"));

  startTimer();
  readDTFileSeqId(path);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}",
      stopTimer(Testfile.DTFILE_GZ, "READ-SEQ-ID"));

  startTimer();
  readDTFileRandom(path);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}",
      stopTimer(Testfile.DTFILE_GZ, "READ-RAND"));
}
/**
 * Opens (or creates) the application-history file and wraps it in a TFile
 * writer using the configured compression type.
 *
 * @param historyFile path of the history file to write.
 * @throws IOException if the file cannot be opened or the writer cannot be
 *         created.
 */
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    // Append to an existing history file instead of clobbering it.
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  try {
    // Restrict permissions before any history data is written.
    fs.setPermission(historyFile, HISTORY_FILE_UMASK);
    writer = new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
        YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
        YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE),
        null, getConfig());
  } catch (IOException e) {
    // Don't leak the open output stream if writer construction fails.
    IOUtils.cleanup(LOG, fsdos);
    throw e;
  }
}
/** * @param path * @return * @throws IOException */ private TFile.Reader.Scanner getScanner(final Path path) throws IOException { LOG.log(Level.FINE, "Creating Scanner for path {0}", path); final TFile.Reader reader = new TFile.Reader(this.fileSystem.open(path), this.fileSystem.getFileStatus(path).getLen(), this.configuration); final TFile.Reader.Scanner scanner = reader.createScanner(); for (int counter = 0; counter < 3 && !scanner.atEnd(); counter += 1) { //skip VERSION, APPLICATION_ACL, and APPLICATION_OWNER scanner.advance(); } LOG.log(Level.FINE, "Created Scanner for path {0}", path); return scanner; }
private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException { entry.getKey(keyBytesWritable); //splitpath contains the machine name. Create the key as splitPath + realKey String keyStr = new StringBuilder() .append(splitPath.getName()).append(":") .append(new String(keyBytesWritable.getBytes())) .toString(); /** * In certain cases, values can be huge (files > 2 GB). Stream is * better to handle such scenarios. */ currentValueReader = new BufferedReader( new InputStreamReader(entry.getValueStream())); key.set(keyStr); String line = currentValueReader.readLine(); value.set((line == null) ? "" : line); }
/**
 * Creates the aggregated log file on the remote file system as the given
 * user and wraps it in a TFile writer, then writes the format version.
 *
 * @param conf configuration used for file-system access and to pick the
 *        log-aggregation compression type.
 * @param remoteAppLogFile destination path for the aggregated log file.
 * @param userUgi the user identity under which the file is created.
 * @throws IOException if creation fails or is interrupted.
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
            // Apply the restrictive umask before creating the file so the
            // log is never world-readable, even briefly.
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    // Surface interruption as an IOException, preserving the cause.
    throw new IOException(e);
  }
  // Keys are not sorted: null arg
  // 256KB minBlockSize : Expected log size for each container too
  this.writer = new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
      YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
      YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  // Write the version string
  writeVersion();
}
public LogReader(Configuration conf, Path remoteAppLogFile) throws IOException { FileContext fileContext = FileContext.getFileContext(remoteAppLogFile.toUri(), conf); this.fsDataIStream = fileContext.open(remoteAppLogFile); reader = new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus( remoteAppLogFile).getLen(), conf); this.scanner = reader.createScanner(); }
/**
 * Returns the owner of the application.
 *
 * @return the application owner, or null if no owner entry exists.
 * @throws IOException if the underlying TFile cannot be read.
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = reader.createScanner();
  try {
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  } finally {
    // FIX: the scanner was never closed, leaking it on every call.
    ownerScanner.close();
  }
}
/**
 * Opens the history file and positions the reader at the first entry.
 *
 * @param historyFile path of the history file to read.
 * @throws IOException if the file cannot be opened or the reader cannot be
 *         initialized.
 */
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  try {
    reader = new TFile.Reader(fsdis,
        fs.getFileStatus(historyFile).getLen(), getConfig());
    reset();
  } catch (IOException e) {
    // Don't leak the open input stream if reader construction or the
    // initial reset fails.
    fsdis.close();
    throw e;
  }
}
/**
 * Reads the entry at the scanner's current position and advances the
 * scanner.
 *
 * @return the key and raw value bytes of the current entry.
 * @throws IOException if the entry cannot be read (including a truncated
 *         value).
 */
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  // FIX: read() may return fewer bytes than requested, silently truncating
  // the value; readFully() fills the whole buffer or throws EOFException.
  dis.readFully(value);
  scanner.advance();
  return new Entry(key, value);
}
/**
 * Opens (or creates) the application-history file and wraps it in a TFile
 * writer using the configured compression type.
 *
 * @param historyFile path of the history file to write.
 * @throws IOException if the file cannot be opened or the writer cannot be
 *         created.
 */
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    // Append to an existing history file instead of clobbering it.
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  try {
    fs.setPermission(historyFile, HISTORY_FILE_UMASK);
    writer = new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
        YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
        YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE),
        null, getConfig());
  } catch (IOException e) {
    // FIX: the open output stream leaked if setPermission() or writer
    // construction threw; close it before propagating.
    fsdos.close();
    throw e;
  }
}
/**
 * Creates the aggregated log file on the default file system as the given
 * user and wraps it in a TFile writer, then writes the format version.
 *
 * @param conf configuration used for file-system access and to pick the
 *        log-aggregation compression type.
 * @param remoteAppLogFile destination path for the aggregated log file.
 * @param userUgi the user identity under which the file is created.
 * @throws IOException if creation fails or is interrupted.
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(conf);
            // Apply the restrictive umask before creating the file so the
            // log is never world-readable, even briefly.
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    // Surface interruption as an IOException, preserving the cause.
    throw new IOException(e);
  }
  // Keys are not sorted: null arg
  // 256KB minBlockSize : Expected log size for each container too
  this.writer = new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
      YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
      YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  // Write the version string
  writeVersion();
}
public LogReader(Configuration conf, Path remoteAppLogFile) throws IOException { FileContext fileContext = FileContext.getFileContext(conf); this.fsDataIStream = fileContext.open(remoteAppLogFile); reader = new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus( remoteAppLogFile).getLen(), conf); this.scanner = reader.createScanner(); }