Java Class org.apache.hadoop.io.file.tfile.TFile Code Examples
Project: hadoop
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
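For context, here is a minimal sketch of how a caller might drive this reader; the LogDumper class and dump method are hypothetical, while LogReader, LogKey, and next() are the ones shown above:

import java.io.DataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;

// Hypothetical driver class: walks every (container key, value stream) pair.
public class LogDumper {
  public static void dump(Configuration conf, Path remoteAppLogFile) throws Exception {
    AggregatedLogFormat.LogReader reader =
        new AggregatedLogFormat.LogReader(conf, remoteAppLogFile);
    try {
      AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
      DataInputStream valueStream = reader.next(key);
      while (valueStream != null) {
        System.out.println("Container: " + key); // META keys are already skipped by next()
        valueStream = reader.next(key);          // null once the scanner reaches the end
      }
    } finally {
      reader.close();
    }
  }
}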
Project: aliyun-oss-hadoop-fs
File: AggregatedLogFormat.java
/**
 * Returns the owner of the application.
 *
 * @return the application owner.
 * @throws IOException
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = null;
  try {
    ownerScanner = reader.createScanner();
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  } finally {
    IOUtils.cleanup(LOG, ownerScanner);
  }
}
Project: aliyun-oss-hadoop-fs
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: big-c
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: hadoop-2.6.0-cdh5.4.3
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: hadoop-plus
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: Megh
File: HadoopFilePerformanceTest.java
public void writeTFile(Path file, String cname) throws Exception
{
  FSDataOutputStream fos = hdfs.create(file);
  TFile.Writer writer =
      new TFile.Writer(fos, blockSize, cname, "jclass:" +
          BytesWritable.Comparator.class.getName(), new Configuration());
  for (int i = 0; i < testSize; i++) {
    String k = getKey(i);
    String v = getValue();
    writer.append(k.getBytes(), v.getBytes());
  }
  writer.close();
  fos.close();
}
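For reference, TFile.Writer's comparator argument accepts "memcmp" for raw byte ordering, a "jclass:"-prefixed RawComparator class name as above, or null/empty for an unsorted file. An unsorted variant of the writer above would look like this sketch:

// Sketch: same writer, but with no key ordering enforced (unsorted TFile).
TFile.Writer writer = new TFile.Writer(fos, blockSize, cname, null, new Configuration());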
Project: Megh
File: HadoopFilePerformanceTest.java
@Test
public void testTFileWrite() throws Exception
{
  Path file = Testfile.TFILE.filepath();
  logger.info("Writing {} with {} key/value pairs", file, String.format("%,d", testSize));
  startTimer();
  writeTFile(file, TFile.COMPRESSION_NONE);
  logger.info("Duration: {}", stopTimer(Testfile.TFILE, "WRITE"));
  Assert.assertTrue(hdfs.exists(file));
  ContentSummary fileInfo = hdfs.getContentSummary(file);
  logger.debug("Space consumed: {} bytes in {} files",
      String.format("%,d", fileInfo.getSpaceConsumed()),
      String.format("%,d", fileInfo.getFileCount()));
}
Project: Megh
File: HadoopFilePerformanceTest.java
@Test
public void testTFileWriteGZ() throws Exception
{
  Path file = Testfile.TFILE_GZ.filepath();
  logger.info("Writing {} with {} key/value pairs", file, String.format("%,d", testSize));
  startTimer();
  writeTFile(file, TFile.COMPRESSION_GZ);
  logger.info("Duration: {}", stopTimer(Testfile.TFILE_GZ, "WRITE"));
  Assert.assertTrue(hdfs.exists(file));
  ContentSummary fileInfo = hdfs.getContentSummary(file);
  logger.debug("Space consumed: {} bytes in {} files",
      String.format("%,d", fileInfo.getSpaceConsumed()),
      String.format("%,d", fileInfo.getFileCount()));
}
Project: Megh
File: HadoopFilePerformanceTest.java
@Test
public void testTFileRead() throws Exception
{
  Path file = Testfile.TFILE.filepath();
  logger.info("Reading {} with {} key/value pairs", file, String.format("%,d", testSize));
  writeTFile(file, TFile.COMPRESSION_NONE);
  startTimer();
  readTFileSeq(file);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}", stopTimer(Testfile.TFILE, "READ-SEQ"));
  startTimer();
  readTFileSeqId(file);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}", stopTimer(Testfile.TFILE, "READ-SEQ-ID"));
  startTimer();
  readTFileRandom(file);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}", stopTimer(Testfile.TFILE, "READ-RAND"));
}
Project: Megh
File: HadoopFilePerformanceTest.java
@Test
public void testTFileReadGZ() throws Exception
{
  Path file = Testfile.TFILE_GZ.filepath();
  logger.info("Reading {} with {} key/value pairs", file, String.format("%,d", testSize));
  writeTFile(file, TFile.COMPRESSION_GZ);
  startTimer();
  readTFileSeq(file);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}", stopTimer(Testfile.TFILE_GZ, "READ-SEQ"));
  startTimer();
  readTFileSeqId(file);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}", stopTimer(Testfile.TFILE_GZ, "READ-SEQ-ID"));
  startTimer();
  readTFileRandom(file);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}", stopTimer(Testfile.TFILE_GZ, "READ-RAND"));
}
Project: Megh
File: HadoopFilePerformanceTest.java
private void readTFileRandom(Path file) throws IOException
{
  Random random = new Random();
  FSDataInputStream in = hdfs.open(file);
  long size = hdfs.getContentSummary(file).getLength();
  TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
  Scanner scanner = reader.createScanner();
  scanner.rewind();
  for (int i = 0; i < testSize; i++) {
    // scanner.rewind();
    scanner.seekTo(getKey(random.nextInt(testSize)).getBytes());
    // Entry en = scanner.entry();
    // en.get(new BytesWritable(new byte[en.getKeyLength()]), new BytesWritable(new byte[en.getValueLength()]));
  }
  reader.close();
}
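Note that seekTo returns a boolean and that this benchmark never consumes the located entry (the retrieval lines are commented out). A variant that checks the seek and copies the value out might look like this sketch:

// Sketch: seek to a random key, then copy its value out if the key exists.
byte[] keyBytes = getKey(random.nextInt(testSize)).getBytes();
if (scanner.seekTo(keyBytes)) {
  TFile.Reader.Scanner.Entry en = scanner.entry();
  byte[] value = new byte[en.getValueLength()];
  en.getValue(value); // fills the buffer with the entry's value bytes
}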
Project: Megh
File: HadoopFilePerformanceTest.java
private void readTFileSeqId(Path file) throws IOException
{
  FSDataInputStream in = hdfs.open(file);
  long size = hdfs.getContentSummary(file).getLength();
  TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
  Scanner scanner = reader.createScanner();
  scanner.rewind();
  for (int i = 0; i < testSize; i++) {
    scanner.seekTo(getKey(i).getBytes());
    Entry en = scanner.entry();
    en.get(new BytesWritable(new byte[en.getKeyLength()]), new BytesWritable(new byte[en.getValueLength()]));
  }
  reader.close();
}
Project: Megh
File: HadoopFilePerformanceTest.java
private void readTFileSeq(Path file) throws IOException
{
  FSDataInputStream in = hdfs.open(file);
  long size = hdfs.getContentSummary(file).getLength();
  TFile.Reader reader = new TFile.Reader(in, size, new Configuration());
  Scanner scanner = reader.createScanner();
  scanner.rewind();
  do {
    Entry en = scanner.entry();
    en.get(new BytesWritable(new byte[en.getKeyLength()]), new BytesWritable(new byte[en.getValueLength()]));
  } while (scanner.advance() && !scanner.atEnd());
  reader.close();
}
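The do/while form above calls scanner.entry() before any atEnd() check, so it assumes the file contains at least one entry; an equivalent guard-first loop (a sketch using the same API) also handles an empty file:

// Sketch: same sequential scan, but safe when the file has no entries.
while (!scanner.atEnd()) {
  Entry en = scanner.entry();
  en.get(new BytesWritable(new byte[en.getKeyLength()]),
      new BytesWritable(new byte[en.getValueLength()]));
  scanner.advance();
}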
Project: Megh
File: HadoopFilePerformanceTest.java
@Test
public void testDTFileRead() throws Exception
{
  Path file = Testfile.DTFILE.filepath();
  logger.info("Reading {} with {} key/value pairs", file, String.format("%,d", testSize));
  writeTFile(file, TFile.COMPRESSION_NONE);
  startTimer();
  readDTFileSeq(file);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}", stopTimer(Testfile.DTFILE, "READ-SEQ"));
  startTimer();
  readDTFileSeqId(file);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}", stopTimer(Testfile.DTFILE, "READ-SEQ-ID"));
  startTimer();
  readDTFileRandom(file);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}", stopTimer(Testfile.DTFILE, "READ-RAND"));
}
Project: Megh
File: HadoopFilePerformanceTest.java
@Test
public void testDTFileReadGZ() throws Exception
{
  Path file = Testfile.DTFILE_GZ.filepath();
  logger.info("Reading {} with {} key/value pairs", file, String.format("%,d", testSize));
  writeTFile(file, TFile.COMPRESSION_GZ);
  startTimer();
  readDTFileSeq(file);
  logger.info("Duration for scanner.next() SEQUENTIAL keys: {}", stopTimer(Testfile.DTFILE_GZ, "READ-SEQ"));
  startTimer();
  readDTFileSeqId(file);
  logger.info("Duration for scanner.seekTo(key) SEQUENTIAL keys: {}", stopTimer(Testfile.DTFILE_GZ, "READ-SEQ-ID"));
  startTimer();
  readDTFileRandom(file);
  logger.info("Duration for scanner.seekTo(key) RANDOM keys: {}", stopTimer(Testfile.DTFILE_GZ, "READ-RAND"));
}
Project: hopsworks
File: LogReader.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: hops
File: AggregatedLogFormat.java
/**
 * Returns the owner of the application.
 *
 * @return the application owner.
 * @throws IOException
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = null;
  try {
    ownerScanner = reader.createScanner();
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  } finally {
    IOUtils.cleanup(LOG, ownerScanner);
  }
}
Project: hops
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: hops
File: FileSystemApplicationHistoryStore.java
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  try {
    fs.setPermission(historyFile, HISTORY_FILE_UMASK);
    writer =
        new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
            YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
            YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
            getConfig());
  } catch (IOException e) {
    IOUtils.cleanup(LOG, fsdos);
    throw e;
  }
}
Project: hadoop-TCP
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: hardfs
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: hadoop-on-lustre2
File: AggregatedLogFormat.java
/**
 * Read the next key and return the value-stream.
 *
 * @param key
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException
 */
public DataInputStream next(LogKey key) throws IOException {
  if (!this.atBeginning) {
    this.scanner.advance();
  } else {
    this.atBeginning = false;
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry entry = this.scanner.entry();
  key.readFields(entry.getKeyStream());
  // Skip META keys
  if (RESERVED_KEYS.containsKey(key.toString())) {
    return next(key);
  }
  DataInputStream valueStream = entry.getValueStream();
  return valueStream;
}
Project: reef
File: TFileParser.java
/**
 * @param path the path of the TFile to open.
 * @return a scanner positioned past the reserved header entries.
 * @throws IOException
 */
private TFile.Reader.Scanner getScanner(final Path path) throws IOException {
  LOG.log(Level.FINE, "Creating Scanner for path {0}", path);
  final TFile.Reader reader = new TFile.Reader(this.fileSystem.open(path),
      this.fileSystem.getFileStatus(path).getLen(),
      this.configuration);
  final TFile.Reader.Scanner scanner = reader.createScanner();
  for (int counter = 0; counter < 3 && !scanner.atEnd(); counter += 1) {
    // skip VERSION, APPLICATION_ACL, and APPLICATION_OWNER
    scanner.advance();
  }
  LOG.log(Level.FINE, "Created Scanner for path {0}", path);
  return scanner;
}
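A caller would then drain the returned scanner entry by entry. A minimal sketch, reusing getScanner from above and closing the scanner when done:

// Sketch: iterate the remaining key/value entries and close the scanner.
final TFile.Reader.Scanner scanner = getScanner(path);
try {
  while (!scanner.atEnd()) {
    final TFile.Reader.Scanner.Entry entry = scanner.entry();
    // consume entry.getKeyStream() and entry.getValueStream() here
    scanner.advance();
  }
} finally {
  scanner.close();
}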
Project: tez
File: TFileRecordReader.java
private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException {
  entry.getKey(keyBytesWritable);
  // splitPath contains the machine name. Create the key as splitPath + realKey
  String keyStr = new StringBuilder()
      .append(splitPath.getName()).append(":")
      .append(new String(keyBytesWritable.getBytes()))
      .toString();
  /**
   * In certain cases, values can be huge (files > 2 GB). A stream is
   * better suited to such scenarios.
   */
  currentValueReader = new BufferedReader(
      new InputStreamReader(entry.getValueStream()));
  key.set(keyStr);
  String line = currentValueReader.readLine();
  value.set((line == null) ? "" : line);
}
Project: hadoop
File: AggregatedLogFormat.java
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
  // Keys are not sorted: null arg
  // 256KB minBlockSize: the expected log size for each container, too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  // Write the version string
  writeVersion();
}
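Once constructed, the writer is driven through the class's append method. A minimal usage sketch, assuming the standard append(LogKey, LogValue) of this class and hypothetical containerId and rootLogDirs variables:

// Sketch: aggregate one container's local logs into the remote file.
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, userUgi);
try {
  LogKey logKey = new LogKey(containerId);
  LogValue logValue = new LogValue(rootLogDirs, containerId,
      userUgi.getShortUserName());
  logWriter.append(logKey, logValue); // serializes the key and streams the log files
} finally {
  logWriter.close();
}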
Project: hadoop
File: AggregatedLogFormat.java
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext =
      FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
Project: hadoop
File: AggregatedLogFormat.java
/**
 * Returns the owner of the application.
 *
 * @return the application owner.
 * @throws IOException
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = reader.createScanner();
  LogKey key = new LogKey();
  while (!ownerScanner.atEnd()) {
    TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
    key.readFields(entry.getKeyStream());
    if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
      DataInputStream valueStream = entry.getValueStream();
      return valueStream.readUTF();
    }
    ownerScanner.advance();
  }
  return null;
}
Project: hadoop
File: FileSystemApplicationHistoryStore.java
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
          getConfig());
  reset();
}
Project: hadoop
File: FileSystemApplicationHistoryStore.java
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  dis.read(value);
  scanner.advance();
  return new Entry(key, value);
}
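One caveat in the snippet above: DataInputStream.read(byte[]) may return before filling the buffer. A defensive variant reads the value with readFully, which loops until the buffer is full (sketch):

byte[] value = new byte[entry.getValueLength()];
dis.readFully(value); // guaranteed to fill the buffer or throw EOFException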
Project: hadoop
File: FileSystemApplicationHistoryStore.java
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  fs.setPermission(historyFile, HISTORY_FILE_UMASK);
  writer =
      new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
          YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
          getConfig());
}
Project: aliyun-oss-hadoop-fs
File: AggregatedLogFormat.java
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
  // Keys are not sorted: null arg
  // 256KB minBlockSize: the expected log size for each container, too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  // Write the version string
  writeVersion();
}
Project: aliyun-oss-hadoop-fs
File: AggregatedLogFormat.java
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext =
      FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
Project: aliyun-oss-hadoop-fs
File: FileSystemApplicationHistoryStore.java
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
          getConfig());
  reset();
}
Project: aliyun-oss-hadoop-fs
File: FileSystemApplicationHistoryStore.java
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  dis.read(value);
  scanner.advance();
  return new Entry(key, value);
}
Project: aliyun-oss-hadoop-fs
File: FileSystemApplicationHistoryStore.java
public HistoryFileWriter(Path historyFile) throws IOException {
  if (fs.exists(historyFile)) {
    fsdos = fs.append(historyFile);
  } else {
    fsdos = fs.create(historyFile);
  }
  fs.setPermission(historyFile, HISTORY_FILE_UMASK);
  writer =
      new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
          YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
          getConfig());
}
Project: big-c
File: AggregatedLogFormat.java
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
  // Keys are not sorted: null arg
  // 256KB minBlockSize: the expected log size for each container, too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  // Write the version string
  writeVersion();
}
Project: big-c
File: AggregatedLogFormat.java
public LogReader(Configuration conf, Path remoteAppLogFile)
    throws IOException {
  FileContext fileContext = FileContext.getFileContext(conf);
  this.fsDataIStream = fileContext.open(remoteAppLogFile);
  reader =
      new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
          remoteAppLogFile).getLen(), conf);
  this.scanner = reader.createScanner();
}
Project: big-c
File: AggregatedLogFormat.java
/**
 * Returns the owner of the application.
 *
 * @return the application owner.
 * @throws IOException
 */
public String getApplicationOwner() throws IOException {
  TFile.Reader.Scanner ownerScanner = reader.createScanner();
  LogKey key = new LogKey();
  while (!ownerScanner.atEnd()) {
    TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
    key.readFields(entry.getKeyStream());
    if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
      DataInputStream valueStream = entry.getValueStream();
      return valueStream.readUTF();
    }
    ownerScanner.advance();
  }
  return null;
}
Project: big-c
File: FileSystemApplicationHistoryStore.java
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader =
      new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
          getConfig());
  reset();
}