protected synchronized void open(Path dir, WritableComparator comparator, Configuration conf, SequenceFile.Reader.Option... options ) throws IOException { Path dataFile = new Path(dir, DATA_FILE_NAME); Path indexFile = new Path(dir, INDEX_FILE_NAME); // open the data this.data = createDataFileReader(dataFile, conf, options); this.firstPosition = data.getPosition(); if (comparator == null) { Class<? extends WritableComparable> cls; cls = data.getKeyClass().asSubclass(WritableComparable.class); this.comparator = WritableComparator.get(cls, conf); } else { this.comparator = comparator; } // open the index SequenceFile.Reader.Option[] indexOptions = Options.prependOptions(options, SequenceFile.Reader.file(indexFile)); this.index = new SequenceFile.Reader(conf, indexOptions); }
/** * Create a new Writer with the given options. * @param conf the configuration to use * @param opts the options to create the file with * @return a new Writer * @throws IOException */ public static Writer createWriter(Configuration conf, Writer.Option... opts ) throws IOException { Writer.CompressionOption compressionOption = Options.getOption(Writer.CompressionOption.class, opts); CompressionType kind; if (compressionOption != null) { kind = compressionOption.getValue(); } else { kind = getDefaultCompressionType(conf); opts = Options.prependOptions(opts, Writer.compression(kind)); } switch (kind) { default: case NONE: return new Writer(conf, opts); case RECORD: return new RecordCompressWriter(conf, opts); case BLOCK: return new BlockCompressWriter(conf, opts); } }
protected synchronized void open(Path dir, WritableComparator comparator, Configuration conf, SequenceFile.Reader.Option... options ) throws IOException { Path dataFile = new Path(dir, DATA_FILE_NAME); Path indexFile = new Path(dir, INDEX_FILE_NAME); // open the data this.data = createDataFileReader(dataFile, conf, options); this.firstPosition = data.getPosition(); if (comparator == null) this.comparator = WritableComparator.get(data.getKeyClass(). asSubclass(WritableComparable.class)); else this.comparator = comparator; // open the index SequenceFile.Reader.Option[] indexOptions = Options.prependOptions(options, SequenceFile.Reader.file(indexFile)); this.index = new SequenceFile.Reader(conf, indexOptions); }
public Reader(Path dir, Configuration conf, SequenceFile.Reader.Option... opts) throws IOException { ComparatorOption comparatorOption = Options.getOption(ComparatorOption.class, opts); WritableComparator comparator = comparatorOption == null ? null : comparatorOption.getValue(); INDEX_SKIP = conf.getInt("io.map.index.skip", 0); open(dir, comparator, conf, opts); }
/** * Override this method to specialize the type of * {@link SequenceFile.Reader} returned. */ protected SequenceFile.Reader createDataFileReader(Path dataFile, Configuration conf, SequenceFile.Reader.Option... options ) throws IOException { SequenceFile.Reader.Option[] newOptions = Options.prependOptions(options, SequenceFile.Reader.file(dataFile)); return new SequenceFile.Reader(conf, newOptions); }
public Reader(Configuration conf, Option... opts) throws IOException { // Look up the options, these are null if not set FileOption fileOpt = Options.getOption(FileOption.class, opts); InputStreamOption streamOpt = Options.getOption(InputStreamOption.class, opts); StartOption startOpt = Options.getOption(StartOption.class, opts); LengthOption lenOpt = Options.getOption(LengthOption.class, opts); BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts); // check for consistency if ((fileOpt == null) == (streamOpt == null)) { throw new IllegalArgumentException("File or stream option must be specified"); } if (fileOpt == null && bufOpt != null) { throw new IllegalArgumentException("buffer size can only be set when" + " a file is specified."); } // figure out the real values Path filename = null; FSDataInputStream file; final long len; if (fileOpt != null) { filename = fileOpt.getValue(); FileSystem fs = filename.getFileSystem(conf); int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); len = null == lenOpt ? fs.getFileStatus(filename).getLen() : lenOpt.getValue(); file = openFile(fs, filename, bufSize, len); } else { len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); file = streamOpt.getValue(); } long start = startOpt == null ? 0 : startOpt.getValue(); // really set up initialize(filename, file, start, len, conf, headerOnly != null); }
Writer(Configuration conf, Option... opts) throws IOException { BlockSizeOption blockSizeOption = Options.getOption(BlockSizeOption.class, opts); BufferSizeOption bufferSizeOption = Options.getOption(BufferSizeOption.class, opts); ReplicationOption replicationOption = Options.getOption(ReplicationOption.class, opts); FileOption fileOption = Options.getOption(FileOption.class, opts); AppendIfExistsOption appendIfExistsOption = Options.getOption( AppendIfExistsOption.class, opts); StreamOption streamOption = Options.getOption(StreamOption.class, opts); // check consistency of options if ((fileOption == null) == (streamOption == null)) { throw new IllegalArgumentException("file or stream must be specified"); } if (fileOption == null && (blockSizeOption != null || bufferSizeOption != null || replicationOption != null)) { throw new IllegalArgumentException("file modifier options not " + "compatible with stream"); } FSDataOutputStream out; boolean ownStream = fileOption != null; if (ownStream) { Path p = fileOption.getValue(); FileSystem fs; fs = p.getFileSystem(conf); int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : bufferSizeOption.getValue(); short replication = replicationOption == null ? fs.getDefaultReplication(p) : (short) replicationOption.getValue(); long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize(p) : blockSizeOption.getValue(); if (appendIfExistsOption != null && appendIfExistsOption.getValue() && fs.exists(p)) { // Read the file and verify header details try (WALFile.Reader reader = new WALFile.Reader(conf, WALFile.Reader.file(p), new Reader.OnlyHeaderOption())){ if (reader.getVersion() != VERSION[3]) { throw new VersionMismatchException(VERSION[3], reader.getVersion()); } sync = reader.getSync(); } out = fs.append(p, bufferSize); this.appendMode = true; } else { out = fs.create(p, true, bufferSize, replication, blockSize); } } else { out = streamOption.getValue(); } init(conf, out, ownStream); }
public Reader(Configuration conf, Option... opts) throws IOException { // Look up the options, these are null if not set FileOption fileOpt = Options.getOption(FileOption.class, opts); InputStreamOption streamOpt = Options.getOption(InputStreamOption.class, opts); StartOption startOpt = Options.getOption(StartOption.class, opts); LengthOption lenOpt = Options.getOption(LengthOption.class, opts); BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class, opts); OnlyHeaderOption headerOnly = Options.getOption(OnlyHeaderOption.class, opts); // check for consistency if ((fileOpt == null) == (streamOpt == null)) { throw new IllegalArgumentException("File or stream option must be specified"); } if (fileOpt == null && bufOpt != null) { throw new IllegalArgumentException("buffer size can only be set when" + " a file is specified."); } // figure out the real values Path filename = null; FSDataInputStream file; final long len; if (fileOpt != null) { filename = fileOpt.getValue(); FileSystem fs = filename.getFileSystem(conf); int bufSize = bufOpt == null ? getBufferSize(conf) : bufOpt.getValue(); len = null == lenOpt ? fs.getFileStatus(filename).getLen() : lenOpt.getValue(); file = openFile(fs, filename, bufSize, len); } else { len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue(); file = streamOpt.getValue(); } long start = startOpt == null ? 0 : startOpt.getValue(); // really set up initialize(filename, file, start, len, conf, headerOnly != null); }