Example source code for the Java class org.apache.hadoop.io.compress.CodecPool
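CodecPool maintains a shared pool of Compressor and Decompressor instances so that codecs, especially those backed by native libraries, do not have to allocate a new one per stream. The snippets below, collected from several open-source projects, all follow the same discipline: borrow with getCompressor/getDecompressor, use, then hand back with returnCompressor/returnDecompressor. A minimal self-contained sketch of that pattern (the output path is illustrative; DefaultCodec is chosen because it needs no native libraries):

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecPoolBasics {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // DefaultCodec (deflate) works without native libraries.
    CompressionCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);
    // Borrow a compressor from the shared pool instead of allocating one.
    Compressor compressor = CodecPool.getCompressor(codec);
    try {
      FileSystem fs = FileSystem.get(conf);
      OutputStream out = codec.createOutputStream(
          fs.create(new Path("/tmp/example.deflate")), compressor);
      out.write("hello, codec pool".getBytes("UTF-8"));
      out.close();
    } finally {
      // Hand the compressor back so other callers can reuse it.
      CodecPool.returnCompressor(compressor);
    }
  }
}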
Project: hadoop-oss
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }
  CodecPool.returnCompressor(compressor);
  compressor = null;
  if (out != null) {
    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
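Note that if one of the serializer close() calls above throws, the borrowed compressor is never returned. A hardened variant would move the return into a finally block; a sketch reusing the same field names (not the actual SequenceFile implementation):

@Override
public synchronized void close() throws IOException {
  try {
    keySerializer.close();
    uncompressedValSerializer.close();
    if (compressedValSerializer != null) {
      compressedValSerializer.close();
    }
  } finally {
    // Return the pooled compressor even if a serializer failed to close.
    CodecPool.returnCompressor(compressor);
    compressor = null;
    if (out != null) {
      if (ownOutputStream) {
        out.close();
      } else {
        out.flush();
      }
      out = null;
    }
  }
}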
Project: hadoop-oss
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;
  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }
  // Close the input-stream
  in.close();
}
Project: hadoop-oss
File: Compression.java
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still
        // using it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a compressor is referenced after being returned to the codec
       * pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
Project: hadoop-oss
File: Compression.java
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still
        // using it.
        LOG.warn("Decompressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a decompressor is referenced after being returned to the codec
       * pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
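The two getters pair naturally: bytes produced with a pooled Compressor can be read back with a pooled Decompressor from the same codec. A self-contained round trip (illustrative; DefaultCodec avoids native dependencies):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class PooledRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DefaultCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, conf);

    byte[] original = "codec pool round trip".getBytes("UTF-8");

    // Compress with a pooled compressor.
    Compressor compressor = CodecPool.getCompressor(codec);
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try {
      OutputStream out = codec.createOutputStream(compressed, compressor);
      out.write(original);
      out.close();
    } finally {
      CodecPool.returnCompressor(compressor);
    }

    // Decompress with a pooled decompressor.
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    try {
      InputStream in = codec.createInputStream(
          new ByteArrayInputStream(compressed.toByteArray()), decompressor);
      IOUtils.copyBytes(in, restored, 4096, true);
    } finally {
      CodecPool.returnDecompressor(decompressor);
    }

    System.out.println(restored.toString("UTF-8")); // codec pool round trip
  }
}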
Project: flume-release-1.7.0
File: HDFSCompressedDataStream.java
@Override
public void close() throws IOException {
  serializer.flush();
  serializer.beforeClose();
  if (!isFinished) {
    cmpOut.finish();
    isFinished = true;
  }
  fsOut.flush();
  hflushOrSync(fsOut);
  cmpOut.close();
  if (compressor != null) {
    CodecPool.returnCompressor(compressor);
    compressor = null;
  }
  unregisterCurrentStream();
}
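The close() above returns a compressor that was borrowed when the stream was opened. The matching acquisition side is not shown here; a hypothetical sketch of its shape (field and method names are assumptions, not Flume's API):

// Hypothetical open-side counterpart; names are assumed for illustration.
void openCompressedStream(FSDataOutputStream fsOut, CompressionCodec codec,
    Configuration conf) throws IOException {
  this.fsOut = fsOut;
  // Borrow a compressor configured from conf (an overload CodecPool provides).
  this.compressor = CodecPool.getCompressor(codec, conf);
  this.cmpOut = codec.createOutputStream(fsOut, compressor);
  this.isFinished = false;
}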
Project: hadoop
File: IFile.java
/**
 * Construct an IFile Reader.
 *
 * @param conf The configuration
 * @param in The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length,
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in, length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;
  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
Project: hadoop
File: IFile.java
public void close() throws IOException {
  // Close the underlying stream
  in.close();
  // Release the buffer
  dataIn = null;
  buffer = null;
  if (readRecordsCounter != null) {
    readRecordsCounter.increment(numRecordsRead);
  }
  // Return the decompressor
  if (decompressor != null) {
    decompressor.reset();
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}
Project: hadoop
File: InMemoryMapOutput.java
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
                         MergeManagerImpl<K, V> merger,
                         int size, CompressionCodec codec,
                         boolean primaryMapOutput) {
  super(mapId, (long) size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
  } else {
    decompressor = null;
  }
}
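The constructor only borrows the decompressor; it is handed back after the map output has been consumed. A simplified sketch of that read path (shape and names assumed, not the exact MergeManager code):

// Simplified sketch: wrap the shuffle input with the pooled decompressor
// and return it to the pool once the in-memory buffer is filled.
void readMapOutput(java.io.InputStream shuffleIn) throws java.io.IOException {
  try {
    java.io.InputStream in = (codec != null)
        ? codec.createInputStream(shuffleIn, decompressor)
        : shuffleIn;
    org.apache.hadoop.io.IOUtils.readFully(in, memory, 0, memory.length);
  } finally {
    CodecPool.returnDecompressor(decompressor); // tolerates null
  }
}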
Project: hadoop
File: PossiblyDecompressedInputStream.java
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);
  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);
  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
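A class that borrows a decompressor in its constructor needs a close() that gives it back. A sketch consistent with the fields above (not necessarily the project's exact code):

public void close() throws IOException {
  // Return the pooled decompressor before closing the wrapped stream.
  if (decompressor != null) {
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
  coreInputStream.close();
}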
Project: hadoop
File: Anonymizer.java
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
    throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
      new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }
  JsonGenerator outGen = outFactory.createJsonGenerator(output,
      JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();
  return outGen;
}
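As written, the borrowed compressor never escapes the method, so nothing can return it once the generator is closed. One way to keep the borrow/return pairing (hypothetical; the field and helper are assumptions, not the project's code):

private Compressor outCompressor; // assumed field holding the borrowed compressor

private void closeJsonGenerator(JsonGenerator outGen) throws IOException {
  outGen.close(); // also closes the wrapped compressed output stream
  CodecPool.returnCompressor(outCompressor); // tolerates null (uncompressed case)
  outCompressor = null;
}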
Project: hadoop
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }
  CodecPool.returnCompressor(compressor);
  compressor = null;
  if (out != null) {
    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
Project: hadoop
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;
  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }
  // Close the input-stream
  in.close();
}
Project: hadoop
File: Compression.java
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still
        // using it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a compressor is referenced after being returned to the codec
       * pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
Project: hadoop
File: Compression.java
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still
        // using it.
        LOG.warn("Decompressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a decompressor is referenced after being returned to the codec
       * pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
Project: ditb
File: Compression.java
public Compressor getCompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Retrieved compressor " + compressor + " from pool.");
    }
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still using it.
        LOG.warn("Compressor obtained from CodecPool is already finished()");
      }
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
Project: ditb
File: Compression.java
public Decompressor getDecompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
    }
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still using it.
        LOG.warn("Decompressor obtained from CodecPool is already finished()");
      }
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
Project: aliyun-oss-hadoop-fs
File: IFile.java
/**
 * Construct an IFile Reader.
 *
 * @param conf The configuration
 * @param in The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length,
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in, length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;
  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
Project: aliyun-oss-hadoop-fs
File: IFile.java
public void close() throws IOException {
  // Close the underlying stream
  in.close();
  // Release the buffer
  dataIn = null;
  buffer = null;
  if (readRecordsCounter != null) {
    readRecordsCounter.increment(numRecordsRead);
  }
  // Return the decompressor
  if (decompressor != null) {
    decompressor.reset();
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}
Project: aliyun-oss-hadoop-fs
File: PossiblyDecompressedInputStream.java
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);
  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);
  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
Project: aliyun-oss-hadoop-fs
File: Anonymizer.java
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
    throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
      new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }
  JsonGenerator outGen = outFactory.createJsonGenerator(output,
      JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();
  return outGen;
}
Project: aliyun-oss-hadoop-fs
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }
  CodecPool.returnCompressor(compressor);
  compressor = null;
  if (out != null) {
    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
Project: aliyun-oss-hadoop-fs
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;
  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }
  // Close the input-stream
  in.close();
}
Project: aliyun-oss-hadoop-fs
File: Compression.java
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still
        // using it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a compressor is referenced after being returned to the codec
       * pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
Project: aliyun-oss-hadoop-fs
File: Compression.java
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still
        // using it.
        LOG.warn("Decompressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a decompressor is referenced after being returned to the codec
       * pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
Project: gemfirexd-oss
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }
  CodecPool.returnCompressor(compressor);
  compressor = null;
  if (out != null) {
    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
Project: gemfirexd-oss
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;
  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }
  // Close the input-stream
  in.close();
}
Project: big-c
File: IFile.java
/**
 * Construct an IFile Reader.
 *
 * @param conf The configuration
 * @param in The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length,
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in, length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;
  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
Project: big-c
File: IFile.java
public void close() throws IOException {
  // Close the underlying stream
  in.close();
  // Release the buffer
  dataIn = null;
  buffer = null;
  if (readRecordsCounter != null) {
    readRecordsCounter.increment(numRecordsRead);
  }
  // Return the decompressor
  if (decompressor != null) {
    decompressor.reset();
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}
Project: big-c
File: InMemoryMapOutput.java
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
                         MergeManagerImpl<K, V> merger,
                         int size, CompressionCodec codec,
                         boolean primaryMapOutput) {
  super(mapId, (long) size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
  } else {
    decompressor = null;
  }
}
Project: big-c
File: PossiblyDecompressedInputStream.java
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);
  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);
  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
Project: big-c
File: Anonymizer.java
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
    throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
      new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }
  JsonGenerator outGen = outFactory.createJsonGenerator(output,
      JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();
  return outGen;
}
Project: big-c
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }
  CodecPool.returnCompressor(compressor);
  compressor = null;
  if (out != null) {
    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
Project: big-c
File: SequenceFile.java
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;
  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }
  // Close the input-stream
  in.close();
}
Project: big-c
File: Compression.java
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still
        // using it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a compressor is referenced after being returned to the codec
       * pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
Project: big-c
File: Compression.java
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still
        // using it.
        LOG.warn("Decompressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to work around bugs in 0.18
       * where a decompressor is referenced after being returned to the codec
       * pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
Project: hadoopoffice
File: MapReduceExcelOutputIntegrationTest.java
private InputStream openFile(Path path) throws IOException {
  CompressionCodec codec =
      new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
  FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
  // check if compressed
  if (codec == null) { // uncompressed
    return fileIn;
  } else { // compressed
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    this.openDecompressors.add(decompressor); // to be returned later using close
    if (codec instanceof SplittableCompressionCodec) {
      long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
      final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec) codec).createInputStream(fileIn,
              decompressor, 0, end,
              SplittableCompressionCodec.READ_MODE.CONTINUOUS);
      return cIn;
    } else {
      return codec.createInputStream(fileIn, decompressor);
    }
  }
}
Project: hadoopoffice
File: MapReduceExcelInputIntegrationTest.java
private InputStream openFile(Path path) throws IOException {
  CompressionCodec codec =
      new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
  FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
  // check if compressed
  if (codec == null) { // uncompressed
    return fileIn;
  } else { // compressed
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    this.openDecompressors.add(decompressor); // to be returned later using close
    if (codec instanceof SplittableCompressionCodec) {
      long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
      final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec) codec).createInputStream(fileIn,
              decompressor, 0, end,
              SplittableCompressionCodec.READ_MODE.CONTINUOUS);
      return cIn;
    } else {
      return codec.createInputStream(fileIn, decompressor);
    }
  }
}
Project: hadoopoffice
File: AbstractSpreadSheetDocumentRecordReader.java
@Override
public synchronized void close() throws IOException {
  try {
    if (officeReader != null) {
      officeReader.close();
    }
  } finally {
    if (decompressor != null) { // return this decompressor
      CodecPool.returnDecompressor(decompressor);
      decompressor = null;
    } // return decompressor of linked workbooks
    if (this.currentHFR != null) {
      currentHFR.close();
    }
  }
  // do not close the filesystem! will cause exceptions in Spark
}
Project: hadoopoffice
File: AbstractSpreadSheetDocumentRecordReader.java
@Override
public synchronized void close() throws IOException {
  try {
    if (officeReader != null) {
      officeReader.close();
    }
  } finally {
    if (decompressor != null) { // return this decompressor
      CodecPool.returnDecompressor(decompressor);
      decompressor = null;
    } // return decompressor of linked workbooks
    if (this.currentHFR != null) {
      currentHFR.close();
    }
  }
  // do not close the filesystem! will cause exceptions in Spark
}
Project: hadoopoffice
File: HadoopFileReader.java
public InputStream openFile(Path path) throws IOException {
  CompressionCodec codec = compressionCodecs.getCodec(path);
  FSDataInputStream fileIn = fs.open(path);
  // check if compressed
  if (codec == null) { // uncompressed
    LOG.debug("Reading from an uncompressed file \"" + path + "\"");
    return fileIn;
  } else { // compressed
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    this.openDecompressors.add(decompressor); // to be returned later using close
    if (codec instanceof SplittableCompressionCodec) {
      LOG.debug("Reading from a compressed file \"" + path
          + "\" with splittable compression codec");
      long end = fs.getFileStatus(path).getLen();
      return ((SplittableCompressionCodec) codec).createInputStream(fileIn,
          decompressor, 0, end, SplittableCompressionCodec.READ_MODE.CONTINUOUS);
    } else {
      LOG.debug("Reading from a compressed file \"" + path
          + "\" with non-splittable compression codec");
      return codec.createInputStream(fileIn, decompressor);
    }
  }
}
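The openDecompressors list exists so that a close() counterpart can hand every borrowed decompressor back to the pool. A sketch of that method (shape assumed, consistent with the comments above):

public void close() throws IOException {
  // Return every decompressor handed out by openFile().
  for (Decompressor decompressor : openDecompressors) {
    CodecPool.returnDecompressor(decompressor);
  }
  openDecompressors.clear();
}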