/**
 * Opens the Avro data file at {@code status.getPath()} and prepares a JSON encoder that renders its
 * records into an in-memory buffer, one root value per line.
 *
 * <p>If any step after opening the file fails, the opened input is closed before the exception
 * propagates, so a rejected file does not leak a stream.
 *
 * @param status status of the Avro file to display; only its path is used
 * @throws IOException if the file cannot be opened or read as an Avro data file, or if the JSON
 *     generator/encoder cannot be created
 */
public AvroFileInputStream(FileStatus status) throws IOException {
  pos = 0;
  buffer = new byte[0];
  GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
  FileContext fc = FileContext.getFileContext(new Configuration());
  // Keep a handle on the input so it can be released if anything below throws.
  AvroFSInput input = new AvroFSInput(fc, status.getPath());
  boolean initialized = false;
  try {
    fileReader = DataFileReader.openReader(input, reader);
    Schema schema = fileReader.getSchema();
    writer = new GenericDatumWriter<Object>(schema);
    output = new ByteArrayOutputStream();
    JsonGenerator generator =
        new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
    MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
    // Separate top-level JSON values with the platform line separator.
    prettyPrinter.setRootValueSeparator(System.getProperty("line.separator"));
    generator.setPrettyPrinter(prettyPrinter);
    encoder = EncoderFactory.get().jsonEncoder(schema, generator);
    initialized = true;
  } finally {
    if (!initialized) {
      // fileReader (if it was created) reads from this input; closing the input releases the
      // underlying file stream.
      try {
        input.close();
      } catch (IOException ignored) {
        // Best effort: the exception already propagating is the one worth reporting.
      }
    }
  }
}
/**
 * Gets a reader for the specified Avro file. A utility function.
 *
 * @param path path to the existing file
 * @param readerSchema optional reader schema. If you want to use the default option of using the
 *     writer schema as the reader schema, pass the {@code null} value.
 * @return an open reader over the file; this method does not close it, so closing is left to the
 *     caller
 * @throws IOException if the file cannot be stat'ed, opened or read as an Avro data file; the
 *     message names the offending file and the original exception is kept as the cause
 */
private static <T> DataFileReader<T> getSingleFileReader(
    FileSystemPath path, Schema readerSchema) throws IOException {
  try {
    SpecificDatumReader<T> datumReader = new SpecificDatumReader<T>();
    if (readerSchema != null) {
      datumReader.setExpected(readerSchema);
    }
    long len = path.getFileSystem().getFileStatus(path.getPath()).getLen();
    FSDataInputStream inputStream = path.getFileSystem().open(path.getPath());
    boolean opened = false;
    try {
      DataFileReader<T> fileReader =
          new DataFileReader<T>(new AvroFSInput(inputStream, len), datumReader);
      opened = true;
      return fileReader;
    } finally {
      if (!opened) {
        // The reader was never handed back, so nobody else can close this stream.
        try {
          inputStream.close();
        } catch (IOException ignored) {
          // Best effort: the original failure is the one worth reporting.
        }
      }
    }
  } catch (IOException ex) {
    throw new IOException("Problem with file \""
        + path.getPath().toString() + "\": " + ex.getMessage(), ex);
  }
}
/**
 * Opens an Avro reader on {@code path} using reflection-based datum reading with {@code schema},
 * and moves this reader to the {@code OPEN} state.
 *
 * <p>The file is stat'ed before it is opened, and the opened input is closed if the Avro reader
 * cannot be created, so a failed open does not leak a stream.
 *
 * @throws IllegalStateException if the reader is not in the {@code NEW} state
 * @throws DatasetReaderException if the file cannot be stat'ed, opened or read as Avro
 */
@Override
public void open() {
  Preconditions.checkState(state.equals(ReaderWriterState.NEW),
      "A reader may not be opened more than once - current state:%s", state);

  logger.debug("Opening reader on path:{}", path);

  try {
    // Stat first: evaluating getFileStatus after open() would leak the stream if the stat failed.
    long length = fileSystem.getFileStatus(path).getLen();
    AvroFSInput input = new AvroFSInput(fileSystem.open(path), length);
    boolean opened = false;
    try {
      reader = new DataFileReader<E>(input, new ReflectDatumReader<E>(schema));
      opened = true;
    } finally {
      if (!opened) {
        try {
          input.close();
        } catch (IOException ignored) {
          // Best effort: the original failure is the one worth reporting.
        }
      }
    }
  } catch (IOException e) {
    throw new DatasetReaderException("Unable to create reader path:" + path, e);
  }

  state = ReaderWriterState.OPEN;
}
/**
 * Creates a reader over the Avro file at {@code filePath}, converting records with a
 * {@code GenericRecordToStruct} and starting at offset 0.
 *
 * <p>If the Avro reader cannot be created, the opened input is closed before the exception
 * propagates.
 *
 * @param fs file system passed to the superclass
 * @param filePath path of the Avro file to read
 * @param config reader configuration passed to the superclass
 * @throws IOException if the file cannot be opened or read as an Avro data file
 */
public AvroFileReader(FileSystem fs, Path filePath, Map<String, Object> config)
    throws IOException {
  super(fs, filePath, new GenericRecordToStruct(), config);
  AvroFSInput input = new AvroFSInput(FileContext.getFileContext(filePath.toUri()), filePath);
  boolean opened = false;
  try {
    // NOTE(review): this.schema is presumably initialised by the superclass constructor — confirm.
    this.reader = new DataFileReader<>(input, new SpecificDatumReader<>(this.schema));
    opened = true;
  } finally {
    if (!opened) {
      try {
        input.close();
      } catch (IOException ignored) {
        // Best effort: the original failure is the one worth reporting.
      }
    }
  }
  this.offset = new AvroOffset(0);
}
/**
 * Opens the Avro file at {@code pathStr} with a generic datum reader and seeks to
 * {@code singleFileOffset}.
 *
 * <p>If the Avro reader cannot be created, the opened input is closed before the failure is
 * reported.
 *
 * @param pathStr path of the Avro file to open
 * @param singleFileOffset offset within the file, passed to {@code seek}
 * @throws SamzaException wrapping any {@link IOException} raised while opening or seeking
 */
@Override
public void open(String pathStr, String singleFileOffset) {
  LOG.info(String.format("%s: Open file [%s] with file offset [%s] for read",
      systemStreamPartition, pathStr, singleFileOffset));
  Path path = new Path(pathStr);
  try {
    AvroFSInput input = new AvroFSInput(FileContext.getFileContext(path.toUri()), path);
    boolean opened = false;
    try {
      fileReader = new DataFileReader<>(input, new GenericDatumReader<>());
      opened = true;
    } finally {
      if (!opened) {
        // fileReader was not assigned, so nothing else holds this input.
        try {
          input.close();
        } catch (IOException ignored) {
          // Best effort: the original failure is the one worth reporting.
        }
      }
    }
    seek(singleFileOffset);
  } catch (IOException e) {
    throw new SamzaException(e);
  }
}
/**
 * Command-line tool that prints the schema and records of an Avro data file.
 *
 * <p>Usage: {@code AvroReader {dataFile} {schemaFile} {max.lines.to.read.optional}}. The schema
 * file is parsed and used as the reader schema. At most {@code max.lines.to.read} records are
 * printed (all records if omitted). If fewer than two arguments are given, the usage line is
 * printed and the tool exits without reading anything.
 *
 * @param args data file path, schema file path, and optional maximum record count
 * @throws IOException if either file cannot be opened or read
 * @throws NumberFormatException if the optional record count is not an integer
 */
public static void main(String[] args) throws IOException {
  // Both dataFile and schemaFile are required; previously a short argument list fell through
  // to args[0]/args[1] and crashed with ArrayIndexOutOfBoundsException.
  if (args.length < 2) {
    System.out.println("AvroReader {dataFile} {schemaFile} {max.lines.to.read.optional}");
    return;
  }
  String dataFile = args[0];
  String schemaFile = args[1];
  int recordsToRead = Integer.MAX_VALUE;
  if (args.length > 2) {
    recordsToRead = Integer.parseInt(args[2]);
  }

  Schema.Parser parser = new Schema.Parser();
  Configuration config = new Configuration();
  FileSystem fs = FileSystem.get(config);

  Schema schema;
  java.io.InputStream schemaIn = fs.open(new Path(schemaFile));
  try {
    schema = parser.parse(schemaIn);
  } finally {
    schemaIn.close();
  }

  Path dataFilePath = new Path(dataFile);
  FileStatus fileStatus = fs.getFileStatus(dataFilePath);
  AvroFSInput input = new AvroFSInput(fs.open(dataFilePath), fileStatus.getLen());
  DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
  DataFileReader<GenericRecord> dataFileReader = null;
  try {
    dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
    System.out.println("Schema: " + dataFileReader.getSchema());
    System.out.println();
    int counter = 0;
    while (dataFileReader.hasNext() && counter++ < recordsToRead) {
      GenericRecord r = dataFileReader.next();
      System.out.println(counter + " : " + r);
    }
  } finally {
    if (dataFileReader != null) {
      // Closing the reader closes the input it wraps.
      dataFileReader.close();
    } else {
      input.close();
    }
  }
}