private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
    CompressionType type, int records) throws Exception {

  FileSystem fs = FileSystem.get(conf);
  LOG.info("Creating MapFiles with " + records +
      " records using codec " + clazz.getSimpleName());
  Path path = new Path(new Path(
      System.getProperty("test.build.data", "/tmp")),
      clazz.getSimpleName() + "-" + type + "-" + records);

  LOG.info("Writing " + path);
  createMapFile(conf, fs, path, clazz.newInstance(), type, records);
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  Text key1 = new Text("002");
  assertNotNull(reader.get(key1, new Text()));
  Text key2 = new Text("004");
  assertNotNull(reader.get(key2, new Text()));
}
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDirectory()) {     // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}
private void validateMapFileOutputContent(
    FileSystem fs, Path dir) throws IOException {
  // map output is a directory with index and data files
  Path expectedMapDir = new Path(dir, partFile);
  assert(fs.getFileStatus(expectedMapDir).isDirectory());
  FileStatus[] files = fs.listStatus(expectedMapDir);
  int fileCount = 0;
  boolean dataFileFound = false;
  boolean indexFileFound = false;
  for (FileStatus f : files) {
    if (f.isFile()) {
      ++fileCount;
      if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
        indexFileFound = true;
      } else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
        dataFileFound = true;
      }
    }
  }
  assert(fileCount > 0);
  assert(dataFileFound && indexFileFound);
}
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(Path dir,
    Configuration conf) throws IOException {
  FileSystem fs = dir.getFileSystem(conf);

  PathFilter filter = new PathFilter() {
    @Override
    public boolean accept(Path path) {
      String name = path.getName();
      if (name.startsWith("_") || name.startsWith("."))
        return false;
      return true;
    }
  };
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, filter));

  // sort names, so that hash partitioning works
  Arrays.sort(names);

  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
  }
  return parts;
}
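A minimal sketch, not part of the snippet above, of how the sorted reader array is typically consumed: because the parts are sorted by name, array index i corresponds to partition i, so a key written by HashPartitioner can be looked up in a single MapFile. The method name lookup is a hypothetical helper.

// Hypothetical helper, assuming the readers were produced by getReaders() above.
public static Writable lookup(MapFile.Reader[] readers, Text key, Writable value)
    throws IOException {
  // same scheme as HashPartitioner: pick the part the key was hashed to
  int part = (key.hashCode() & Integer.MAX_VALUE) % readers.length;
  return readers[part].get(key, value);   // null if the key is absent
}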
public static final IndexedMapFile getSharedMapFile(String symbol, JobConf job)
    throws IOException {
  int slots = job.getInt(symbol, 0);
  if (slots <= 0) {
    log.error("number of slots must be at least 1");
    System.exit(-1);
  }
  FileSystem fs = FileSystem.getLocal(job);
  MapFile.Reader[] readers = new MapFile.Reader[slots];
  for (int i = 0; i < slots; i++) {
    String symbfile = fs.getWorkingDirectory().toString() + "/"
        + symbol + "-" + Integer.toString(i);
    readers[i] = new MapFile.Reader(fs, symbfile, job);
  }
  return new IndexedMapFile(slots, readers);
}
private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
  ArrayList<Writable> res = new ArrayList<Writable>();
  Class<?> keyClass = readers[0].getKeyClass();
  Class<?> valueClass = readers[0].getValueClass();
  if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
    throw new IOException("Incompatible key (" + keyClass.getName() + ")");
  Writable value = (Writable) valueClass.newInstance();
  // we don't know the partitioning schema
  for (int i = 0; i < readers.length; i++) {
    if (readers[i].get(key, value) != null) {
      res.add(value);
      value = (Writable) valueClass.newInstance();
      Text aKey = (Text) keyClass.newInstance();
      while (readers[i].next(aKey, value) && aKey.equals(key)) {
        res.add(value);
        value = (Writable) valueClass.newInstance();
      }
    }
    readers[i].close();
  }
  return res;
}
private void createCrawlDb(Configuration config, FileSystem fs, Path crawldb,
    TreeSet<String> init, CrawlDatum cd) throws Exception {
  LOG.fine("* creating crawldb: " + crawldb);
  Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
  Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
  org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt =
      SequenceFile.Writer.valueClass(CrawlDatum.class);
  MapFile.Writer writer = new MapFile.Writer(config,
      new Path(dir, "part-00000"), wKeyOpt, wValueOpt);
  Iterator<String> it = init.iterator();
  while (it.hasNext()) {
    String key = it.next();
    writer.append(new Text(key), cd);
  }
  writer.close();
}
/**
 * Creates a synthetic crawldb.
 *
 * @param conf
 *          configuration to use
 * @param fs
 *          filesystem where db will be created
 * @param crawldb
 *          path where db will be created
 * @param init
 *          urls to be inserted, objects are of type URLCrawlDatum
 * @throws Exception
 */
public static void createCrawlDb(Configuration conf, FileSystem fs, Path crawldb,
    List<URLCrawlDatum> init) throws Exception {
  LOG.trace("* creating crawldb: " + crawldb);
  Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
  Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
  org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt =
      SequenceFile.Writer.valueClass(CrawlDatum.class);
  MapFile.Writer writer = new MapFile.Writer(conf,
      new Path(dir, "part-00000"), wKeyOpt, wValueOpt);
  Iterator<URLCrawlDatum> it = init.iterator();
  while (it.hasNext()) {
    URLCrawlDatum row = it.next();
    LOG.info("adding:" + row.url.toString());
    writer.append(new Text(row.url), row.datum);
  }
  writer.close();
}
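For reference, a minimal sketch (not taken from the original code) of reading such a synthetic crawldb back: the entries live in the part-00000 MapFile under CrawlDb.CURRENT_NAME, so one MapFile.Reader and a get() call suffice. The readDatum name is a hypothetical helper.

// Hypothetical helper, assuming the layout written by createCrawlDb above.
private static CrawlDatum readDatum(Configuration conf, Path crawldb, String url)
    throws IOException {
  Path part = new Path(new Path(crawldb, CrawlDb.CURRENT_NAME), "part-00000");
  MapFile.Reader reader = new MapFile.Reader(part, conf);
  try {
    CrawlDatum datum = new CrawlDatum();
    // get() returns null if the url is not present in the MapFile
    return (CrawlDatum) reader.get(new Text(url), datum);
  } finally {
    reader.close();
  }
}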
private void createLinkDb(Configuration config, FileSystem fs, Path linkdb,
    TreeMap<String, String[]> init) throws Exception {
  LOG.fine("* creating linkdb: " + linkdb);
  Path dir = new Path(linkdb, LinkDb.CURRENT_NAME);
  Option wKeyOpt = MapFile.Writer.keyClass(Text.class);
  org.apache.hadoop.io.SequenceFile.Writer.Option wValueOpt =
      SequenceFile.Writer.valueClass(Inlinks.class);
  MapFile.Writer writer = new MapFile.Writer(config,
      new Path(dir, "part-00000"), wKeyOpt, wValueOpt);
  Iterator<String> it = init.keySet().iterator();
  while (it.hasNext()) {
    String key = it.next();
    Inlinks inlinks = new Inlinks();
    String[] vals = init.get(key);
    for (int i = 0; i < vals.length; i++) {
      Inlink in = new Inlink(vals[i], vals[i]);
      inlinks.add(in);
    }
    writer.append(new Text(key), inlinks);
  }
  writer.close();
}
/** Open the output generated by this format. */
private MapFile.Reader[] getReaders(String subDir) throws IOException {
  Path dir = new Path(segmentDir, subDir);
  FileSystem fs = dir.getFileSystem(conf);
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir,
      SegmentPathFilter.INSTANCE));

  // sort names, so that hash partitioning works
  Arrays.sort(names);

  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(names[i], conf);
  }
  return parts;
}
/**
 * @param outputDir           Output directory for the map file(s)
 * @param mapFileSplitSize    Split size for the map file: if 0, use a single map file for all output.
 *                            If > 0, multiple map files will be used, each containing at most
 *                            mapFileSplitSize records. This can be used to avoid a single
 *                            multi-gigabyte map file, which may be undesirable in some cases
 *                            (transfer across the network, for example).
 * @param convertTextTo       If null: make no changes to Text writable objects. If non-null, Text writable
 *                            instances will be converted to this type. This is useful when you would rather
 *                            store numerical values, even if the original record reader produces strings/text.
 * @param indexInterval       Index interval for the Map file. Defaults to 1, which is suitable for most cases.
 * @param filenamePattern     The naming pattern for the map files. Used with String.format(pattern, int)
 * @param hadoopConfiguration Hadoop configuration.
 */
public AbstractMapFileWriter(@NonNull File outputDir, int mapFileSplitSize, WritableType convertTextTo,
                             int indexInterval, String filenamePattern,
                             org.apache.hadoop.conf.Configuration hadoopConfiguration) {
    if (indexInterval <= 0) {
        throw new UnsupportedOperationException("Index interval: must be > 0 (got: " + indexInterval + ")");
    }
    this.outputDir = outputDir;
    this.mapFileSplitSize = mapFileSplitSize;
    if (convertTextTo == WritableType.Text) {
        convertTextTo = null;
    }
    this.convertTextTo = convertTextTo;
    this.indexInterval = indexInterval;
    this.filenamePattern = filenamePattern;

    this.hadoopConfiguration = hadoopConfiguration;
    if (this.hadoopConfiguration.get(MAP_FILE_INDEX_INTERVAL_KEY) != null) {
        this.hadoopConfiguration.set(MAP_FILE_INDEX_INTERVAL_KEY, String.valueOf(indexInterval));
    }

    opts = new SequenceFile.Writer.Option[]{MapFile.Writer.keyClass(KEY_CLASS),
            SequenceFile.Writer.valueClass(getValueClass())};
}
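A minimal sketch, not taken from the class itself, of how the opts array built in the constructor would typically be consumed when a split is opened: MapFile.Writer accepts the key/value class options as varargs, and the index interval is picked up from the Hadoop configuration. The method name and path construction are assumptions for illustration.

// Hypothetical continuation, assuming the fields initialized in the constructor above.
private MapFile.Writer openWriterForSplit(int splitIndex) throws IOException {
    Path mapFilePath = new Path(outputDir.getAbsolutePath(),
            String.format(filenamePattern, splitIndex));
    // key/value classes come from opts; the index interval is read from
    // MAP_FILE_INDEX_INTERVAL_KEY in hadoopConfiguration
    return new MapFile.Writer(hadoopConfiguration, mapFilePath, opts);
}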
public MapFileReader(List<String> paths, IndexToKey indexToKey,
    Class<? extends Writable> recordClass) throws IOException {

  this.indexToKey = indexToKey;
  this.recordClass = recordClass;
  this.readers = new MapFile.Reader[paths.size()];

  SequenceFile.Reader.Option[] opts = new SequenceFile.Reader.Option[0];
  Configuration config = new Configuration();
  for (int i = 0; i < paths.size(); i++) {
    readers[i] = new MapFile.Reader(new Path(paths.get(i)), config, opts);
    if (readers[i].getValueClass() != recordClass) {
      throw new UnsupportedOperationException("MapFile record class: "
          + readers[i].getValueClass() + ", but got class " + recordClass
          + ", path = " + paths.get(i));
    }
  }

  recordIndexesEachReader = indexToKey.initialize(readers, recordClass);
}
@Test
public void testMapWriteTextWithKey() throws Exception {
  if (!canTest()) {
    return;
  }
  String txtKey = "THEKEY";
  String txtValue = "CIAO MONDO !";
  template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);

  Configuration conf = new Configuration();
  MapFile.Reader reader = new MapFile.Reader(
      new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text3"), conf);
  Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
  Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
  reader.next(key, value);
  assertEquals(key.toString(), txtKey);
  assertEquals(value.toString(), txtValue);

  IOHelper.close(reader);
}
@Test
public void testMapWriteTextWithKey() throws Exception {
  if (!canTest()) {
    return;
  }
  String txtKey = "THEKEY";
  String txtValue = "CIAO MONDO !";
  template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);

  Configuration conf = new Configuration();
  Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text3");
  FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
  MapFile.Reader reader = new MapFile.Reader(fs1,
      "file:///" + TEMP_DIR.toUri() + "/test-camel-text3", conf);
  Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
  Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
  reader.next(key, value);
  assertEquals(key.toString(), txtKey);
  assertEquals(value.toString(), txtValue);

  IOHelper.close(reader);
}
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
    Configuration conf) throws IOException {
  FileSystem fs = dir.getFileSystem(conf);
  Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));

  // sort names, so that hash partitioning works
  Arrays.sort(names);

  MapFile.Reader[] parts = new MapFile.Reader[names.length];
  for (int i = 0; i < names.length; i++) {
    parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
  }
  return parts;
}
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
  List<FileStatus> files = super.listStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDir()) {     // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
    }
  }
  return files;
}
@Override
protected List<LocatedFileStatus> listLocatedStatus(JobContext job)
    throws IOException {
  List<LocatedFileStatus> files = super.listLocatedStatus(job);
  int len = files.size();
  for (int i = 0; i < len; ++i) {
    FileStatus file = files.get(i);
    if (file.isDir()) {     // it's a MapFile
      Path p = file.getPath();
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      // use the data file
      files.set(i, fs.listLocatedStatus(
          new Path(p, MapFile.DATA_FILE_NAME)).next());
    }
  }
  return files;
}
public static void main(String[] args) throws IOException {
  String uri = args[0];
  conf = new Configuration();
  fs = FileSystem.get(URI.create(uri), conf);
  path = new Path(uri);
  try {
    reader = new MapFile.Reader(fs, uri, conf);
    WritableComparable key = (WritableComparable)
        ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable)
        ReflectionUtils.newInstance(reader.getValueClass(), conf);
    while (reader.next(key, value)) {
      System.out.printf("%s\t%s\n", key, value);
    }
  } finally {
    IOUtils.closeStream(reader);
  }
}
private static void runHadoopGetNames(String in) throws IOException {
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  if (!fs.exists(new Path(in))) {
    System.out.println("Error: Hdfs file " + in + " does not exist!");
    System.exit(-1);
  }
  MapFile.Reader reader = null;
  try {
    reader = new MapFile.Reader(fs, in, conf);
    Text key = (Text) reader.getKeyClass().newInstance();
    BytesWritable value = (BytesWritable) reader.getValueClass().newInstance();
    while (reader.next(key, value)) {
      System.out.println(key.toString());
    }
  } catch (Exception e) {
    e.printStackTrace();
  } finally {
    // close the reader on both the success and failure paths
    if (reader != null)
      reader.close();
  }
}
private void writeMapFile() throws Exception {
  Path path = Testfile.MAPFILE.filepath();

  Text key = new Text();
  Text value = new Text();

  long fsMinBlockSize = conf.getLong("dfs.namenode.fs-limits.min-block-size", 0);
  long testBlockSize = (blockSize < fsMinBlockSize) ? fsMinBlockSize : (long) blockSize;

  MapFile.Writer writer = new MapFile.Writer(conf, path,
      MapFile.Writer.keyClass(key.getClass()),
      MapFile.Writer.valueClass(value.getClass()),
      MapFile.Writer.compression(SequenceFile.CompressionType.NONE),
      SequenceFile.Writer.blockSize(testBlockSize),
      SequenceFile.Writer.bufferSize((int) testBlockSize));

  for (int i = 0; i < testSize; i++) {
    key.set(getKey(i));
    value.set(getValue());
    writer.append(key, value);
  }
  IOUtils.closeStream(writer);
}
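A companion sketch (not part of the original test) that reads the file written above back with random access; it assumes the same Testfile and getKey helpers and relies on MapFile keys being appended in sorted order, which is what makes keyed lookup possible.

// Hypothetical readback, assuming the Testfile/getKey helpers used above.
private void readMapFile() throws Exception {
  Path path = Testfile.MAPFILE.filepath();
  MapFile.Reader reader = new MapFile.Reader(path, conf);
  try {
    Text value = new Text();
    // keys were written in sorted order, so random access by key works
    Text probe = new Text(getKey(0));
    assertNotNull(reader.get(probe, value));
  } finally {
    IOUtils.closeStream(reader);
  }
}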
private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
  ArrayList<Writable> res = new ArrayList<Writable>();
  Class keyClass = readers[0].getKeyClass();
  Class valueClass = readers[0].getValueClass();
  if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
    throw new IOException("Incompatible key (" + keyClass.getName() + ")");
  Writable value = (Writable) valueClass.newInstance();
  // we don't know the partitioning schema
  for (int i = 0; i < readers.length; i++) {
    if (readers[i].get(key, value) != null) {
      res.add(value);
      value = (Writable) valueClass.newInstance();
      Text aKey = (Text) keyClass.newInstance();
      while (readers[i].next(aKey, value) && aKey.equals(key)) {
        res.add(value);
        value = (Writable) valueClass.newInstance();
      }
    }
    readers[i].close();
  }
  return res;
}
private void createLinkDb(Configuration config, FileSystem fs, Path linkdb,
    TreeMap init) throws Exception {
  LOG.fine("* creating linkdb: " + linkdb);
  Path dir = new Path(linkdb, LinkDb.CURRENT_NAME);
  MapFile.Writer writer = new MapFile.Writer(config, fs,
      new Path(dir, "part-00000").toString(), Text.class, Inlinks.class);
  Iterator it = init.keySet().iterator();
  while (it.hasNext()) {
    String key = (String) it.next();
    Inlinks inlinks = new Inlinks();
    String[] vals = (String[]) init.get(key);
    for (int i = 0; i < vals.length; i++) {
      Inlink in = new Inlink(vals[i], vals[i]);
      inlinks.add(in);
    }
    writer.append(new Text(key), inlinks);
  }
  writer.close();
}