Java class org.apache.hadoop.mapreduce.lib.input.FileSplit: example source code
Project: ViraPipe
File: InterleaveMulti.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);
JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);
zips.foreach( splits -> {
Path path = splits._1.getPath();
FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
});
}
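Below is a minimal driver sketch (not from ViraPipe) showing how interleaveSplitFastq could be called for a paired-end sample; the paths, output directory, and split length are assumptions, and imports are omitted as in the other snippets on this page.

SparkConf sparkConf = new SparkConf().setAppName("interleave-splits");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
// Hypothetical paired-end inputs; one FASTQ record spans four lines,
// so splitlen = 400000 yields splits of 100000 read pairs.
FileStatus forward = fs.getFileStatus(new Path("/data/sample_1.fastq"));
FileStatus reverse = fs.getFileStatus(new Path("/data/sample_2.fastq"));
interleaveSplitFastq(forward, reverse, "/data/interleaved", 400000, sc);
sc.stop();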
Project: ViraPipe
File: InterleaveMulti.java
private static void splitFastq(FileStatus fst, String fqPath, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
Path fqpath = new Path(fqPath);
String fqname = fqpath.getName();
String[] ns = fqname.split("\\.");
// TODO: Also handle compressed files
List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
splitRDD.foreach( split -> {
FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), split);
writeFastqFile(fqreader, new Configuration(), splitDir + "/split_" + split.getStart() + "." + ns[1]);
});
}
Project: ViraPipe
File: Decompress.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);
JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);
zips.foreach( splits -> {
Path path = splits._1.getPath();
FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir+"/"+path.getParent().getName()+"_"+splits._1.getStart()+".fq");
});
}
Project: ViraPipe
File: DecompressInterleave.java
public static void interleaveSplitFastq(FileStatus fst, FileStatus fst2, String splitDir, int splitlen, JavaSparkContext sc) throws IOException {
String[] ns = fst.getPath().getName().split("\\.");
// TODO: Also handle compressed files
List<FileSplit> nlif = NLineInputFormat.getSplitsForFile(fst, sc.hadoopConfiguration(), splitlen);
List<FileSplit> nlif2 = NLineInputFormat.getSplitsForFile(fst2, sc.hadoopConfiguration(), splitlen);
JavaRDD<FileSplit> splitRDD = sc.parallelize(nlif);
JavaRDD<FileSplit> splitRDD2 = sc.parallelize(nlif2);
JavaPairRDD<FileSplit, FileSplit> zips = splitRDD.zip(splitRDD2);
zips.foreach( splits -> {
Path path = splits._1.getPath();
FastqRecordReader fqreader = new FastqRecordReader(new Configuration(), splits._1);
FastqRecordReader fqreader2 = new FastqRecordReader(new Configuration(), splits._2);
writeInterleavedSplits(fqreader, fqreader2, new Configuration(), splitDir, path.getParent().getName()+"_"+splits._1.getStart()+".fq");
});
}
Project: Wikipedia-Index
File: XmlInputFormat.java
/**
 * Initializes the input resources and related parameters; this setup could
 * also be placed in the initialize() method instead.
 * @param inputSplit
 * @param context
 * @throws IOException
 */
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
// Get the start and end tags passed in via the configuration
startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
endTag = context.get(END_TAG_KEY).getBytes("UTF-8");
FileSplit fileSplit = (FileSplit) inputSplit;
// Get the start and end positions of this split
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(context);
// Open an HDFS input stream for the split's file
fsin = fs.open(fileSplit.getPath());
// Seek to the start position of the split
fsin.seek(start);
}
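Not part of the original listing: a sketch of how a reader with this constructor might be returned by the enclosing input format. The createRecordReader signature and the LongWritable/Text key/value types are assumptions, since only the constructor appears above.

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException {
    // The constructor above already performs the per-split setup,
    // so the reader can be handed back directly.
    return new XMLRecordReader(split, context.getConfiguration());
}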
Project: circus-train
File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
int numMaps = getNumMapTasks(jobContext.getConfiguration());
final int nSplits = Math.min(numMaps, chunks.size());
List<InputSplit> splits = new ArrayList<>(nSplits);
for (int i = 0; i < nSplits; ++i) {
TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
chunks.get(i).assignTo(taskId);
splits.add(new FileSplit(chunks.get(i).getPath(), 0,
// Setting non-zero length for FileSplit size, to avoid a possible
// future when 0-sized file-splits are considered "empty" and skipped
// over.
getMinRecordsPerChunk(jobContext.getConfiguration()), null));
}
ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
return splits;
}
Project: aliyun-maxcompute-data-collectors
File: MergeMapperBase.java
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
InputSplit is = context.getInputSplit();
FileSplit fs = (FileSplit) is;
Path splitPath = fs.getPath();
if (splitPath.toString().startsWith(
conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
this.isNew = true;
} else if (splitPath.toString().startsWith(
conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
this.isNew = false;
} else {
throw new IOException("File " + splitPath + " is not under new path "
+ conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
+ conf.get(MergeJob.MERGE_OLD_PATH_KEY));
}
}
Project: hadoop
File: TestJobSplitWriter.java
@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new FileSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
Project: hadoop
File: TestJobSplitWriter.java
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
org.apache.hadoop.mapred.FileSplit split =
new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new org.apache.hadoop.mapred.InputSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
Project: hadoop
File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
if (job == lastContext) {
return lastResult;
}
long t1, t2, t3;
t1 = System.currentTimeMillis();
lastContext = job;
lastResult = super.getSplits(job);
t2 = System.currentTimeMillis();
System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
TeraScheduler scheduler = new TeraScheduler(
lastResult.toArray(new FileSplit[0]), job.getConfiguration());
lastResult = scheduler.getNewFileSplits();
t3 = System.currentTimeMillis();
System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
}
return lastResult;
}
Project: hadoop
File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
Configuration conf) throws IOException {
this.realSplits = realSplits;
this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
Map<String, Host> hostTable = new HashMap<String, Host>();
splits = new Split[realSplits.length];
for(FileSplit realSplit: realSplits) {
Split split = new Split(realSplit.getPath().toString());
splits[remainingSplits++] = split;
for(String hostname: realSplit.getLocations()) {
Host host = hostTable.get(hostname);
if (host == null) {
host = new Host(hostname);
hostTable.put(hostname, host);
hosts.add(host);
}
host.splits.add(split);
split.locations.add(host);
}
}
}
Project: hadoop
File: TeraScheduler.java
/**
* Solve the schedule and modify the FileSplit array to reflect the new
* schedule. It will move placed splits to the front and unplaceable splits
* to the end.
* @return a new list of FileSplits that are modified to have the
* best host as the only host.
* @throws IOException
*/
public List<InputSplit> getNewFileSplits() throws IOException {
solve();
FileSplit[] result = new FileSplit[realSplits.length];
int left = 0;
int right = realSplits.length - 1;
for(int i=0; i < splits.length; ++i) {
if (splits[i].isAssigned) {
// copy the split and fix up the locations
String[] newLocations = {splits[i].locations.get(0).hostname};
realSplits[i] = new FileSplit(realSplits[i].getPath(),
realSplits[i].getStart(), realSplits[i].getLength(), newLocations);
result[left++] = realSplits[i];
} else {
result[right--] = realSplits[i];
}
}
List<InputSplit> ret = new ArrayList<InputSplit>();
for (FileSplit fs : result) {
ret.add(fs);
}
return ret;
}
Project: hadoop
File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext,
List<DynamicInputChunk> chunks)
throws IOException {
int numMaps = getNumMapTasks(jobContext.getConfiguration());
final int nSplits = Math.min(numMaps, chunks.size());
List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);
for (int i=0; i< nSplits; ++i) {
TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
chunks.get(i).assignTo(taskId);
splits.add(new FileSplit(chunks.get(i).getPath(), 0,
// Setting non-zero length for FileSplit size, to avoid a possible
// future when 0-sized file-splits are considered "empty" and skipped
// over.
getMinRecordsPerChunk(jobContext.getConfiguration()),
null));
}
DistCpUtils.publish(jobContext.getConfiguration(),
CONF_LABEL_NUM_SPLITS, splits.size());
return splits;
}
Project: hadoop
File: TestUniformSizeInputFormat.java
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
long lastEnd = 0;
// Verify that each split's start matches the previous split's end,
// i.e. that no part of the input is skipped
for (InputSplit split : splits) {
FileSplit fileSplit = (FileSplit) split;
long start = fileSplit.getStart();
Assert.assertEquals(lastEnd, start);
lastEnd = start + fileSplit.getLength();
}
//Verify there is nothing more to read from the input file
SequenceFile.Reader reader
= new SequenceFile.Reader(cluster.getFileSystem().getConf(),
SequenceFile.Reader.file(listFile));
try {
reader.seek(lastEnd);
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
} finally {
IOUtils.closeStream(reader);
}
}
Project: hadoop
File: GenerateDistCacheData.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
final JobClient client = new JobClient(jobConf);
ClusterStatus stat = client.getClusterStatus(true);
int numTrackers = stat.getTaskTrackers();
final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
// Total size of distributed cache files to be generated
final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
// Get the path of the special file
String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
throw new RuntimeException("Invalid metadata: #files (" + fileCount
+ "), total_size (" + totalSize + "), filelisturi ("
+ distCacheFileList + ")");
}
Path sequenceFile = new Path(distCacheFileList);
FileSystem fs = sequenceFile.getFileSystem(jobConf);
FileStatus srcst = fs.getFileStatus(sequenceFile);
// Consider the number of TTs * mapSlotsPerTracker as number of mappers.
int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
int numSplits = numTrackers * numMapSlotsPerTracker;
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
// Average size of data to be generated by each map task
final long targetSize = Math.max(totalSize / numSplits,
DistributedCacheEmulator.AVG_BYTES_PER_MAP);
long splitStartPosition = 0L;
long splitEndPosition = 0L;
long acc = 0L;
long bytesRemaining = srcst.getLen();
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
while (reader.next(key, value)) {
// If adding this file would put this split past the target size,
// cut the last split and put this file in the next split.
if (acc + key.get() > targetSize && acc != 0) {
long splitSize = splitEndPosition - splitStartPosition;
splits.add(new FileSplit(
sequenceFile, splitStartPosition, splitSize, (String[])null));
bytesRemaining -= splitSize;
splitStartPosition = splitEndPosition;
acc = 0L;
}
acc += key.get();
splitEndPosition = reader.getPosition();
}
} finally {
if (reader != null) {
reader.close();
}
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(
sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
}
return splits;
}
Project: SparkSeq
File: SingleFastqInputFormat.java
public SingleFastqRecordReader(Configuration conf, FileSplit split) throws IOException {
file = split.getPath();
start = split.getStart();
end = start + split.getLength();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream fileIn = fs.open(file);
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(file);
if (codec == null) { // no codec. Uncompressed file.
positionAtFirstRecord(fileIn);
inputStream = fileIn;
} else {
// compressed file
if (start != 0) {
throw new RuntimeException("Start position for compressed file is not 0! (found " + start + ")");
}
inputStream = codec.createInputStream(fileIn);
end = Long.MAX_VALUE; // read until the end of the file
}
lineReader = new LineReader(inputStream);
}
Project: pipelines
File: DwCAInputFormat.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
// what follows is very questionable but a quick test
// the file is read from HDFS and copied to a temporary location
FileSplit split = (FileSplit)inputSplit;
Configuration job = context.getConfiguration();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
java.nio.file.Path tmpFile = Files.createTempFile("tmp", ".zip"); // consider using job and task IDs?
FSDataInputStream fileIn = fs.open(file);
FileOutputStream fileOut = new FileOutputStream(tmpFile.toFile());
LOG.info("Copying from {} to {}", file, tmpFile);
IOUtils.copyBytes(fileIn, fileOut, 100000, true);
// having copied the file out of HDFS onto the local FS in a temp folder, we prepare it (sorts files)
java.nio.file.Path tmpSpace = Files.createTempDirectory("tmp-" + context.getTaskAttemptID().getJobID().getId() +
":" + context.getTaskAttemptID().getId());
reader = new DwCAReader(tmpFile.toAbsolutePath().toString(), tmpSpace.toAbsolutePath().toString());
nextKeyValue();
}
Project: dataSqueeze
File: OrcCompactionMapper.java
/**
* {@inheritDoc}
*/
protected void map(final Object key, final OrcStruct value, final Context context) throws IOException, InterruptedException {
if (value != null && value.toString() != null && value.toString().isEmpty()) {
return;
}
// Mapper sends data with parent directory path as keys to retain directory structure
final FileSplit fileSplit = (FileSplit) context.getInputSplit();
final Path filePath = fileSplit.getPath();
final String parentFilePath = String.format("%s/", filePath.getParent().toString());
log.debug("Parent file path {}", parentFilePath);
if (!fileSizesMap.containsKey(filePath.toString())) {
if (fileSystem == null) {
final URI uri = URI.create(filePath.toString());
fileSystem = FileSystem.get(uri, configuration);
}
final FileStatus[] listStatuses = fileSystem.listStatus(filePath);
for (FileStatus fileStatus : listStatuses) {
if (!fileStatus.isDirectory()) {
fileSizesMap.put(fileStatus.getPath().toString(), fileStatus.getLen());
log.info("Entry added to fileSizes Map {} {}", fileStatus.getPath().toString(), fileStatus.getLen());
}
}
}
final Text parentFilePathKey = new Text(parentFilePath);
final Text filePathKey = new Text(filePath.toString());
final OrcValue orcValue = new OrcValue();
orcValue.value = value;
final Long fileSize = fileSizesMap.get(filePath.toString());
if (fileSize < threshold) {
context.write(parentFilePathKey, orcValue);
} else {
context.write(filePathKey, orcValue);
}
}
Project: ditb
File: CompactionTool.java
/**
* Returns a split for each store file directory, using the block locations
* of each file as the locality reference.
*/
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
Text key = new Text();
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
LineReader reader = new LineReader(fs.open(path));
long pos = 0;
int n;
try {
while ((n = reader.readLine(key)) > 0) {
String[] hosts = getStoreDirHosts(fs, path);
splits.add(new FileSplit(path, pos, n, hosts));
pos += n;
}
} finally {
reader.close();
}
}
return splits;
}
Project: ditb
File: IntegrationTestBigLinkedList.java
private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
InterruptedException {
SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
// Return the rows whose values are flagged Counts.UNDEFINED; these are the
// entries that are missing.
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
InputSplit is =
new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
rr.initialize(is, context);
while (rr.nextKeyValue()) {
rr.getCurrentKey();
BytesWritable bw = rr.getCurrentValue();
if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
byte[] key = new byte[rr.getCurrentKey().getLength()];
System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
.getLength());
result.add(key);
}
}
}
return result;
}
Project: accumulo-wikisearch
File: AggregatingRecordReaderTest.java
@Test
public void testPartialXML2() throws Exception {
File f = createFile(xml3);
// Create FileSplit
Path p = new Path(f.toURI().toString());
WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
// Initialize the RecordReader
AggregatingRecordReader reader = new AggregatingRecordReader();
reader.initialize(split, ctx);
assertTrue(reader.nextKeyValue());
testXML(reader.getCurrentValue(), "A", "B", "");
assertTrue(reader.nextKeyValue());
testXML(reader.getCurrentValue(), "C", "D", "");
assertTrue(reader.nextKeyValue());
try {
testXML(reader.getCurrentValue(), "E", "", "");
fail("Fragment returned, and it somehow passed XML parsing.");
} catch (SAXParseException e) {
// ignore
}
assertTrue(!reader.nextKeyValue());
}
Project: big-c
File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
if (job == lastContext) {
return lastResult;
}
long t1, t2, t3;
t1 = System.currentTimeMillis();
lastContext = job;
lastResult = super.getSplits(job);
t2 = System.currentTimeMillis();
System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
TeraScheduler scheduler = new TeraScheduler(
lastResult.toArray(new FileSplit[0]), job.getConfiguration());
lastResult = scheduler.getNewFileSplits();
t3 = System.currentTimeMillis();
System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
}
return lastResult;
}
Project: aliyun-oss-hadoop-fs
File: TestJobSplitWriter.java
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
org.apache.hadoop.mapred.FileSplit split =
new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new org.apache.hadoop.mapred.InputSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
Project: big-c
File: TestUniformSizeInputFormat.java
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
long lastEnd = 0;
// Verify that each split's start matches the previous split's end,
// i.e. that no part of the input is skipped
for (InputSplit split : splits) {
FileSplit fileSplit = (FileSplit) split;
long start = fileSplit.getStart();
Assert.assertEquals(lastEnd, start);
lastEnd = start + fileSplit.getLength();
}
//Verify there is nothing more to read from the input file
SequenceFile.Reader reader
= new SequenceFile.Reader(cluster.getFileSystem().getConf(),
SequenceFile.Reader.file(listFile));
try {
reader.seek(lastEnd);
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
Assert.assertFalse(reader.next(srcRelPath, srcFileStatus));
} finally {
IOUtils.closeStream(reader);
}
}
Project: aliyun-oss-hadoop-fs
File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
if (job == lastContext) {
return lastResult;
}
long t1, t2, t3;
t1 = System.currentTimeMillis();
lastContext = job;
lastResult = super.getSplits(job);
t2 = System.currentTimeMillis();
System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
if (job.getConfiguration().getBoolean(TeraSortConfigKeys.USE_TERA_SCHEDULER.key(),
TeraSortConfigKeys.DEFAULT_USE_TERA_SCHEDULER)) {
TeraScheduler scheduler = new TeraScheduler(
lastResult.toArray(new FileSplit[0]), job.getConfiguration());
lastResult = scheduler.getNewFileSplits();
t3 = System.currentTimeMillis();
System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
}
return lastResult;
}
Project: accumulo-wikisearch
File: WikipediaInputFormat.java
@Override
public void readFields(DataInput in) throws IOException {
Path file = new Path(in.readUTF());
long start = in.readLong();
long length = in.readLong();
String [] hosts = null;
if(in.readBoolean())
{
int numHosts = in.readInt();
hosts = new String[numHosts];
for(int i = 0; i < numHosts; i++)
hosts[i] = in.readUTF();
}
fileSplit = new FileSplit(file, start, length, hosts);
partition = in.readInt();
}
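For reference, a write(DataOutput) counterpart would mirror the field order consumed by readFields above (path, start, length, optional hosts, partition). This is a sketch under that assumption, not the project's actual implementation.

@Override
public void write(DataOutput out) throws IOException {
    // Mirror the order used in readFields: path, start, length, optional hosts, partition.
    out.writeUTF(fileSplit.getPath().toString());
    out.writeLong(fileSplit.getStart());
    out.writeLong(fileSplit.getLength());
    String[] hosts = fileSplit.getLocations();
    out.writeBoolean(hosts != null);
    if (hosts != null) {
        out.writeInt(hosts.length);
        for (String host : hosts) {
            out.writeUTF(host);
        }
    }
    out.writeInt(partition);
}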
Project: aliyun-oss-hadoop-fs
File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
Configuration conf) throws IOException {
this.realSplits = realSplits;
this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
Map<String, Host> hostTable = new HashMap<String, Host>();
splits = new Split[realSplits.length];
for(FileSplit realSplit: realSplits) {
Split split = new Split(realSplit.getPath().toString());
splits[remainingSplits++] = split;
for(String hostname: realSplit.getLocations()) {
Host host = hostTable.get(hostname);
if (host == null) {
host = new Host(hostname);
hostTable.put(hostname, host);
hosts.add(host);
}
host.splits.add(split);
split.locations.add(host);
}
}
}
Project: aliyun-oss-hadoop-fs
File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext,
List<DynamicInputChunk> chunks)
throws IOException {
int numMaps = getNumMapTasks(jobContext.getConfiguration());
final int nSplits = Math.min(numMaps, chunks.size());
List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);
for (int i=0; i< nSplits; ++i) {
TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
chunks.get(i).assignTo(taskId);
splits.add(new FileSplit(chunks.get(i).getPath(), 0,
// Setting non-zero length for FileSplit size, to avoid a possible
// future when 0-sized file-splits are considered "empty" and skipped
// over.
getMinRecordsPerChunk(jobContext.getConfiguration()),
null));
}
DistCpUtils.publish(jobContext.getConfiguration(),
CONF_LABEL_NUM_SPLITS, splits.size());
return splits;
}
Project: accumulo-wikisearch
File: AggregatingRecordReaderTest.java
@Test
public void testPartialXML2WithNoPartialRecordsReturned() throws Exception {
conf.set(AggregatingRecordReader.RETURN_PARTIAL_MATCHES, Boolean.toString(false));
File f = createFile(xml3);
// Create FileSplit
Path p = new Path(f.toURI().toString());
WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
// Initialize the RecordReader
AggregatingRecordReader reader = new AggregatingRecordReader();
reader.initialize(split, ctx);
assertTrue(reader.nextKeyValue());
testXML(reader.getCurrentValue(), "A", "B", "");
assertTrue(reader.nextKeyValue());
testXML(reader.getCurrentValue(), "C", "D", "");
assertTrue(!reader.nextKeyValue());
}
Project: big-c
File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext,
List<DynamicInputChunk> chunks)
throws IOException {
int numMaps = getNumMapTasks(jobContext.getConfiguration());
final int nSplits = Math.min(numMaps, chunks.size());
List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);
for (int i=0; i< nSplits; ++i) {
TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
chunks.get(i).assignTo(taskId);
splits.add(new FileSplit(chunks.get(i).getPath(), 0,
// Setting non-zero length for FileSplit size, to avoid a possible
// future when 0-sized file-splits are considered "empty" and skipped
// over.
getMinRecordsPerChunk(jobContext.getConfiguration()),
null));
}
DistCpUtils.publish(jobContext.getConfiguration(),
CONF_LABEL_NUM_SPLITS, splits.size());
return splits;
}
Project: fst-bench
File: TeraInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
if (job == lastContext) {
return lastResult;
}
long t1, t2, t3;
t1 = System.currentTimeMillis();
lastContext = job;
lastResult = super.getSplits(job);
t2 = System.currentTimeMillis();
System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
TeraScheduler scheduler = new TeraScheduler(
lastResult.toArray(new FileSplit[0]), job.getConfiguration());
lastResult = scheduler.getNewFileSplits();
t3 = System.currentTimeMillis();
System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
}
return lastResult;
}
Project: fst-bench
File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
Configuration conf) throws IOException {
this.realSplits = realSplits;
this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
Map<String, Host> hostTable = new HashMap<String, Host>();
splits = new Split[realSplits.length];
for(FileSplit realSplit: realSplits) {
Split split = new Split(realSplit.getPath().toString());
splits[remainingSplits++] = split;
for(String hostname: realSplit.getLocations()) {
Host host = hostTable.get(hostname);
if (host == null) {
host = new Host(hostname);
hostTable.put(hostname, host);
hosts.add(host);
}
host.splits.add(split);
split.locations.add(host);
}
}
}
Project: marklogic-contentpump
File: CompressedDelimitedTextReader.java
@Override
public void initialize(InputSplit inSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
initConfig(context);
initDocType();
initDelimConf();
setFile(((FileSplit) inSplit).getPath());
fs = file.getFileSystem(context.getConfiguration());
FileStatus status = fs.getFileStatus(file);
if (status.isDirectory()) {
iterator = new FileIterator((FileSplit)inSplit, context);
inSplit = iterator.next();
}
initStream(inSplit);
}
Project: marklogic-contentpump
File: ArchiveRecordReader.java
@Override
public void initialize(InputSplit inSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
initConfig(context);
allowEmptyMeta = conf.getBoolean(
CONF_INPUT_ARCHIVE_METADATA_OPTIONAL, false);
setFile(((FileSplit) inSplit).getPath());
fs = file.getFileSystem(context.getConfiguration());
FileStatus status = fs.getFileStatus(file);
if(status.isDirectory()) {
iterator = new FileIterator((FileSplit)inSplit, context);
inSplit = iterator.next();
}
initStream(inSplit);
}
Project: accumulo-wikisearch
File: AggregatingRecordReaderTest.java
@Test
public void testIncorrectArgs() throws Exception {
File f = createFile(xml1);
// Create FileSplit
Path p = new Path(f.toURI().toString());
WikipediaInputSplit split = new WikipediaInputSplit(new FileSplit(p, 0, f.length(), null), 0);
AggregatingRecordReader reader = new AggregatingRecordReader();
try {
// Clear the values for BEGIN and STOP TOKEN
conf.set(AggregatingRecordReader.START_TOKEN, null);
conf.set(AggregatingRecordReader.END_TOKEN, null);
reader.initialize(split, ctx);
// If we got here, then the code didn't throw an exception
fail();
} catch (Exception e) {
// Do nothing, we succeeded
f = null;
}
reader.close();
}
Project: marklogic-contentpump
File: AggregateXMLReader.java
protected void initStreamReader(InputSplit inSplit) throws IOException,
InterruptedException {
start = 0;
end = inSplit.getLength();
overflow = false;
fInputStream = openFile(inSplit, true);
if (fInputStream == null) {
return;
}
try {
xmlSR = f.createXMLStreamReader(fInputStream, encoding);
} catch (XMLStreamException e) {
LOG.error(e.getMessage(), e);
}
if (useAutomaticId) {
idGen = new IdGenerator(file.toUri().getPath() + "-"
+ ((FileSplit) inSplit).getStart());
}
}
Project: hadoop-wiki-index
File: XmlInputFormat.java
/**
 * Initializes the input resources and related parameters; this setup could
 * also be placed in the initialize() method instead.
 * @param inputSplit
 * @param context
 * @throws IOException
 */
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
// Get the start and end tags passed in via the configuration
startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
endTag = context.get(END_TAG_KEY).getBytes("UTF-8");
FileSplit fileSplit = (FileSplit) inputSplit;
// Get the start and end positions of this split
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(context);
// Open an HDFS input stream for the split's file
fsin = fs.open(fileSplit.getPath());
// Seek to the start position of the split
fsin.seek(start);
}
Project: marklogic-contentpump
File: ImportRecordReader.java
public FSDataInputStream openFile(InputSplit inSplit,
boolean configCol) throws IOException {
while (true) {
setFile(((FileSplit) inSplit).getPath());
if (configCol) {
configFileNameAsCollection(conf, file);
}
try {
return fs.open(file);
} catch (IllegalArgumentException e){
LOG.error("Input file skipped, reason: " + e.getMessage());
if (iterator != null &&
iterator.hasNext()) {
inSplit = iterator.next();
} else {
return null;
}
}
}
}
Project: big-c
File: TeraScheduler.java
public TeraScheduler(FileSplit[] realSplits,
Configuration conf) throws IOException {
this.realSplits = realSplits;
this.slotsPerHost = conf.getInt(TTConfig.TT_MAP_SLOTS, 4);
Map<String, Host> hostTable = new HashMap<String, Host>();
splits = new Split[realSplits.length];
for(FileSplit realSplit: realSplits) {
Split split = new Split(realSplit.getPath().toString());
splits[remainingSplits++] = split;
for(String hostname: realSplit.getLocations()) {
Host host = hostTable.get(hostname);
if (host == null) {
host = new Host(hostname);
hostTable.put(hostname, host);
hosts.add(host);
}
host.splits.add(split);
split.locations.add(host);
}
}
}
Project: marklogic-contentpump
File: CombineDocumentSplit.java
public void readFields(DataInput in) throws IOException {
// splits
int splitSize = in.readInt();
splits = new ArrayList<FileSplit>();
for (int i = 0; i < splitSize; i++) {
Path path = new Path(Text.readString(in));
long start = in.readLong();
long len = in.readLong();
FileSplit split = new FileSplit(path, start, len, null);
splits.add(split);
}
// length
length = in.readLong();
// locations
locations = new HashSet<String>();
}