@Override
protected Class<? extends InputFormat> getInputFormatClass()
    throws ClassNotFoundException {
  if (isHCatJob) {
    return SqoopHCatUtilities.getInputFormatClass();
  }
  switch (fileType) {
    case AVRO_DATA_FILE:
      return AvroInputFormat.class;
    case PARQUET_FILE:
      return DatasetKeyInputFormat.class;
    default:
      Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
      if (null == configuredIF) {
        return ExportInputFormat.class;
      } else {
        return configuredIF;
      }
  }
}
@Test
public void testReinit() throws Exception {
  // Test that a split containing multiple files works correctly,
  // with the child RecordReader getting its initialize() method
  // called a second time.
  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf = new Configuration();
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path[] files = { new Path("file1"), new Path("file2") };
  long[] lengths = { 1, 1 };
  CombineFileSplit split = new CombineFileSplit(files, lengths);
  RecordReader rr = inputFormat.createRecordReader(split, context);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // first initialize() call comes from MapTask. We'll do it here.
  rr.initialize(split, context);

  // First value is first filename.
  assertTrue(rr.nextKeyValue());
  assertEquals("file1", rr.getCurrentValue().toString());

  // The inner RR will return false, because it only emits one (k, v) pair.
  // But there's another sub-split to process. This returns true to us.
  assertTrue(rr.nextKeyValue());

  // And the 2nd rr will have its initialize method called correctly.
  assertEquals("file2", rr.getCurrentValue().toString());

  // But after both child RR's have returned their singleton (k, v), this
  // should also return false.
  assertFalse(rr.nextKeyValue());
}
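This test (and testRecordReaderInit further below) relies on fixtures that are not shown here. The following is a minimal sketch of what such helpers could look like, assuming the standard CombineFileRecordReader contract that the wrapped reader class exposes a (CombineFileSplit, TaskAttemptContext, Integer) constructor; the names ChildRRInputFormat, DummyRecordReader, and DUMMY_KEY come from the tests, but the bodies and the DUMMY_KEY value are illustrative, not the actual fixture code.

// Hypothetical test fixtures (sketch only).
private static final String DUMMY_KEY = "dummy.rr.key"; // assumed key name

private static class ChildRRInputFormat extends CombineFileInputFormat<Text, Text> {
  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader instantiates one DummyRecordReader per file in the split.
    return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context,
        DummyRecordReader.class);
  }
}

private static class DummyRecordReader extends RecordReader<Text, Text> {
  private final Text key;    // value of DUMMY_KEY from the active configuration
  private final Text value;  // name of the file backing this sub-split
  private boolean emitted = false;

  // CombineFileRecordReader requires this (CombineFileSplit, TaskAttemptContext, Integer)
  // constructor and invokes it reflectively for each file index in the split.
  public DummyRecordReader(CombineFileSplit split, TaskAttemptContext context,
      Integer index) {
    this.key = new Text(context.getConfiguration().get(DUMMY_KEY, ""));
    this.value = new Text(split.getPath(index).getName());
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    // Re-read the configuration so a second initialize() picks up the new context.
    key.set(context.getConfiguration().get(DUMMY_KEY, ""));
  }

  @Override
  public boolean nextKeyValue() {
    if (emitted) {
      return false; // each file contributes exactly one (key, value) pair
    }
    emitted = true;
    return true;
  }

  @Override public Text getCurrentKey() { return key; }
  @Override public Text getCurrentValue() { return value; }
  @Override public float getProgress() { return emitted ? 1.0f : 0.0f; }
  @Override public void close() { }
}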
private static List<Text> readSplit(InputFormat<LongWritable, Text> format,
    InputSplit split, Job job) throws IOException, InterruptedException {
  List<Text> result = new ArrayList<Text>();
  Configuration conf = job.getConfiguration();
  TaskAttemptContext context = MapReduceTestUtil
      .createDummyMapTaskAttemptContext(conf);
  RecordReader<LongWritable, Text> reader = format.createRecordReader(split,
      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
  MapContext<LongWritable, Text, LongWritable, Text> mcontext =
      new MapContextImpl<LongWritable, Text, LongWritable, Text>(conf,
          context.getTaskAttemptID(), reader, null, null,
          MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mcontext);
  while (reader.nextKeyValue()) {
    result.add(new Text(reader.getCurrentValue()));
  }
  return result;
}
@SuppressWarnings("unchecked") public void testAddInputPathWithMapper() throws IOException { final Job conf = Job.getInstance(); MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class, MapClass.class); MultipleInputs.addInputPath(conf, new Path("/bar"), KeyValueTextInputFormat.class, KeyValueMapClass.class); final Map<Path, InputFormat> inputs = MultipleInputs .getInputFormatMap(conf); final Map<Path, Class<? extends Mapper>> maps = MultipleInputs .getMapperTypeMap(conf); assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass()); assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar")) .getClass()); assertEquals(MapClass.class, maps.get(new Path("/foo"))); assertEquals(KeyValueMapClass.class, maps.get(new Path("/bar"))); }
public static void main(String[] args) {
  Configuration conf = new Configuration(); // assume defaults on CP
  conf.setClass("mapreduce.job.inputformat.class", DwCAInputFormat.class,
      InputFormat.class);
  conf.setStrings("mapreduce.input.fileinputformat.inputdir",
      "hdfs://ha-nn/tmp/dwca-lep5.zip");
  conf.setClass("key.class", Text.class, Object.class);
  conf.setClass("value.class", ExtendedRecord.class, Object.class);

  Pipeline p = newPipeline(args, conf);
  Coders.registerAvroCoders(p, UntypedOccurrence.class, TypedOccurrence.class,
      ExtendedRecord.class);

  PCollection<KV<Text, ExtendedRecord>> rawRecords = p.apply(
      "Read DwC-A",
      HadoopInputFormatIO.<Text, ExtendedRecord>read().withConfiguration(conf));

  PCollection<UntypedOccurrence> verbatimRecords = rawRecords.apply(
      "Convert to Avro", ParDo.of(fromExtendedRecordKVP()));

  verbatimRecords.apply(
      "Write Avro files",
      AvroIO.write(UntypedOccurrence.class).to("hdfs://ha-nn/tmp/dwca-lep5.avro"));

  LOG.info("Starting the pipeline");
  PipelineResult result = p.run();
  result.waitUntilFinish();
  LOG.info("Pipeline finished with state: {} ", result.getState());
}
void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
  final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
  job.setInputFormatClass(clazz);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapperClass(ExampleVerifier.class);
  job.setNumReduceTasks(0);

  LOG.debug("submitting job.");
  assertTrue("job failed!", job.waitForCompletion(true));
  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2,
      job.getCounters()
          .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
  assertEquals("Saw any instances of the filtered out row.", 0,
      job.getCounters()
          .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
  assertEquals("Saw the wrong number of instances of columnA.", 1,
      job.getCounters()
          .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
  assertEquals("Saw the wrong number of instances of columnB.", 1,
      job.getCounters()
          .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2,
      job.getCounters()
          .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0,
      job.getCounters()
          .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
}
public LocalMapTask(InputFormat<INKEY, INVALUE> inputFormat,
    OutputFormat<OUTKEY, OUTVALUE> outputFormat, Configuration conf, int id,
    InputSplit split, ContentPumpReporter reporter, AtomicInteger pctProgress) {
  this.inputFormat = inputFormat;
  this.outputFormat = outputFormat;
  this.conf = conf;
  this.id = id;
  this.split = split;
  this.pctProgress = pctProgress;
  this.reporter = reporter;
  try {
    mapperClass = job.getMapperClass();
  } catch (ClassNotFoundException e) {
    LOG.error("Mapper class not found", e);
  }
}
/**
 * This test validates behavior of
 * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when the
 * Hadoop InputFormat's {@link InputFormat#getSplits(JobContext)} returns an empty list.
 */
@Test
public void testComputeSplitsIfGetSplitsReturnsEmptyList() throws Exception {
  InputFormat<?, ?> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
  SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class);
  Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class)))
      .thenReturn(new ArrayList<InputSplit>());
  HadoopInputFormatBoundedSource<Text, Employee> hifSource =
      new HadoopInputFormatBoundedSource<Text, Employee>(
          serConf,
          WritableCoder.of(Text.class),
          AvroCoder.of(Employee.class),
          null, // No key translation required.
          null, // No value translation required.
          mockInputSplit);
  thrown.expect(IOException.class);
  // The expected message must match the string thrown by the production code verbatim.
  thrown.expectMessage("Error in computing splits, getSplits() returns a empty list");
  hifSource.setInputFormatObj(mockInputFormat);
  hifSource.computeSplitsIfNecessary();
}
/**
 * This test validates behavior of the
 * {@link HadoopInputFormatBoundedSource.HadoopInputFormatReader#start() start()} method when the
 * InputSplit returned by the InputFormat's {@link InputFormat#getSplits(JobContext)} contains
 * zero records.
 */
@Test
public void testReadersStartWhenZeroRecords() throws Exception {
  InputFormat mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
  EmployeeRecordReader mockReader = Mockito.mock(EmployeeRecordReader.class);
  Mockito.when(
      mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
          Mockito.any(TaskAttemptContext.class))).thenReturn(mockReader);
  Mockito.when(mockReader.nextKeyValue()).thenReturn(false);
  InputSplit mockInputSplit = Mockito.mock(NewObjectsEmployeeInputSplit.class);
  HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
      new HadoopInputFormatBoundedSource<Text, Employee>(
          serConf,
          WritableCoder.of(Text.class),
          AvroCoder.of(Employee.class),
          null, // No key translation required.
          null, // No value translation required.
          new SerializableSplit(mockInputSplit));
  boundedSource.setInputFormatObj(mockInputFormat);
  BoundedReader<KV<Text, Employee>> reader = boundedSource.createReader(p.getOptions());
  assertEquals(false, reader.start());
  assertEquals(Double.valueOf(1), reader.getFractionConsumed());
  reader.close();
}
@Override
public Iterator<Vertex> head(final String location, final Class readerClass,
    final int totalLines) {
  final Configuration configuration = new BaseConfiguration();
  configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, location);
  configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER,
      readerClass.getCanonicalName());
  try {
    if (InputRDD.class.isAssignableFrom(readerClass)) {
      return IteratorUtils.map(((InputRDD) readerClass.getConstructor().newInstance())
          .readGraphRDD(configuration, new JavaSparkContext(Spark.getContext()))
          .take(totalLines).iterator(), tuple -> tuple._2().get());
    } else if (InputFormat.class.isAssignableFrom(readerClass)) {
      return IteratorUtils.map(new InputFormatRDD()
          .readGraphRDD(configuration, new JavaSparkContext(Spark.getContext()))
          .take(totalLines).iterator(), tuple -> tuple._2().get());
    }
  } catch (final Exception e) {
    throw new IllegalArgumentException(e.getMessage(), e);
  }
  throw new IllegalArgumentException("The provided parserClass must be an "
      + InputFormat.class.getCanonicalName() + " or an "
      + InputRDD.class.getCanonicalName() + ": " + readerClass.getCanonicalName());
}
@Override
public <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey,
    final Class readerClass, final int totalLines) {
  final Configuration configuration = new BaseConfiguration();
  configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, location);
  configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER,
      readerClass.getCanonicalName());
  try {
    if (InputRDD.class.isAssignableFrom(readerClass)) {
      return IteratorUtils.map(((InputRDD) readerClass.getConstructor().newInstance())
          .readMemoryRDD(configuration, memoryKey, new JavaSparkContext(Spark.getContext()))
          .take(totalLines).iterator(), tuple -> new KeyValue(tuple._1(), tuple._2()));
    } else if (InputFormat.class.isAssignableFrom(readerClass)) {
      return IteratorUtils.map(new InputFormatRDD()
          .readMemoryRDD(configuration, memoryKey, new JavaSparkContext(Spark.getContext()))
          .take(totalLines).iterator(), tuple -> new KeyValue(tuple._1(), tuple._2()));
    }
  } catch (final Exception e) {
    throw new IllegalArgumentException(e.getMessage(), e);
  }
  throw new IllegalArgumentException("The provided parserClass must be an "
      + InputFormat.class.getCanonicalName() + " or an "
      + InputRDD.class.getCanonicalName() + ": " + readerClass.getCanonicalName());
}
@SuppressWarnings({ "rawtypes", "unchecked" }) private Dataset<Row> readInputFormat(String path) throws Exception { String inputType = config.getString(INPUT_FORMAT_TYPE_CONFIG); String keyType = config.getString(INPUT_FORMAT_KEY_CONFIG); String valueType = config.getString(INPUT_FORMAT_VALUE_CONFIG); LOG.debug("Reading InputFormat[{}]: {}", inputType, path); Class<? extends InputFormat> typeClazz = Class.forName(inputType).asSubclass(InputFormat.class); Class<?> keyClazz = Class.forName(keyType); Class<?> valueClazz = Class.forName(valueType); @SuppressWarnings("resource") JavaSparkContext context = new JavaSparkContext(Contexts.getSparkSession().sparkContext()); JavaPairRDD<?, ?> rdd = context.newAPIHadoopFile(path, typeClazz, keyClazz, valueClazz, new Configuration()); TranslateFunction translateFunction = new TranslateFunction(config.getConfig("translator")); return Contexts.getSparkSession().createDataFrame(rdd.flatMap(translateFunction), translateFunction.getSchema()); }
/**
 * Returns the Hadoop configuration for reading data from Elasticsearch. The Configuration
 * object must have the InputFormat class, key class, and value class set. The mandatory
 * fields for EsInputFormat are es.resource, es.nodes, es.port, es.internal.es.version, and
 * es.nodes.wan.only. Refer to
 * <a href="https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html"
 * >Elasticsearch Configuration</a> for more details.
 */
private static Configuration getConfiguration(HIFTestOptions options) {
  Configuration conf = new Configuration();
  conf.set(ConfigurationOptions.ES_NODES, options.getElasticServerIp());
  conf.set(ConfigurationOptions.ES_PORT, options.getElasticServerPort().toString());
  conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, TRUE);
  // Set username and password if Elasticsearch is configured with security.
  conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, options.getElasticUserName());
  conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, options.getElasticPassword());
  conf.set(ConfigurationOptions.ES_RESOURCE, ELASTIC_RESOURCE);
  conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION);
  conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, TRUE);
  conf.setClass("mapreduce.job.inputformat.class",
      org.elasticsearch.hadoop.mr.EsInputFormat.class, InputFormat.class);
  conf.setClass("key.class", Text.class, Object.class);
  conf.setClass("value.class", LinkedMapWritable.class, Object.class);
  // Optimizations added to change the max docs per partition, scroll size and batch size of
  // bytes to improve the test time for large data
  conf.set("es.input.max.docs.per.partition", "50000");
  conf.set("es.scroll.size", "400");
  conf.set("es.batch.size.bytes", "8mb");
  return conf;
}
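A configuration built this way is then handed to the read transform, mirroring the pattern used in the DwC-A pipeline above. The following is a sketch only; the pipeline variable and step name are illustrative.

// Sketch: consume the Elasticsearch configuration with HadoopInputFormatIO.
Configuration conf = getConfiguration(options);
PCollection<KV<Text, LinkedMapWritable>> esRecords = pipeline.apply(
    "Read from Elasticsearch",
    HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));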
/**
 * Returns the Hadoop configuration for reading data from Cassandra. To read data from Cassandra
 * using HadoopInputFormatIO, the following properties must be set: InputFormat class, InputFormat
 * key class, InputFormat value class, Thrift address, Thrift port, partitioner class, keyspace,
 * and column family name.
 */
private static Configuration getConfiguration(HIFTestOptions options) {
  Configuration conf = new Configuration();
  conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, options.getCassandraServerPort().toString());
  conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, options.getCassandraServerIp());
  conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
  conf.set(CASSANDRA_KEYSPACE_PROPERTY, CASSANDRA_KEYSPACE);
  conf.set(CASSANDRA_COLUMNFAMILY_PROPERTY, CASSANDRA_TABLE);
  // Set user name and password if the Cassandra instance has security configured.
  conf.set(USERNAME, options.getCassandraUserName());
  conf.set(PASSWORD, options.getCassandraPassword());
  conf.set(INPUT_KEYSPACE_USERNAME_CONFIG, options.getCassandraUserName());
  conf.set(INPUT_KEYSPACE_PASSWD_CONFIG, options.getCassandraPassword());
  conf.setClass("mapreduce.job.inputformat.class",
      org.apache.cassandra.hadoop.cql3.CqlInputFormat.class, InputFormat.class);
  conf.setClass("key.class", java.lang.Long.class, Object.class);
  conf.setClass("value.class", com.datastax.driver.core.Row.class, Object.class);
  return conf;
}
/**
 * This test validates the functionality of {@link HadoopInputFormatIO.Read#validateTransform()
 * Read.validateTransform()} when the input type of myKeyTranslate (the SimpleFunction provided
 * by the user for key translation) is not the same as the Hadoop InputFormat's key class (the
 * property set in the configuration as "key.class").
 */
@Test
public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() {
  SimpleFunction<LongWritable, String> myKeyTranslateWithWrongInputType =
      new SimpleFunction<LongWritable, String>() {
        @Override
        public String apply(LongWritable input) {
          return input.toString();
        }
      };
  HadoopInputFormatIO.Read<String, Employee> read = HadoopInputFormatIO.<String, Employee>read()
      .withConfiguration(serConf.get())
      .withKeyTranslation(myKeyTranslateWithWrongInputType);
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage(String.format(
      "Key translation's input type is not same as hadoop InputFormat : %s key class : %s",
      serConf.get().getClass("mapreduce.job.inputformat.class", InputFormat.class),
      serConf.get().getClass("key.class", Object.class)));
  read.validateTransform();
}
/**
 * This test validates the functionality of {@link HadoopInputFormatIO.Read#validateTransform()
 * Read.validateTransform()} when the input type of myValueTranslate (the SimpleFunction provided
 * by the user for value translation) is not the same as the Hadoop InputFormat's value class (the
 * property set in the configuration as "value.class").
 */
@Test
public void testReadValidationFailsWithWrongInputTypeValueTranslationFunction() {
  SimpleFunction<LongWritable, String> myValueTranslateWithWrongInputType =
      new SimpleFunction<LongWritable, String>() {
        @Override
        public String apply(LongWritable input) {
          return input.toString();
        }
      };
  HadoopInputFormatIO.Read<Text, String> read = HadoopInputFormatIO.<Text, String>read()
      .withConfiguration(serConf.get())
      .withValueTranslation(myValueTranslateWithWrongInputType);
  String expectedMessage = String.format(
      "Value translation's input type is not same as hadoop InputFormat : %s value class : %s",
      serConf.get().getClass("mapreduce.job.inputformat.class", InputFormat.class),
      serConf.get().getClass("value.class", Object.class));
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage(expectedMessage);
  read.validateTransform();
}
/**
 * This test validates behavior of {@link HadoopInputFormatBoundedSource} if RecordReader object
 * creation fails.
 */
@Test
public void testReadIfCreateRecordReaderFails() throws Exception {
  thrown.expect(Exception.class);
  thrown.expectMessage("Exception in creating RecordReader");
  InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
  Mockito.when(
      mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
          Mockito.any(TaskAttemptContext.class))).thenThrow(
      new IOException("Exception in creating RecordReader"));
  HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
      new HadoopInputFormatBoundedSource<Text, Employee>(
          serConf,
          WritableCoder.of(Text.class),
          AvroCoder.of(Employee.class),
          null, // No key translation required.
          null, // No value translation required.
          new SerializableSplit());
  boundedSource.setInputFormatObj(mockInputFormat);
  SourceTestUtils.readFromSource(boundedSource, p.getOptions());
}
/**
 * This test validates behavior of HadoopInputFormatSource when
 * {@link InputFormat#createRecordReader(InputSplit, TaskAttemptContext) createRecordReader()}
 * of the InputFormat returns null.
 */
@Test
public void testReadWithNullCreateRecordReader() throws Exception {
  InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
  thrown.expect(IOException.class);
  thrown.expectMessage(String.format("Null RecordReader object returned by %s",
      mockInputFormat.getClass()));
  Mockito.when(
      mockInputFormat.createRecordReader(Mockito.any(InputSplit.class),
          Mockito.any(TaskAttemptContext.class))).thenReturn(null);
  HadoopInputFormatBoundedSource<Text, Employee> boundedSource =
      new HadoopInputFormatBoundedSource<Text, Employee>(
          serConf,
          WritableCoder.of(Text.class),
          AvroCoder.of(Employee.class),
          null, // No key translation required.
          null, // No value translation required.
          new SerializableSplit());
  boundedSource.setInputFormatObj(mockInputFormat);
  SourceTestUtils.readFromSource(boundedSource, p.getOptions());
}
@Override
protected Class<? extends InputFormat> getInputFormatClass()
    throws ClassNotFoundException {
  if (isHCatJob) {
    return SqoopHCatUtilities.getInputFormatClass();
  }
  return super.getInputFormatClass();
}
public ImportJobBase(final SqoopOptions opts,
    final Class<? extends Mapper> mapperClass,
    final Class<? extends InputFormat> inputFormatClass,
    final Class<? extends OutputFormat> outputFormatClass,
    final ImportJobContext context) {
  super(opts, mapperClass, inputFormatClass, outputFormatClass);
  this.context = context;
}
public JobBase(final SqoopOptions opts,
    final Class<? extends Mapper> mapperClass,
    final Class<? extends InputFormat> inputFormatClass,
    final Class<? extends OutputFormat> outputFormatClass) {
  this.options = opts;
  this.mapperClass = mapperClass;
  this.inputFormatClass = inputFormatClass;
  this.outputFormatClass = outputFormatClass;
  isHCatJob = options.getHCatTableName() != null;
}
/**
 * Configure the inputformat to use for the job.
 */
protected void configureInputFormat(Job job, String tableName,
    String tableClassName, String splitByCol)
    throws ClassNotFoundException, IOException {
  // TODO: 'splitByCol' is import-job specific; lift it out of this API.
  Class<? extends InputFormat> ifClass = getInputFormatClass();
  LOG.debug("Using InputFormat: " + ifClass);
  job.setInputFormatClass(ifClass);
}
@Override
protected Class<? extends InputFormat> getInputFormatClass()
    throws ClassNotFoundException {
  if (isHCatJob) {
    return SqoopHCatUtilities.getInputFormatClass();
  }
  switch (fileType) {
    case AVRO_DATA_FILE:
      return AvroInputFormat.class;
    case PARQUET_FILE:
      return DatasetKeyInputFormat.class;
    default:
      return super.getInputFormatClass();
  }
}
@Override
protected Class<? extends InputFormat> getInputFormatClass()
    throws ClassNotFoundException {
  Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
  if (null == configuredIF) {
    return ExportInputFormat.class;
  } else {
    return configuredIF;
  }
}
public ImportJobBase(final SqoopOptions opts,
    final Class<? extends Mapper> mapperClass,
    final Class<? extends InputFormat> inputFormatClass,
    final Class<? extends OutputFormat> outputFormatClass,
    final ImportJobContext context) {
  super(opts, mapperClass, inputFormatClass, outputFormatClass, context);
}
/**
 * Allow the user to inject custom mapper, input, and output formats
 * into the importTable() process.
 */
@Override
@SuppressWarnings("unchecked")
public void importTable(ImportJobContext context)
    throws IOException, ImportException {
  SqoopOptions options = context.getOptions();
  Configuration conf = options.getConf();

  Class<? extends Mapper> mapperClass = (Class<? extends Mapper>)
      conf.getClass(MAPPER_KEY, Mapper.class);
  Class<? extends InputFormat> ifClass = (Class<? extends InputFormat>)
      conf.getClass(INPUT_FORMAT_KEY, TextInputFormat.class);
  Class<? extends OutputFormat> ofClass = (Class<? extends OutputFormat>)
      conf.getClass(OUTPUT_FORMAT_KEY, TextOutputFormat.class);

  Class<? extends ImportJobBase> jobClass = (Class<? extends ImportJobBase>)
      conf.getClass(IMPORT_JOB_KEY, ImportJobBase.class);

  String tableName = context.getTableName();

  // Instantiate the user's chosen ImportJobBase instance.
  ImportJobBase importJob = ReflectionUtils.newInstance(jobClass, conf);

  // And configure the dependencies to inject
  importJob.setOptions(options);
  importJob.setMapperClass(mapperClass);
  importJob.setInputFormatClass(ifClass);
  importJob.setOutputFormatClass(ofClass);

  importJob.runImport(tableName, context.getJarFile(),
      getSplitColumn(options, tableName), conf);
}
private int countRecords(int numSplits)
    throws IOException, InterruptedException {
  InputFormat<Text, BytesWritable> format =
      new SequenceFileInputFilter<Text, BytesWritable>();
  if (numSplits == 0) {
    numSplits = random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
  }
  FileInputFormat.setMaxInputSplitSize(job,
      fs.getFileStatus(inFile).getLen() / numSplits);
  TaskAttemptContext context = MapReduceTestUtil
      .createDummyMapTaskAttemptContext(job.getConfiguration());

  // check each split
  int count = 0;
  for (InputSplit split : format.getSplits(job)) {
    RecordReader<Text, BytesWritable> reader =
        format.createRecordReader(split, context);
    MapContext<Text, BytesWritable, Text, BytesWritable> mcontext =
        new MapContextImpl<Text, BytesWritable, Text, BytesWritable>(
            job.getConfiguration(), context.getTaskAttemptID(), reader, null, null,
            MapReduceTestUtil.createDummyReporter(), split);
    reader.initialize(split, mcontext);
    try {
      while (reader.nextKeyValue()) {
        LOG.info("Accept record " + reader.getCurrentKey().toString());
        count++;
      }
    } finally {
      reader.close();
    }
  }
  return count;
}
@Test
public void testRecordReaderInit() throws InterruptedException, IOException {
  // Test that we properly initialize the child recordreader when
  // CombineFileInputFormat and CombineFileRecordReader are used.
  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf1 = new Configuration();
  conf1.set(DUMMY_KEY, "STATE1");
  TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path[] files = { new Path("file1") };
  long[] lengths = { 1 };
  CombineFileSplit split = new CombineFileSplit(files, lengths);
  RecordReader rr = inputFormat.createRecordReader(split, context1);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // Verify that the initial configuration is the one being used.
  // Right after construction the dummy key should have value "STATE1"
  assertEquals("Invalid initial dummy key value", "STATE1",
      rr.getCurrentKey().toString());

  // Switch the active context for the RecordReader...
  Configuration conf2 = new Configuration();
  conf2.set(DUMMY_KEY, "STATE2");
  TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
  rr.initialize(split, context2);

  // And verify that the new context is updated into the child record reader.
  assertEquals("Invalid secondary dummy key value", "STATE2",
      rr.getCurrentKey().toString());
}
@SuppressWarnings("unchecked") public void testAddInputPathWithFormat() throws IOException { final Job conf = Job.getInstance(); MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class); MultipleInputs.addInputPath(conf, new Path("/bar"), KeyValueTextInputFormat.class); final Map<Path, InputFormat> inputs = MultipleInputs .getInputFormatMap(conf); assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass()); assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar")) .getClass()); }
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K, V> inf, Job job)
    throws IOException, InterruptedException {
  List<InputSplit> splits = inf.getSplits(job);
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.size());
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID());
    RecordReader<K, V> reader = inf.createRecordReader(
        splits.get(i), samplingContext);
    reader.initialize(splits.get(i), samplingContext);
    while (reader.nextKeyValue()) {
      samples.add(ReflectionUtils.copy(job.getConfiguration(),
          reader.getCurrentKey(), null));
      ++records;
      if ((i + 1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K, V> inf, Job job)
    throws IOException, InterruptedException {
  List<InputSplit> splits = inf.getSplits(job);
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.size());
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
        job.getConfiguration(), new TaskAttemptID());
    RecordReader<K, V> reader = inf.createRecordReader(
        splits.get(i), samplingContext);
    reader.initialize(splits.get(i), samplingContext);
    while (reader.nextKeyValue()) {
      ++records;
      if ((double) kept / records < freq) {
        samples.add(ReflectionUtils.copy(job.getConfiguration(),
            reader.getCurrentKey(), null));
        ++kept;
      }
    }
    reader.close();
  }
  return (K[]) samples.toArray();
}
/**
 * Write a partition file for the given job, using the Sampler provided.
 * Queries the sampler for a sample keyset, sorts by the output key
 * comparator, selects the keys for each rank, and writes to the destination
 * returned from {@link TotalOrderPartitioner#getPartitionFile}.
 */
@SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
public static <K, V> void writePartitionFile(Job job, Sampler<K, V> sampler)
    throws IOException, ClassNotFoundException, InterruptedException {
  Configuration conf = job.getConfiguration();
  final InputFormat inf =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  int numPartitions = job.getNumReduceTasks();
  K[] samples = (K[]) sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator =
      (RawComparator<K>) job.getSortComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  FileSystem fs = dst.getFileSystem(conf);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst,
      job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;
  int last = -1;
  for (int i = 1; i < numPartitions; ++i) {
    int k = Math.round(stepSize * i);
    while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
      ++k;
    }
    writer.append(samples[k], nullValue);
    last = k;
  }
  writer.close();
}
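A typical caller pairs writePartitionFile() with TotalOrderPartitioner, roughly as in the following sketch. The sampler parameters, key type, and partition-file path are illustrative, and the sketch assumes the map phase passes its input keys through unchanged so that sampled input keys match map output keys.

// Sketch: total-order sort setup (illustrative values).
Job job = Job.getInstance(conf, "total-order-sort");
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setNumReduceTasks(4);
job.setPartitionerClass(TotalOrderPartitioner.class);
// The partition file is the destination writePartitionFile() writes to.
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
    new Path("/tmp/partitions.lst"));
// Sample roughly 10% of records, up to 10000 samples, from at most 10 splits.
InputSampler.writePartitionFile(job,
    new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10));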
/**
 * Convenience method for constructing composite formats.
 * Given operation (op), Object class (inf), set of paths (p) return:
 * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
 */
public static String compose(String op, Class<? extends InputFormat> inf,
    String... path) {
  final String infname = inf.getName();
  StringBuffer ret = new StringBuffer(op + '(');
  for (String p : path) {
    compose(infname, p, ret);
    ret.append(',');
  }
  ret.setCharAt(ret.length() - 1, ')');
  return ret.toString();
}
/**
 * Convenience method for constructing composite formats.
 * Given operation (op), Object class (inf), set of paths (p) return:
 * {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }
 */
public static String compose(String op, Class<? extends InputFormat> inf,
    Path... path) {
  ArrayList<String> tmp = new ArrayList<String>(path.length);
  for (Path p : path) {
    tmp.add(p.toString());
  }
  return compose(op, inf, tmp.toArray(new String[0]));
}
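These helpers are normally used to build the join expression for CompositeInputFormat, roughly as in the following sketch. The join type, inner input format, and paths are illustrative, and the sketch assumes the two inputs are identically partitioned and sorted, as a map-side join requires.

// Sketch: configure a map-side join over two pre-sorted, identically partitioned inputs.
Job job = Job.getInstance(conf, "map-side-join");
job.setInputFormatClass(CompositeInputFormat.class);
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
    CompositeInputFormat.compose("inner", SequenceFileInputFormat.class,
        new Path("/data/left"), new Path("/data/right")));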
/**
 * Creates a new TaggedInputSplit.
 *
 * @param inputSplit The InputSplit to be tagged
 * @param conf The configuration to use
 * @param inputFormatClass The InputFormat class to use for this job
 * @param mapperClass The Mapper class to use for this job
 */
@SuppressWarnings("unchecked")
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  this.inputSplitClass = inputSplit.getClass();
  this.inputSplit = inputSplit;
  this.conf = conf;
  this.inputFormatClass = inputFormatClass;
  this.mapperClass = mapperClass;
}
/**
 * Constructs the DelegatingRecordReader.
 *
 * @param split TaggedInputSplit object
 * @param context TaskAttemptContext object
 *
 * @throws IOException
 * @throws InterruptedException
 */
@SuppressWarnings("unchecked")
public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.
  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
      .newInstance(taggedInputSplit.getInputFormatClass(),
          context.getConfiguration());
  originalRR = inputFormat.createRecordReader(taggedInputSplit.getInputSplit(),
      context);
}