Java class org.apache.hadoop.mapreduce.lib.input.LineRecordReader: example source code
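LineRecordReader is the record reader behind TextInputFormat: it emits one record per line, keyed by the line's starting byte offset (LongWritable), with the line text as the value (Text). The snippets below show the two common ways projects use it: returning it directly from createRecordReader, and wrapping it as a delegate inside a custom reader. As a baseline for reading them, here is a minimal sketch of the direct case; the class name PlainLineInputFormat is illustrative, not taken from any project listed here.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Hypothetical line-oriented input format, behaviorally equivalent to TextInputFormat.
public class PlainLineInputFormat extends FileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // The framework calls initialize(split, context) on the returned reader,
    // so a bare LineRecordReader is all that is needed here.
    return new LineRecordReader();
  }
}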
Project: aliyun-maxcompute-data-collectors
File: CombineShimRecordReader.java
/**
 * Actually instantiate the user's chosen RecordReader implementation.
 */
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
  LOG.debug("ChildSplit operates on: " + split.getPath(index));
  Configuration conf = context.getConfiguration();

  // Determine the file format we're reading.
  Class rrClass;
  if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
    rrClass = SequenceFileRecordReader.class;
  } else {
    rrClass = LineRecordReader.class;
  }

  // Create the appropriate record reader.
  this.rr = (RecordReader<LongWritable, Object>)
      ReflectionUtils.newInstance(rrClass, conf);
}
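ReflectionUtils.newInstance only constructs the child reader; before records can be read, the shim must still call initialize with a single-file split carved out of the combined split. A minimal sketch of that follow-up step, assuming 'split' is a CombineFileSplit and 'index' selects one file within it (the exact wiring in the project may differ):

// Build a per-file FileSplit from the combined split and hand it to the child reader.
FileSplit fileSplit = new FileSplit(split.getPath(index),
    split.getOffset(index), split.getLength(index), split.getLocations());
this.rr.initialize(fileSplit, context);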
Project: incubator-pirk
File: JSONRecordReader.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException
{
  key = new Text();
  value = new MapWritable();
  jsonParser = new JSONParser();

  lineReader = new LineRecordReader();
  lineReader.initialize(inputSplit, context);

  queryString = context.getConfiguration().get("query", "?q=*");

  // Load the data schemas.
  FileSystem fs = FileSystem.get(context.getConfiguration());
  try
  {
    SystemConfiguration.setProperty("data.schemas", context.getConfiguration().get("data.schemas"));
    DataSchemaLoader.initialize(true, fs);
  } catch (Exception e)
  {
    e.printStackTrace();
  }

  String dataSchemaName = context.getConfiguration().get("dataSchemaName");
  dataSchema = DataSchemaRegistry.get(dataSchemaName);
}
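The reader above delegates line iteration to LineRecordReader and layers json-simple parsing on top. A hedged sketch of a companion nextKeyValue(), written to show the delegation pattern rather than to reproduce incubator-pirk's actual parsing logic (assumes org.json.simple.JSONObject and org.json.simple.parser.ParseException are imported):

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!lineReader.nextKeyValue()) {
    return false; // no more lines in this split
  }
  String line = lineReader.getCurrentValue().toString();
  try {
    // json-simple parses a JSON object literal into a JSONObject (a Map).
    JSONObject json = (JSONObject) jsonParser.parse(line);
    key.set(lineReader.getCurrentKey().toString());
    value.clear();
    for (Object field : json.keySet()) {
      value.put(new Text(field.toString()), new Text(String.valueOf(json.get(field))));
    }
    return true;
  } catch (ParseException e) {
    throw new IOException("Malformed JSON record: " + line, e);
  }
}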
Project: bigdata-interop
File: GsonRecordReader.java
/**
 * Called once at initialization to initialize the RecordReader.
 *
 * @param genericSplit the split that defines the range of records to read.
 * @param context the information about the task.
 * @throws IOException on IO Error.
 */
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
    throws IOException {
  if (LOG.isDebugEnabled()) {
    try {
      LOG.debug("initialize('{}', '{}')",
          HadoopToStringUtil.toString(genericSplit), HadoopToStringUtil.toString(context));
    } catch (InterruptedException ie) {
      LOG.debug("InterruptedException during HadoopToStringUtil.toString", ie);
    }
  }
  Preconditions.checkArgument(genericSplit instanceof FileSplit,
      "InputSplit genericSplit should be an instance of FileSplit.");
  // Get FileSplit.
  FileSplit fileSplit = (FileSplit) genericSplit;
  // Create the JsonParser.
  jsonParser = new JsonParser();
  // Initialize the LineRecordReader.
  lineReader = new LineRecordReader();
  lineReader.initialize(fileSplit, context);
}
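Wrappers like this typically forward the rest of the RecordReader contract straight to the wrapped LineRecordReader. These delegating bodies are assumed boilerplate, not copied from bigdata-interop:

@Override
public float getProgress() throws IOException {
  return lineReader.getProgress(); // proportion of the split consumed so far
}

@Override
public void close() throws IOException {
  lineReader.close(); // releases the underlying input stream
}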
Project: zSqoop
File: CombineShimRecordReader.java
/**
 * Actually instantiate the user's chosen RecordReader implementation.
 */
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
  LOG.debug("ChildSplit operates on: " + split.getPath(index));
  Configuration conf = context.getConfiguration();

  // Determine the file format we're reading.
  Class rrClass;
  if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
    rrClass = SequenceFileRecordReader.class;
  } else {
    rrClass = LineRecordReader.class;
  }

  // Create the appropriate record reader.
  this.rr = (RecordReader<LongWritable, Object>)
      ReflectionUtils.newInstance(rrClass, conf);
}
Project: sqoop
File: CombineShimRecordReader.java
/**
 * Actually instantiate the user's chosen RecordReader implementation.
 */
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
  LOG.debug("ChildSplit operates on: " + split.getPath(index));
  Configuration conf = context.getConfiguration();

  // Determine the file format we're reading.
  Class rrClass;
  if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
    rrClass = SequenceFileRecordReader.class;
  } else {
    rrClass = LineRecordReader.class;
  }

  // Create the appropriate record reader.
  this.rr = (RecordReader<LongWritable, Object>)
      ReflectionUtils.newInstance(rrClass, conf);
}
Project: BLASpark
File: RowPerLineRecordReader.java
@Override
public void initialize(final InputSplit inputSplit,
    final TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  this.lrr = new LineRecordReader();
  this.lrr.initialize(inputSplit, taskAttemptContext);
}
Project: hadoop-plus
File: ARFFManyLineRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  int halfOfBufferSize = pair.value.capacity() / 2;

  maxLineSize = context.getConfiguration().getInt(
      LineRecordReader.MAX_LINE_LENGTH, halfOfBufferSize);
  if (maxLineSize > halfOfBufferSize) {
    context.getConfiguration().setInt(LineRecordReader.MAX_LINE_LENGTH,
        halfOfBufferSize);
    maxLineSize = halfOfBufferSize;
  }

  r.initialize(split, context);

  FileSplit fs = (FileSplit) split;
  start = fs.getStart();
}
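LineRecordReader.MAX_LINE_LENGTH is the configuration key "mapreduce.input.linerecordreader.line.maxlength"; LineRecordReader skips (with a log message) any line longer than that many bytes rather than returning it. The snippet above caps the value at half its buffer size; a job can also set the cap directly, as in this fragment:

Configuration conf = new Configuration();
// Skip (rather than buffer) any line longer than 1 MiB.
conf.setInt(LineRecordReader.MAX_LINE_LENGTH, 1024 * 1024);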
Project: titan0.5.4-hbase1.1.1-custom
File: ScriptRecordReader.java
public ScriptRecordReader(final VertexQueryFilter vertexQuery, final TaskAttemptContext context) throws IOException {
  this.lineRecordReader = new LineRecordReader();
  this.vertexQuery = vertexQuery;
  this.configuration = DEFAULT_COMPAT.getContextConfiguration(context);
  this.faunusConf = ModifiableHadoopConfiguration.of(configuration);
  final FileSystem fs = FileSystem.get(configuration);
  try {
    this.engine.eval(new InputStreamReader(fs.open(new Path(faunusConf.getInputConf(ROOT_NS).get(SCRIPT_FILE)))));
  } catch (Exception e) {
    throw new IOException(e.getMessage());
  }
}
Project: hbase-in-action
File: BulkImportJobExample.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  lineRecordReader = new LineRecordReader();
  lineRecordReader.initialize(inputSplit, taskAttemptContext);
  currentKey = new ImmutableBytesWritable();
  parser = new JSONParser();
  skipBadLines = taskAttemptContext.getConfiguration().getBoolean(
      SKIP_LINES_CONF_KEY, true);
}
Project: mrgeo
File: DelimitedVectorRecordReader.java
@Override
@SuppressWarnings("squid:S2095") // recordReader is closed explicitly in the close() method
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
    InterruptedException
{
  if (split instanceof FileSplit)
  {
    FileSplit fsplit = (FileSplit) split;
    delimitedParser = getDelimitedParser(fsplit.getPath().toString(),
        context.getConfiguration());
    recordReader = new LineRecordReader();
    recordReader.initialize(fsplit, context);
    // Skip the first line when the parser is configured to do so.
    if (delimitedParser.getSkipFirstLine())
    {
      // Only skip the first line of the first split. The other
      // splits are somewhere in the middle of the original file,
      // so their first lines should not be skipped.
      if (fsplit.getStart() == 0)
      {
        nextKeyValue();
      }
    }
  }
  else
  {
    throw new IOException("input split is not a FileSplit");
  }
}
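For context on the skip logic above: LineRecordReader already discards the first, partial line of any split whose start offset is non-zero, because that line is consumed by the previous split's reader. An explicit skip is therefore only needed to drop a header line in the split that starts at byte 0, which is the same pattern in compact form (field names assumed from the snippet above):

// Consume the header line, but only in the split that begins the file;
// LineRecordReader handles partial first lines in all later splits itself.
if (fsplit.getStart() == 0 && delimitedParser.getSkipFirstLine()) {
  nextKeyValue();
}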
Project: geowave
File: GeonamesDataFileInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(
    InputSplit split,
    TaskAttemptContext context)
    throws IOException, InterruptedException {
  return new LineRecordReader();
}
Project: pmr-common
File: CombineTextInputFormat.java
private void initializeNextReader() throws IOException {
  rdr = new LineRecordReader();
  rdr.initialize(
      new FileSplit(split.getPath(currentSplit),
          split.getOffset(currentSplit),
          split.getLength(currentSplit), null),
      context);
  ++currentSplit;
}
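initializeNextReader() opens one file of the CombineFileSplit at a time, so the enclosing reader has to roll over to the next file when the current LineRecordReader runs dry. A hedged sketch of that loop, assuming the rdr, currentSplit, and split fields shown above:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  // Advance within the current file; on exhaustion, move to the next one.
  while (!rdr.nextKeyValue()) {
    if (currentSplit >= split.getNumPaths()) {
      return false; // every file in the combined split has been read
    }
    rdr.close();
    initializeNextReader();
  }
  return true;
}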
Project: pmr-common
File: JsonInputFormat.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  rdr = new LineRecordReader();
  rdr.initialize(split, context);
}
Project: bigdata_pattern
File: LogFileRecordReader.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext attempt)
    throws IOException, InterruptedException {
  lineReader = new LineRecordReader();
  lineReader.initialize(inputSplit, attempt);
}
Project: seal
File: TsvInputFormat.java
public TsvRecordReader(Configuration conf, int[] keyFields) throws IOException
{
  in = new LineRecordReader();

  if (keyFields.length == 0)
  {
    cutter = null;
    builder = null;
  }
  else
  {
    cutter = new CutText(conf.get(DELIM_CONF, DELIM_DEFALT), keyFields);
    builder = new StringBuilder(1000);
  }
}
Project: seal
File: SamInputFormat.java
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException
{
  lineReader = new LineRecordReader();
  lineReader.initialize(genericSplit, context);
  split = (FileSplit) genericSplit;
  value = null;
}
Project: LiteGraph
File: ScriptRecordReader.java
public ScriptRecordReader() {
  this.lineRecordReader = new LineRecordReader();
}
Project: LiteGraph
File: GraphSONRecordReader.java
public GraphSONRecordReader() {
  this.lineRecordReader = new LineRecordReader();
}
Project: hadoop-plus
File: ARFFManyLineRecordReader.java
public ARFFManyLineRecordReader(byte[] recordDelimiterBytes,
    int recordSizeLimit) {
  this.r = new LineRecordReader(recordDelimiterBytes);
  pair = new PairOfByteBuffers();
}
Project: dog
File: NewTextInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
  return new LineRecordReader();
}
Project: mara
File: CombineTextFileInputFormat.java
public FileLineWritableRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer splitIndex) {
  delegate = new LineRecordReader();
  this.splitIndex = splitIndex;
}
Project: titan0.5.4-hbase1.1.1-custom
File: GraphSONRecordReader.java
public GraphSONRecordReader(VertexQueryFilter vertexQuery) {
  lineRecordReader = new LineRecordReader();
  this.vertexQuery = vertexQuery;
}
Project: titan0.5.4-hbase1.1.1-custom
File: RDFRecordReader.java
public RDFRecordReader(final ModifiableHadoopConfiguration configuration) throws IOException {
  this.lineRecordReader = new LineRecordReader();
  this.handler = new RDFBlueprintsHandler(configuration);
}
Project: tinkerpop
File: ScriptRecordReader.java
public ScriptRecordReader() {
  this.lineRecordReader = new LineRecordReader();
}
Project: tinkerpop
File: GraphSONRecordReader.java
public GraphSONRecordReader() {
  this.lineRecordReader = new LineRecordReader();
}
Project: Hanhan-Hadoop-MapReduce
File: MultiLineJSONInputFormat.java
public MultiLineRecordReader(byte[] recordDelimiterBytes) {
  linereader = new LineRecordReader(recordDelimiterBytes);
}
Project: bigdata-interop
File: JsonTextBigQueryInputFormat.java
@Override
public RecordReader<LongWritable, Text> createDelegateRecordReader(
    InputSplit split, Configuration configuration) throws IOException, InterruptedException {
  LOG.debug("createDelegateRecordReader -> new LineRecordReader");
  return new LineRecordReader();
}
Project: bigdata-interop
File: GsonBigQueryInputFormatTest.java
/**
 * Tests getSplits method of GsonBigQueryInputFormat.
 */
@Test
public void testGetSplitsSharded()
    throws IOException, InterruptedException {
  config.setBoolean(BigQueryConfiguration.ENABLE_SHARDED_EXPORT_KEY, true);

  // Make the bytes large enough that we will estimate a large number of shards.
  table.setNumRows(BigInteger.valueOf(99999L))
      .setNumBytes(1024L * 1024 * 1024 * 8);

  // If the hinted map.tasks is smaller than the estimated number of files, then we defer
  // to the hint.
  config.setInt(ShardedExportToCloudStorage.NUM_MAP_TASKS_HINT_KEY, 3);

  // Run getSplits method.
  GsonBigQueryInputFormat gsonBigQueryInputFormat = new GsonBigQueryInputFormatForTest();
  BigQueryJobWrapper wrapper = new BigQueryJobWrapper(config);
  wrapper.setJobID(new JobID());
  List<InputSplit> splits = gsonBigQueryInputFormat.getSplits(wrapper);

  // The base export path should've gotten created.
  Path baseExportPath = new Path(config.get(BigQueryConfiguration.TEMP_GCS_PATH_KEY));
  FileStatus baseStatus = baseExportPath.getFileSystem(config).getFileStatus(baseExportPath);
  assertTrue(baseStatus.isDir());

  assertEquals(3, splits.size());
  for (int i = 0; i < 3; ++i) {
    assertTrue(splits.get(i) instanceof ShardedInputSplit);
    DynamicFileListRecordReader<LongWritable, Text> reader =
        new DynamicFileListRecordReader<>(new DelegateRecordReaderFactory<LongWritable, Text>() {
          @Override
          public RecordReader<LongWritable, Text> createDelegateRecordReader(
              InputSplit split, Configuration configuration)
              throws IOException, InterruptedException {
            return new LineRecordReader();
          }
        });
    when(mockTaskAttemptContext.getConfiguration()).thenReturn(config);
    reader.initialize(splits.get(i), mockTaskAttemptContext);
    Path shardDir = ((ShardedInputSplit) splits.get(i))
        .getShardDirectoryAndPattern()
        .getParent();
    FileStatus shardDirStatus = shardDir.getFileSystem(config).getFileStatus(shardDir);
    assertTrue(shardDirStatus.isDir());
  }

  // Verify correct calls to BigQuery are made.
  verify(mockBigQueryHelper, times(2)).createJobReference(
      eq(jobProjectId), any(String.class));
  verify(mockBigQueryHelper, times(2))
      .insertJobOrFetchDuplicate(eq(jobProjectId), any(Job.class));

  // Make sure we didn't try to delete the table in sharded mode even though
  // DELETE_INTERMEDIATE_TABLE_KEY is true and we had a query.
  verify(mockBigQueryHelper, times(1)).getTable(eq(tableRef));
  verifyNoMoreInteractions(mockBigqueryTables);
  verify(mockBigQueryHelper, atLeastOnce()).getRawBigquery();
}
Project: spork-streaming
File: RegExLoader.java
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
    throws IOException {
  in = (LineRecordReader) reader;
}
Project: spork
File: RegExLoader.java
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
    throws IOException {
  in = (LineRecordReader) reader;
}
Project: mrgeo
File: DelimitedVectorRecordReader.java
public VectorLineProducer(LineRecordReader recordReader)
{
  lineRecordReader = recordReader;
}
Project: kangaroo
File: S3TextInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
  final String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
  return new LineRecordReader(delimiter != null ? delimiter.getBytes() : null);
}
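The delimiter read here is the standard "textinputformat.record.delimiter" key, the same one TextInputFormat honors, so records can be split on something other than newlines. A configuration fragment (the blank-line delimiter is just an illustrative choice):

Configuration conf = new Configuration();
// Treat blank-line-separated blocks as single records.
conf.set("textinputformat.record.delimiter", "\n\n");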
Project: hadoop-journey
File: TweetRecordReader.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  this.lineRecordReader = new LineRecordReader();
  this.lineRecordReader.initialize(inputSplit, taskAttemptContext);
}
Project: giraph-gora
File: GiraphTextInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(
    InputSplit split, TaskAttemptContext context) {
  return new LineRecordReader();
}
Project: giraph-research
File: GiraphTextInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(
    InputSplit split, TaskAttemptContext context) {
  return new LineRecordReader();
}
Project: sedge
File: RegExLoader.java
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
    throws IOException {
  in = (LineRecordReader) reader;
}