Java 类org.apache.hadoop.mapreduce.RecordReader 实例源码
项目:aliyun-maxcompute-data-collectors
文件:SQLServerDBInputFormat.java
@Override
/** {@inheritDoc} */
protected RecordReader<LongWritable, T> createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
String dbProductName = getDBProductName();
LOG.debug("Creating db record reader for db product: " + dbProductName);
try {
return new SQLServerDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName);
} catch (SQLException ex) {
throw new IOException(ex);
}
}
项目:aliyun-maxcompute-data-collectors
文件:Db2DataDrivenDBInputFormat.java
@Override
protected RecordReader<LongWritable, T> createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// Use DB2-specific db reader
return new Db2DataDrivenDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName());
} catch (SQLException ex) {
throw new IOException(ex);
}
}
项目:aliyun-maxcompute-data-collectors
文件:DataDrivenDBInputFormat.java
protected RecordReader<LongWritable, T> createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
String dbProductName = getDBProductName();
LOG.debug("Creating db record reader for db product: " + dbProductName);
try {
return new DataDrivenDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName);
} catch (SQLException ex) {
throw new IOException(ex);
}
}
项目:aliyun-maxcompute-data-collectors
文件:OracleDataDrivenDBInputFormat.java
@Override
protected RecordReader<LongWritable, T> createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// Use Oracle-specific db reader
return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName());
} catch (SQLException ex) {
throw new IOException(ex);
}
}
项目:aliyun-maxcompute-data-collectors
文件:CombineShimRecordReader.java
/**
* Actually instantiate the user's chosen RecordReader implementation.
*/
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
LOG.debug("ChildSplit operates on: " + split.getPath(index));
Configuration conf = context.getConfiguration();
// Determine the file format we're reading.
Class rrClass;
if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
rrClass = SequenceFileRecordReader.class;
} else {
rrClass = LineRecordReader.class;
}
// Create the appropriate record reader.
this.rr = (RecordReader<LongWritable, Object>)
ReflectionUtils.newInstance(rrClass, conf);
}
项目:aliyun-maxcompute-data-collectors
文件:SqlServerInputFormat.java
/** {@inheritDoc} */
@Override
protected RecordReader<LongWritable, T> createDBRecordReader(
DBInputSplit split, Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// Use Microsoft SQL Server specific db reader
return new SqlServerRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName());
} catch (SQLException ex) {
throw new IOException(ex);
}
}
项目:hadoop
文件:TestMRKeyValueTextInputFormat.java
private static List<Text> readSplit(KeyValueTextInputFormat format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<Text, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<Text, Text, Text, Text> mcontext =
new MapContextImpl<Text, Text, Text, Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
reader.close();
return result;
}
项目:hadoop
文件:TestCombineFileInputFormat.java
@Test
public void testReinit() throws Exception {
// Test that a split containing multiple files works correctly,
// with the child RecordReader getting its initialize() method
// called a second time.
TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
Configuration conf = new Configuration();
TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);
// This will create a CombineFileRecordReader that itself contains a
// DummyRecordReader.
InputFormat inputFormat = new ChildRRInputFormat();
Path [] files = { new Path("file1"), new Path("file2") };
long [] lengths = { 1, 1 };
CombineFileSplit split = new CombineFileSplit(files, lengths);
RecordReader rr = inputFormat.createRecordReader(split, context);
assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
// first initialize() call comes from MapTask. We'll do it here.
rr.initialize(split, context);
// First value is first filename.
assertTrue(rr.nextKeyValue());
assertEquals("file1", rr.getCurrentValue().toString());
// The inner RR will return false, because it only emits one (k, v) pair.
// But there's another sub-split to process. This returns true to us.
assertTrue(rr.nextKeyValue());
// And the 2nd rr will have its initialize method called correctly.
assertEquals("file2", rr.getCurrentValue().toString());
// But after both child RR's have returned their singleton (k, v), this
// should also return false.
assertFalse(rr.nextKeyValue());
}
项目:hadoop
文件:TestCombineTextInputFormat.java
private static List<Text> readSplit(InputFormat<LongWritable,Text> format,
InputSplit split, Job job) throws IOException, InterruptedException {
List<Text> result = new ArrayList<Text>();
Configuration conf = job.getConfiguration();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(conf);
RecordReader<LongWritable, Text> reader = format.createRecordReader(split,
MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
MapContext<LongWritable,Text,LongWritable,Text> mcontext =
new MapContextImpl<LongWritable,Text,LongWritable,Text>(conf,
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(),
split);
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
result.add(new Text(reader.getCurrentValue()));
}
return result;
}
项目:hadoop
文件:OracleDataDrivenDBInputFormat.java
@Override
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// Use Oracle-specific db reader
return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName());
} catch (SQLException ex) {
throw new IOException(ex.getMessage());
}
}
项目:hadoop
文件:Chain.java
/**
* Add mapper(the first mapper) that reads input from the input
* context and writes to queue
*/
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
throws IOException, InterruptedException {
Configuration conf = getConf(index);
Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
Object.class);
RecordReader rr = new ChainRecordReader(inputContext);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
conf);
Mapper.Context mapperContext = createMapContext(rr, rw,
(MapContext) inputContext, getConf(index));
MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
threads.add(runner);
}
项目:hadoop
文件:Chain.java
/**
* Add mapper that reads and writes from/to the queue
*/
@SuppressWarnings("unchecked")
void addMapper(ChainBlockingQueue<KeyValuePair<?, ?>> input,
ChainBlockingQueue<KeyValuePair<?, ?>> output,
TaskInputOutputContext context, int index) throws IOException,
InterruptedException {
Configuration conf = getConf(index);
Class<?> keyClass = conf.getClass(MAPPER_INPUT_KEY_CLASS, Object.class);
Class<?> valueClass = conf.getClass(MAPPER_INPUT_VALUE_CLASS, Object.class);
Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
Class<?> valueOutClass = conf.getClass(MAPPER_OUTPUT_VALUE_CLASS,
Object.class);
RecordReader rr = new ChainRecordReader(keyClass, valueClass, input, conf);
RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, output,
conf);
MapRunner runner = new MapRunner(mappers.get(index), createMapContext(rr,
rw, context, getConf(index)), rr, rw);
threads.add(runner);
}
项目:ditb
文件:IntegrationTestBulkLoad.java
@Override
public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
int taskId = context.getTaskAttemptID().getTaskID().getId();
int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
taskId = taskId + iteration * numMapTasks;
numMapTasks = numMapTasks * numIterations;
long chainId = Math.abs(new Random().nextLong());
chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
}
项目:angel
文件:DFSStorageNewAPI.java
@SuppressWarnings({"rawtypes", "unchecked"})
public void initReader() throws IOException {
try {
Configuration conf = WorkerContext.get().getConf();
String inputFormatClassName =
conf.get(AngelConf.ANGEL_INPUTFORMAT_CLASS,
AngelConf.DEFAULT_ANGEL_INPUTFORMAT_CLASS);
Class<? extends org.apache.hadoop.mapreduce.InputFormat> inputFormatClass =
(Class<? extends org.apache.hadoop.mapreduce.InputFormat>) Class
.forName(inputFormatClassName);
org.apache.hadoop.mapreduce.InputFormat inputFormat =
ReflectionUtils.newInstance(inputFormatClass,
new JobConf(conf));
MRTaskContext taskContext = new MRTaskContext(conf);
org.apache.hadoop.mapreduce.RecordReader<KEY, VALUE> recordReader =
inputFormat.createRecordReader(split, taskContext);
recordReader.initialize(split, taskContext);
setReader(new DFSReaderNewAPI(recordReader));
} catch (Exception x) {
LOG.error("init reader error ", x);
throw new IOException(x);
}
}
项目:Wikipedia-Index
文件:XmlInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit inputSplit, TaskAttemptContext context) {
try {
return new XMLRecordReader(inputSplit, context.getConfiguration());
} catch (IOException e) {
return null;
}
}
项目:circus-train
文件:DynamicInputFormatTest.java
@Test
public void getSplits() throws Exception {
S3MapReduceCpOptions options = getOptions();
Configuration configuration = new Configuration();
configuration.set("mapred.map.tasks", String.valueOf(options.getMaxMaps()));
CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
new Path(cluster.getFileSystem().getUri().toString() + "/tmp/testDynInputFormat/fileList.seq"), options);
JobContext jobContext = new JobContextImpl(configuration, new JobID());
DynamicInputFormat<Text, CopyListingFileStatus> inputFormat = new DynamicInputFormat<>();
List<InputSplit> splits = inputFormat.getSplits(jobContext);
int nFiles = 0;
int taskId = 0;
for (InputSplit split : splits) {
RecordReader<Text, CopyListingFileStatus> recordReader = inputFormat.createRecordReader(split, null);
StubContext stubContext = new StubContext(jobContext.getConfiguration(), recordReader, taskId);
final TaskAttemptContext taskAttemptContext = stubContext.getContext();
recordReader.initialize(splits.get(0), taskAttemptContext);
float previousProgressValue = 0f;
while (recordReader.nextKeyValue()) {
CopyListingFileStatus fileStatus = recordReader.getCurrentValue();
String source = fileStatus.getPath().toString();
assertTrue(expectedFilePaths.contains(source));
final float progress = recordReader.getProgress();
assertTrue(progress >= previousProgressValue);
assertTrue(progress >= 0.0f);
assertTrue(progress <= 1.0f);
previousProgressValue = progress;
++nFiles;
}
assertTrue(recordReader.getProgress() == 1.0f);
++taskId;
}
Assert.assertEquals(expectedFilePaths.size(), nFiles);
}
项目:circus-train
文件:StubContext.java
public StubContext(Configuration conf, RecordReader<Text, CopyListingFileStatus> reader, int taskId)
throws IOException, InterruptedException {
WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper = new WrappedMapper<>();
MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl = new MapContextImpl<>(conf,
getTaskAttemptID(taskId), reader, writer, null, reporter, null);
this.reader = reader;
mapperContext = wrappedMapper.getMapContext(contextImpl);
}
项目:aliyun-maxcompute-data-collectors
文件:ExportInputFormat.java
@Override
@SuppressWarnings("unchecked")
public RecordReader createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException {
CombineFileSplit combineSplit = (CombineFileSplit) split;
// Use CombineFileRecordReader since this can handle CombineFileSplits
// and instantiate another RecordReader in a loop; do this with the
// CombineShimRecordReader.
RecordReader rr = new CombineFileRecordReader(combineSplit, context,
CombineShimRecordReader.class);
return rr;
}
项目:aliyun-maxcompute-data-collectors
文件:DBInputFormat.java
protected RecordReader<LongWritable, T> createDBRecordReader(
com.cloudera.sqoop.mapreduce.db.DBInputFormat.DBInputSplit split,
Configuration conf) throws IOException {
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// use database product name to determine appropriate record reader.
if (dbProductName.startsWith("ORACLE")) {
// use Oracle-specific db reader.
return new OracleDBRecordReader<T>(split, inputClass,
conf, getConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else if (dbProductName.startsWith("DB2")) {
// use DB2-specific db reader.
return new Db2DBRecordReader<T>(split, inputClass,
conf, getConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else {
// Generic reader.
return new DBRecordReader<T>(split, inputClass,
conf, getConnection(), getDBConf(), conditions, fieldNames,
tableName);
}
} catch (SQLException ex) {
throw new IOException(ex);
}
}
项目:aliyun-maxcompute-data-collectors
文件:DBInputFormat.java
@Override
/** {@inheritDoc} */
public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return createDBRecordReader(
(com.cloudera.sqoop.mapreduce.db.DBInputFormat.DBInputSplit) split,
context.getConfiguration());
}
项目:aliyun-maxcompute-data-collectors
文件:SqoopHCatExportFormat.java
@Override
public RecordReader<WritableComparable, HCatRecord>
createRecordReader(InputSplit split,
TaskAttemptContext taskContext)
throws IOException, InterruptedException {
LOG.debug("Creating a SqoopHCatRecordReader");
return new SqoopHCatRecordReader(split, taskContext, this);
}
项目:aliyun-maxcompute-data-collectors
文件:SqoopHCatExportFormat.java
public RecordReader<WritableComparable, HCatRecord>
createHCatRecordReader(InputSplit split,
TaskAttemptContext taskContext)
throws IOException, InterruptedException {
LOG.debug("Creating a base HCatRecordReader");
return super.createRecordReader(split, taskContext);
}
项目:aliyun-maxcompute-data-collectors
文件:AvroInputFormat.java
@Override
public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
context.setStatus(split.toString());
return new AvroRecordReader<T>();
}
项目:hadoop-logfile-inputformat
文件:LogfileInputFormat.java
@Override
public RecordReader<Tuple2<Path, Long>, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
if (LOG.isTraceEnabled()) {
LOG.trace("create LogfileRecordReader");
}
return new LogfileRecordReader();
}
项目:hadoop
文件:TestInputSampler.java
@Override
public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return new org.apache.hadoop.mapred.RecordReader
<IntWritable, NullWritable>() {
private final IntWritable i =
new IntWritable(((MapredSequentialSplit)split).getInit());
private int maxVal = i.get() + maxDepth + 1;
@Override
public boolean next(IntWritable key, NullWritable value)
throws IOException {
i.set(i.get() + 1);
return i.get() < maxVal;
}
@Override
public IntWritable createKey() {
return new IntWritable(i.get());
}
@Override
public NullWritable createValue() {
return NullWritable.get();
}
@Override
public long getPos() throws IOException {
return 0;
}
@Override
public void close() throws IOException {
}
@Override
public float getProgress() throws IOException {
return 0;
}
};
}
项目:hadoop
文件:TestMRSequenceFileInputFilter.java
private int countRecords(int numSplits)
throws IOException, InterruptedException {
InputFormat<Text, BytesWritable> format =
new SequenceFileInputFilter<Text, BytesWritable>();
if (numSplits == 0) {
numSplits =
random.nextInt(MAX_LENGTH / (SequenceFile.SYNC_INTERVAL / 20)) + 1;
}
FileInputFormat.setMaxInputSplitSize(job,
fs.getFileStatus(inFile).getLen() / numSplits);
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
// check each split
int count = 0;
for (InputSplit split : format.getSplits(job)) {
RecordReader<Text, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<Text, BytesWritable, Text, BytesWritable> mcontext =
new MapContextImpl<Text, BytesWritable, Text, BytesWritable>(
job.getConfiguration(),
context.getTaskAttemptID(), reader, null, null,
MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
try {
while (reader.nextKeyValue()) {
LOG.info("Accept record " + reader.getCurrentKey().toString());
count++;
}
} finally {
reader.close();
}
}
return count;
}
项目:hadoop
文件:TestCombineFileInputFormat.java
@SuppressWarnings("unchecked")
@Override
public RecordReader<Text,Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException {
return new CombineFileRecordReader((CombineFileSplit) split, context,
(Class) DummyRecordReader.class);
}
项目:hadoop
文件:TestCombineFileInputFormat.java
@Test
public void testRecordReaderInit() throws InterruptedException, IOException {
// Test that we properly initialize the child recordreader when
// CombineFileInputFormat and CombineFileRecordReader are used.
TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
Configuration conf1 = new Configuration();
conf1.set(DUMMY_KEY, "STATE1");
TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);
// This will create a CombineFileRecordReader that itself contains a
// DummyRecordReader.
InputFormat inputFormat = new ChildRRInputFormat();
Path [] files = { new Path("file1") };
long [] lengths = { 1 };
CombineFileSplit split = new CombineFileSplit(files, lengths);
RecordReader rr = inputFormat.createRecordReader(split, context1);
assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);
// Verify that the initial configuration is the one being used.
// Right after construction the dummy key should have value "STATE1"
assertEquals("Invalid initial dummy key value", "STATE1",
rr.getCurrentKey().toString());
// Switch the active context for the RecordReader...
Configuration conf2 = new Configuration();
conf2.set(DUMMY_KEY, "STATE2");
TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
rr.initialize(split, context2);
// And verify that the new context is updated into the child record reader.
assertEquals("Invalid secondary dummy key value", "STATE2",
rr.getCurrentKey().toString());
}
项目:hadoop
文件:TestFixedLengthInputFormat.java
/**
* Test with no record length set.
*/
@Test (timeout=5000)
public void testNoRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
// Create the job and do not set fixed record length
Job job = Job.getInstance(defaultConf);
FileInputFormat.setInputPaths(job, workDir);
FixedLengthInputFormat format = new FixedLengthInputFormat();
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for not setting record length:", exceptionThrown);
}
项目:hadoop
文件:TestFixedLengthInputFormat.java
/**
* Test with record length set to 0
*/
@Test (timeout=5000)
public void testZeroRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
Job job = Job.getInstance(defaultConf);
// Set the fixed length record length config property
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), 0);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context =
MapReduceTestUtil.createDummyMapTaskAttemptContext(
job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for zero record length:", exceptionThrown);
}
项目:hadoop
文件:TestFixedLengthInputFormat.java
/**
* Test with record length set to a negative value
*/
@Test (timeout=5000)
public void testNegativeRecordLength() throws Exception {
localFs.delete(workDir, true);
Path file = new Path(workDir, new String("testFormat.txt"));
createFile(file, null, 10, 10);
// Set the fixed length record length config property
Job job = Job.getInstance(defaultConf);
FixedLengthInputFormat format = new FixedLengthInputFormat();
format.setRecordLength(job.getConfiguration(), -10);
FileInputFormat.setInputPaths(job, workDir);
List<InputSplit> splits = format.getSplits(job);
boolean exceptionThrown = false;
for (InputSplit split : splits) {
try {
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
} catch(IOException ioe) {
exceptionThrown = true;
LOG.info("Exception message:" + ioe.getMessage());
}
}
assertTrue("Exception for negative record length:", exceptionThrown);
}
项目:hadoop
文件:TestFixedLengthInputFormat.java
private static List<String> readSplit(FixedLengthInputFormat format,
InputSplit split,
Job job) throws Exception {
List<String> result = new ArrayList<String>();
TaskAttemptContext context = MapReduceTestUtil.
createDummyMapTaskAttemptContext(job.getConfiguration());
RecordReader<LongWritable, BytesWritable> reader =
format.createRecordReader(split, context);
MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
mcontext =
new MapContextImpl<LongWritable, BytesWritable, LongWritable,
BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
LongWritable key;
BytesWritable value;
try {
reader.initialize(split, mcontext);
while (reader.nextKeyValue()) {
key = reader.getCurrentKey();
value = reader.getCurrentValue();
result.add(new String(value.getBytes(), 0, value.getLength()));
}
} finally {
reader.close();
}
return result;
}
项目:hadoop
文件:InputSampler.java
/**
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i), samplingContext);
reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
samples.add(ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
项目:hadoop
文件:InputSampler.java
/**
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, Job job)
throws IOException, InterruptedException {
List<InputSplit> splits = inf.getSplits(job);
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.size());
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
TaskAttemptContext samplingContext = new TaskAttemptContextImpl(
job.getConfiguration(), new TaskAttemptID());
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i), samplingContext);
reader.initialize(splits.get(i), samplingContext);
while (reader.nextKeyValue()) {
++records;
if ((double) kept / records < freq) {
samples.add(ReflectionUtils.copy(job.getConfiguration(),
reader.getCurrentKey(), null));
++kept;
}
}
reader.close();
}
return (K[])samples.toArray();
}
项目:hadoop
文件:CompositeInputFormat.java
/**
* Construct a CompositeRecordReader for the children of this InputFormat
* as defined in the init expression.
* The outermost join need only be composable, not necessarily a composite.
* Mandating TupleWritable isn't strictly correct.
*/
@SuppressWarnings("unchecked") // child types unknown
public RecordReader<K,TupleWritable> createRecordReader(InputSplit split,
TaskAttemptContext taskContext)
throws IOException, InterruptedException {
setFormat(taskContext.getConfiguration());
return root.createRecordReader(split, taskContext);
}
项目:hadoop
文件:CompositeRecordReader.java
/**
* Close all child RRs.
*/
public void close() throws IOException {
if (kids != null) {
for (RecordReader<K,? extends Writable> rr : kids) {
rr.close();
}
}
if (jc != null) {
jc.close();
}
}
项目:hadoop
文件:CompositeRecordReader.java
/**
* Report progress as the minimum of all child RR progress.
*/
public float getProgress() throws IOException, InterruptedException {
float ret = 1.0f;
for (RecordReader<K,? extends Writable> rr : kids) {
ret = Math.min(ret, rr.getProgress());
}
return ret;
}
项目:hadoop
文件:DBInputFormat.java
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
Configuration conf) throws IOException {
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
try {
// use database product name to determine appropriate record reader.
if (dbProductName.startsWith("ORACLE")) {
// use Oracle-specific db reader.
return new OracleDBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else if (dbProductName.startsWith("MYSQL")) {
// use MySQL-specific db reader.
return new MySQLDBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
} else {
// Generic reader.
return new DBRecordReader<T>(split, inputClass,
conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName);
}
} catch (SQLException ex) {
throw new IOException(ex.getMessage());
}
}
项目:hadoop
文件:DataDrivenDBInputFormat.java
protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
Configuration conf) throws IOException {
DBConfiguration dbConf = getDBConf();
@SuppressWarnings("unchecked")
Class<T> inputClass = (Class<T>) (dbConf.getInputClass());
String dbProductName = getDBProductName();
LOG.debug("Creating db record reader for db product: " + dbProductName);
try {
// use database product name to determine appropriate record reader.
if (dbProductName.startsWith("MYSQL")) {
// use MySQL-specific db reader.
return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName());
} else {
// Generic reader.
return new DataDrivenDBRecordReader<T>(split, inputClass,
conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName);
}
} catch (SQLException ex) {
throw new IOException(ex.getMessage());
}
}
项目:hadoop
文件:ChainMapContextImpl.java
ChainMapContextImpl(
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> base,
RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
Configuration conf) {
this.reader = rr;
this.output = rw;
this.base = base;
this.conf = conf;
}