Java Class org.apache.hadoop.mapreduce.InputSplit Example Source Code
Project: angel
File: BalanceInputFormat.java
private void addCreatedSplit(List<InputSplit> splitList,
Collection<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
long[] offset = new long[validBlocks.size()];
long[] length = new long[validBlocks.size()];
for (int i = 0; i < validBlocks.size(); i++) {
fl[i] = validBlocks.get(i).onepath;
offset[i] = validBlocks.get(i).offset;
length[i] = validBlocks.get(i).length;
}
// add this split to the list that is returned
CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
length, locations.toArray(new String[0]));
splitList.add(thissplit);
}
Project: Wikipedia-Index
File: XmlInputFormat.java
/**
* Initialize the input source and related parameters; this work could also be done in initialize().
* @param inputSplit
* @param context
* @throws IOException
*/
public XMLRecordReader(InputSplit inputSplit, Configuration context) throws IOException {
/**
* Get the start and end tags passed in through the configuration
*/
startTag = context.get(START_TAG_KEY).getBytes("UTF-8");
endTag = context.get(END_TAG_KEY).getBytes("UTF-8");
FileSplit fileSplit = (FileSplit) inputSplit;
/**
* Get the start and end positions of the split
*/
start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(context);
/**
* Open an HDFS input stream for the split's file
*/
fsin = fs.open(fileSplit.getPath());
/**
* Seek to the start of the split
*/
fsin.seek(start);
}
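For context, here is a minimal driver-side sketch of how the start and end tags consumed by this reader might be supplied through the job configuration. It assumes this XmlInputFormat class is on the classpath and exposes START_TAG_KEY and END_TAG_KEY as public configuration-key constants; the tag values, job name, and input path are examples only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class XmlJobSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(XmlInputFormat.START_TAG_KEY, "<page>");   // example start tag
    conf.set(XmlInputFormat.END_TAG_KEY, "</page>");    // matching end tag
    Job job = Job.getInstance(conf, "xml-record-extract");
    job.setInputFormatClass(XmlInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/pages.xml")); // example input
    // ... set mapper, output path, etc., then job.waitForCompletion(true)
  }
}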
Project: circus-train
File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
int numMaps = getNumMapTasks(jobContext.getConfiguration());
final int nSplits = Math.min(numMaps, chunks.size());
List<InputSplit> splits = new ArrayList<>(nSplits);
for (int i = 0; i < nSplits; ++i) {
TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
chunks.get(i).assignTo(taskId);
splits.add(new FileSplit(chunks.get(i).getPath(), 0,
// Setting non-zero length for FileSplit size, to avoid a possible
// future when 0-sized file-splits are considered "empty" and skipped
// over.
getMinRecordsPerChunk(jobContext.getConfiguration()), null));
}
ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
return splits;
}
Project: easyhbase
File: WdTableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> allSplits = new ArrayList<InputSplit>();
Scan originalScan = getScan();
Scan[] scans = rowKeyDistributor.getDistributedScans(originalScan);
for (Scan scan : scans) {
// Internally super.getSplits(...) uses the scan object stored in a private variable;
// to re-use the super class code we temporarily swap in each distributed scan.
setScan(scan);
List<InputSplit> splits = super.getSplits(context);
allSplits.addAll(splits);
}
// Setting original scan back
setScan(originalScan);
return allSplits;
}
Project: aliyun-maxcompute-data-collectors
File: NetezzaDataDrivenDBInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
int numMappers = ConfigurationHelper.getJobNumMaps(job);
String boundaryQuery = getDBConf().getInputBoundingQuery();
// Resort to base class if
// dataslice aligned import is not requested
// Not table extract
// No boundary query
// Only one mapper.
if (!getConf().getBoolean(
NetezzaManager.NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false)
|| getDBConf().getInputTableName() == null
|| numMappers == 1
|| (boundaryQuery != null && !boundaryQuery.isEmpty())) {
return super.getSplits(job);
}
// Generate a splitter that splits only on datasliceid. It is an
// integer split. We will just use the lower bounding query to specify
// the restriction of dataslice and set the upper bound to a constant
NetezzaDBDataSliceSplitter splitter = new NetezzaDBDataSliceSplitter();
return splitter.split(getConf(), null, null);
}
Project: hadoop
File: TestFileInputFormat.java
@Test
public void testSplitLocationInfo() throws Exception {
Configuration conf = getConfiguration();
conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
"test:///a1/a2");
Job job = Job.getInstance(conf);
TextInputFormat fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
String[] locations = splits.get(0).getLocations();
Assert.assertEquals(2, locations.length);
SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
Assert.assertEquals(2, locationInfo.length);
SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
locationInfo[0] : locationInfo[1];
SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
locationInfo[0] : locationInfo[1];
Assert.assertTrue(localhostInfo.isOnDisk());
Assert.assertTrue(localhostInfo.isInMemory());
Assert.assertTrue(otherhostInfo.isOnDisk());
Assert.assertFalse(otherhostInfo.isInMemory());
}
Project: aliyun-maxcompute-data-collectors
File: ExportInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
// Set the max split size based on the number of map tasks we want.
long numTasks = getNumMapTasks(job);
long numFileBytes = getJobSize(job);
long maxSplitSize = numFileBytes / numTasks;
setMaxSplitSize(maxSplitSize);
LOG.debug("Target numMapTasks=" + numTasks);
LOG.debug("Total input bytes=" + numFileBytes);
LOG.debug("maxSplitSize=" + maxSplitSize);
List<InputSplit> splits = super.getSplits(job);
if (LOG.isDebugEnabled()) {
LOG.debug("Generated splits:");
for (InputSplit split : splits) {
LOG.debug(" " + split);
}
}
return splits;
}
Project: aliyun-maxcompute-data-collectors
File: NetezzaDBDataSliceSplitter.java
@Override
public List<InputSplit> split(Configuration conf, ResultSet results,
String colName) {
// For each map we will add a split such that
// the datasliceid % the mapper index equals the mapper index.
// The query will only be on the lower bound where clause.
// For upper bounds, we will specify a constant clause which always
// evaluates to true
int numSplits = ConfigurationHelper.getConfNumMaps(conf);
List<InputSplit> splitList = new ArrayList<InputSplit>(numSplits);
for (int i = 0; i < numSplits; ++i) {
StringBuilder lowerBoundClause = new StringBuilder(128);
lowerBoundClause.append(" datasliceid % ").append(numSplits)
.append(" = ").append(i);
splitList.add(new DataDrivenDBInputSplit(lowerBoundClause.toString(),
"1 = 1"));
}
return splitList;
}
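To make the generated bounding clauses concrete, the following small standalone sketch (not part of the splitter) reproduces the clause-building loop for a hypothetical four-mapper job and prints the WHERE fragments each split would carry.
public class DataSliceClausePreview {
  public static void main(String[] args) {
    int numSplits = 4; // hypothetical mapper count
    for (int i = 0; i < numSplits; i++) {
      String lower = " datasliceid % " + numSplits + " = " + i; // restricts the dataslice
      String upper = "1 = 1";                                   // constant, always true
      System.out.println("split " + i + ": (" + lower + ") AND (" + upper + ")");
    }
  }
}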
Project: hadoop
File: DistSum.java
/** Partitions the summation into parts and then returns them as splits */
@Override
public List<InputSplit> getSplits(JobContext context) {
//read sigma from conf
final Configuration conf = context.getConfiguration();
final Summation sigma = SummationWritable.read(DistSum.class, conf);
final int nParts = conf.getInt(N_PARTS, 0);
//create splits
final List<InputSplit> splits = new ArrayList<InputSplit>(nParts);
final Summation[] parts = sigma.partition(nParts);
for(int i = 0; i < parts.length; ++i) {
splits.add(new SummationSplit(parts[i]));
//LOG.info("parts[" + i + "] = " + parts[i]);
}
return splits;
}
Project: hadoop
File: CompositeInputSplit.java
/**
* {@inheritDoc}
* @throws IOException If the child InputSplit cannot be read, typically
* for failing access checks.
*/
@SuppressWarnings("unchecked") // Generic array assignment
public void readFields(DataInput in) throws IOException {
int card = WritableUtils.readVInt(in);
if (splits == null || splits.length != card) {
splits = new InputSplit[card];
}
Class<? extends InputSplit>[] cls = new Class[card];
try {
for (int i = 0; i < card; ++i) {
cls[i] =
Class.forName(Text.readString(in)).asSubclass(InputSplit.class);
}
for (int i = 0; i < card; ++i) {
splits[i] = ReflectionUtils.newInstance(cls[i], null);
SerializationFactory factory = new SerializationFactory(conf);
Deserializer deserializer = factory.getDeserializer(cls[i]);
deserializer.open((DataInputStream)in);
splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
}
} catch (ClassNotFoundException e) {
throw new IOException("Failed split init", e);
}
}
Project: hadoop
File: TeraScheduler.java
/**
* Solve the schedule and modify the FileSplit array to reflect the new
* schedule. It moves placed splits to the front and unplaceable splits
* to the end.
* @return a new list of FileSplits that are modified to have the
* best host as the only host.
* @throws IOException
*/
public List<InputSplit> getNewFileSplits() throws IOException {
solve();
FileSplit[] result = new FileSplit[realSplits.length];
int left = 0;
int right = realSplits.length - 1;
for(int i=0; i < splits.length; ++i) {
if (splits[i].isAssigned) {
// copy the split and fix up the locations
String[] newLocations = {splits[i].locations.get(0).hostname};
realSplits[i] = new FileSplit(realSplits[i].getPath(),
realSplits[i].getStart(), realSplits[i].getLength(), newLocations);
result[left++] = realSplits[i];
} else {
result[right--] = realSplits[i];
}
}
List<InputSplit> ret = new ArrayList<InputSplit>();
for (FileSplit fs : result) {
ret.add(fs);
}
return ret;
}
Project: ditb
File: IntegrationTestBulkLoad.java
@Override
public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
int taskId = context.getTaskAttemptID().getTaskID().getId();
int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
taskId = taskId + iteration * numMapTasks;
numMapTasks = numMapTasks * numIterations;
long chainId = Math.abs(new Random().nextLong());
chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
}
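As a worked example of the chainId arithmetic above (all numbers hypothetical): with 4 map tasks, 2 import rounds, round 1 and task id 2, the effective task id becomes 2 + 1*4 = 6 and the effective task count becomes 8, so the random chainId is shifted to satisfy chainId % 8 == 6, keeping it distinct per task and per round. The sketch below just replays that arithmetic.
public class ChainIdDemo {
  public static void main(String[] args) {
    int numMapTasks = 4, numIterations = 2;   // hypothetical job settings
    int iteration = 1, taskId = 2;            // hypothetical task context
    taskId = taskId + iteration * numMapTasks;        // 6
    numMapTasks = numMapTasks * numIterations;        // 8
    long chainId = Math.abs(new java.util.Random().nextLong());
    chainId = chainId - (chainId % numMapTasks) + taskId;
    // prints 6 (mirrors the original's assumption that the random value is non-negative)
    System.out.println(chainId % numMapTasks);
  }
}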
Project: aliyun-maxcompute-data-collectors
File: OdpsExportMapper.java
public void map(LongWritable key, Record val, Context context)
throws IOException, InterruptedException{
try {
odpsImpl.parse(val);
context.write(odpsImpl, NullWritable.get());
} catch (Exception e) {
LOG.error("Exception raised during data export");
LOG.error("Exception: ", e);
LOG.error("On input: " + val);
LOG.error("At position " + key);
InputSplit is = context.getInputSplit();
LOG.error("");
LOG.error("Currently processing split:");
LOG.error(is);
LOG.error("");
LOG.error("This issue might not necessarily be caused by current input");
LOG.error("due to the batching nature of export.");
LOG.error("");
throw new IOException("Can't export data, please check failed map task logs", e);
}
}
Project: hadoop
File: TestSleepJob.java
private void testRandomLocation(int locations, int njobs,
UserGroupInformation ugi) throws Exception {
Configuration configuration = new Configuration();
DebugJobProducer jobProducer = new DebugJobProducer(njobs, configuration);
Configuration jconf = GridmixTestUtils.mrvl.getConfig();
jconf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations);
JobStory story;
int seq = 1;
while ((story = jobProducer.getNextJob()) != null) {
GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0,
story, new Path("ignored"), ugi, seq++);
gridmixJob.buildSplits(null);
List<InputSplit> splits = new SleepJob.SleepInputFormat()
.getSplits(gridmixJob.getJob());
for (InputSplit split : splits) {
assertEquals(locations, split.getLocations().length);
}
}
jobProducer.close();
}
Project: hadoop
File: TestFileInputFormat.java
@Test
public void testNumInputFilesRecursively() throws Exception {
Configuration conf = getConfiguration();
conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 3, splits.size());
verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
"test:/a1/file1"), splits);
// Using the deprecated configuration
conf = getConfiguration();
conf.set("mapred.input.dir.recursive", "true");
job = Job.getInstance(conf);
splits = fileInputFormat.getSplits(job);
verifySplits(Lists.newArrayList("test:/a1/a2/file2", "test:/a1/a2/file3",
"test:/a1/file1"), splits);
}
Project: aliyun-maxcompute-data-collectors
File: TestMainframeDatasetInputFormat.java
@Test
public void testRetrieveDatasets() throws IOException {
JobConf conf = new JobConf();
conf.set(DBConfiguration.URL_PROPERTY, "localhost:12345");
conf.set(DBConfiguration.USERNAME_PROPERTY, "user");
conf.set(DBConfiguration.PASSWORD_PROPERTY, "pssword");
// set the password in the secure credentials object
Text PASSWORD_SECRET_KEY = new Text(DBConfiguration.PASSWORD_PROPERTY);
conf.getCredentials().addSecretKey(PASSWORD_SECRET_KEY,
"pssword".getBytes());
String dsName = "dsName1";
conf.set(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME, dsName);
Job job = new Job(conf);
ConfigurationHelper.setJobNumMaps(job, 2);
//format.getSplits(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
splits = ((MainframeDatasetInputFormat<SqoopRecord>) format).getSplits(job);
Assert.assertEquals("test1", ((MainframeDatasetInputSplit) splits.get(0))
.getNextDataset().toString());
Assert.assertEquals("test2", ((MainframeDatasetInputSplit) splits.get(1))
.getNextDataset().toString());
}
Project: ditb
File: TestWALRecordReader.java
/**
* Create a new reader from the split, and match the edits against the passed columns.
*/
private void testSplit(InputSplit split, byte[]... columns) throws Exception {
final WALRecordReader reader = getReader();
reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
for (byte[] column : columns) {
assertTrue(reader.nextKeyValue());
Cell cell = reader.getCurrentValue().getCells().get(0);
if (!Bytes.equals(column, cell.getQualifier())) {
assertTrue("expected [" + Bytes.toString(column) + "], actual ["
+ Bytes.toString(cell.getQualifier()) + "]", false);
}
}
assertFalse(reader.nextKeyValue());
reader.close();
}
Project: hadoop
File: TestInputSampler.java
@Override
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
List<InputSplit> splits = null;
try {
splits = getSplits(Job.getInstance(job));
} catch (InterruptedException e) {
throw new IOException(e);
}
org.apache.hadoop.mapred.InputSplit[] retVals =
new org.apache.hadoop.mapred.InputSplit[splits.size()];
for (int i = 0; i < splits.size(); ++i) {
MapredSequentialSplit split = new MapredSequentialSplit(
((SequentialSplit) splits.get(i)).getInit());
retVals[i] = split;
}
return retVals;
}
Project: ditb
File: ExportSnapshot.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
if (mappers == 0 && snapshotFiles.size() > 0) {
mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
mappers = Math.min(mappers, snapshotFiles.size());
conf.setInt(CONF_NUM_SPLITS, mappers);
conf.setInt(MR_NUM_MAPS, mappers);
}
List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
List<InputSplit> splits = new ArrayList(groups.size());
for (List<Pair<SnapshotFileInfo, Long>> files: groups) {
splits.add(new ExportSnapshotInputSplit(files));
}
return splits;
}
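A quick illustration of the mapper-count heuristic above, with hypothetical values: given 45 snapshot files, CONF_NUM_SPLITS unset (0) and the default group size of 10, the code picks 1 + 45/10 = 5 mappers and then caps that at the file count, after which getBalancedSplits spreads the files across those 5 splits.
public class SnapshotMapperCountDemo {
  public static void main(String[] args) {
    int snapshotFileCount = 45;  // hypothetical number of snapshot files
    int mapGroup = 10;           // default read for CONF_MAP_GROUP
    int mappers = 1 + (snapshotFileCount / mapGroup);   // 1 + 4 = 5
    mappers = Math.min(mappers, snapshotFileCount);     // still 5
    System.out.println(mappers);                        // prints 5
  }
}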
Project: hadoop
File: GenerateDistCacheData.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
final JobClient client = new JobClient(jobConf);
ClusterStatus stat = client.getClusterStatus(true);
int numTrackers = stat.getTaskTrackers();
final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
// Total size of distributed cache files to be generated
final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
// Get the path of the special file
String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
throw new RuntimeException("Invalid metadata: #files (" + fileCount
+ "), total_size (" + totalSize + "), filelisturi ("
+ distCacheFileList + ")");
}
Path sequenceFile = new Path(distCacheFileList);
FileSystem fs = sequenceFile.getFileSystem(jobConf);
FileStatus srcst = fs.getFileStatus(sequenceFile);
// Consider the number of TTs * mapSlotsPerTracker as number of mappers.
int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
int numSplits = numTrackers * numMapSlotsPerTracker;
List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
// Average size of data to be generated by each map task
final long targetSize = Math.max(totalSize / numSplits,
DistributedCacheEmulator.AVG_BYTES_PER_MAP);
long splitStartPosition = 0L;
long splitEndPosition = 0L;
long acc = 0L;
long bytesRemaining = srcst.getLen();
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
while (reader.next(key, value)) {
// If adding this file would put this split past the target size,
// cut the last split and put this file in the next split.
if (acc + key.get() > targetSize && acc != 0) {
long splitSize = splitEndPosition - splitStartPosition;
splits.add(new FileSplit(
sequenceFile, splitStartPosition, splitSize, (String[])null));
bytesRemaining -= splitSize;
splitStartPosition = splitEndPosition;
acc = 0L;
}
acc += key.get();
splitEndPosition = reader.getPosition();
}
} finally {
if (reader != null) {
reader.close();
}
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(
sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
}
return splits;
}
Project: hadoop
File: CombineFileInputFormat.java
/**
* Create a single split from the list of blocks specified in validBlocks
* Add this new split into splitList.
*/
private void addCreatedSplit(List<InputSplit> splitList,
Collection<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
long[] offset = new long[validBlocks.size()];
long[] length = new long[validBlocks.size()];
for (int i = 0; i < validBlocks.size(); i++) {
fl[i] = validBlocks.get(i).onepath;
offset[i] = validBlocks.get(i).offset;
length[i] = validBlocks.get(i).length;
}
// add this split to the list that is returned
CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
length, locations.toArray(new String[0]));
splitList.add(thissplit);
}
Project: hadoop
File: BaileyBorweinPlouffe.java
/** {@inheritDoc} */
public List<InputSplit> getSplits(JobContext context) {
//get the property values
final int startDigit = context.getConfiguration().getInt(
DIGIT_START_PROPERTY, 1);
final int nDigits = context.getConfiguration().getInt(
DIGIT_SIZE_PROPERTY, 100);
final int nMaps = context.getConfiguration().getInt(
DIGIT_PARTS_PROPERTY, 1);
//create splits
final List<InputSplit> splits = new ArrayList<InputSplit>(nMaps);
final int[] parts = partition(startDigit - 1, nDigits, nMaps);
for (int i = 0; i < parts.length; ++i) {
final int k = i < parts.length - 1 ? parts[i+1]: nDigits+startDigit-1;
splits.add(new BbpSplit(i, parts[i], k - parts[i]));
}
return splits;
}
Project: ditb
File: TestTableInputFormatScanBase.java
/**
* Tests a MR scan using data skew auto-balance
*
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
InterruptedException,
ClassNotFoundException {
String jobName = "TestJobForNumOfSplits";
LOG.info("Before map/reduce startup - job " + jobName);
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
Scan scan = new Scan();
scan.addFamily(INPUT_FAMILY);
c.set("hbase.mapreduce.input.autobalance", "true");
c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
c.set(KEY_STARTROW, "");
c.set(KEY_LASTROW, "");
Job job = new Job(c, jobName);
TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
TableInputFormat tif = new TableInputFormat();
tif.setConf(job.getConfiguration());
Assert.assertEquals(new String(TABLE_NAME), new String(table.getTableName()));
List<InputSplit> splits = tif.getSplits(job);
Assert.assertEquals(expectedNumOfSplits, splits.size());
}
Project: hadoop
File: CombineFileRecordReaderWrapper.java
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// it really should be the same file split at the time the wrapper instance
// was created
assert fileSplitIsValid(context);
delegate.initialize(fileSplit, context);
}
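For orientation, a minimal sketch of how a wrapper like this is normally plugged in: a CombineFileInputFormat subclass hands CombineFileRecordReader a wrapper class whose (CombineFileSplit, TaskAttemptContext, Integer) constructor delegates to a plain per-file reader. This follows the standard Hadoop pattern (CombineTextInputFormat is built this way); the class names here are illustrative, not part of the snippet above.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Illustrative CombineFileInputFormat that reads plain text lines.
public class CombinedTextInputFormat
    extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // One CombineFileRecordReader drives one wrapper per underlying file chunk.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, LineRecordReaderWrapper.class);
  }

  // Wrapper with the (split, context, index) constructor that
  // CombineFileRecordReader instantiates reflectively.
  public static class LineRecordReaderWrapper
      extends CombineFileRecordReaderWrapper<LongWritable, Text> {
    public LineRecordReaderWrapper(CombineFileSplit split,
        TaskAttemptContext context, Integer idx)
        throws IOException, InterruptedException {
      super(new TextInputFormat(), split, context, idx);
    }
  }
}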
Project: multiple-dimension-spread
File: MDSSpreadReader.java
@Override
public void initialize( final InputSplit inputSplit, final TaskAttemptContext context ) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)inputSplit;
Configuration config = context.getConfiguration();
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem( config );
long fileLength = fs.getLength( path );
long start = fileSplit.getStart();
long length = fileSplit.getLength();
InputStream in = fs.open( path );
}
Project: angel
File: BalanceInputFormat.java
public void getMoreSplits(Configuration conf, List<FileStatus> stats,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits) throws IOException {
// all blocks for all the files in input set
OneFileInfo[] files;
// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks =
new HashMap<String, List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
new HashMap<String, Set<OneBlockInfo>>();
files = new OneFileInfo[stats.size()];
if (stats.size() == 0) {
return;
}
// populate all the blocks for all files
long totLength = 0;
int i = 0;
for (FileStatus stat : stats) {
files[i] = new OneFileInfo(stat, conf, isSplitable(conf, stat.getPath()),
rackToBlocks, blockToNodes, nodeToBlocks,
rackToNodes, 256 * 1024 * 1024);
totLength += files[i].getLength();
}
int partNum = (int) (totLength / maxSize);
createSplitsGreedy(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
partNum, minSizeNode, minSizeRack, splits);
}
Project: hadoop
File: DistSum.java
/** @return a list containing a single split of summation */
@Override
public List<InputSplit> getSplits(JobContext context) {
//read sigma from conf
final Configuration conf = context.getConfiguration();
final Summation sigma = SummationWritable.read(DistSum.class, conf);
//create splits
final List<InputSplit> splits = new ArrayList<InputSplit>(1);
splits.add(new SummationSplit(sigma));
return splits;
}
Project: hadoop
File: TestSplitters.java
@Test(timeout=2000)
public void testBooleanSplitter() throws Exception{
BooleanSplitter splitter = new BooleanSplitter();
ResultSet result = mock(ResultSet.class);
when(result.getString(1)).thenReturn("result1");
List<InputSplit> splits=splitter.split(configuration, result, "column");
assertSplits(new String[] {"column = FALSE column = FALSE",
"column IS NULL column IS NULL"}, splits);
when(result.getString(1)).thenReturn("result1");
when(result.getString(2)).thenReturn("result2");
when(result.getBoolean(1)).thenReturn(true);
when(result.getBoolean(2)).thenReturn(false);
splits=splitter.split(configuration, result, "column");
assertEquals(0, splits.size());
when(result.getString(1)).thenReturn("result1");
when(result.getString(2)).thenReturn("result2");
when(result.getBoolean(1)).thenReturn(false);
when(result.getBoolean(2)).thenReturn(true);
splits = splitter.split(configuration, result, "column");
assertSplits(new String[] {
"column = FALSE column = FALSE", ".*column = TRUE"}, splits);
}
Project: BLASpark
File: RowPerLineRecordReader.java
@Override
public void initialize(final InputSplit inputSplit,
final TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
this.lrr = new LineRecordReader();
this.lrr.initialize(inputSplit, taskAttemptContext);
}
Project: Wikipedia-Index
File: XmlInputFormat.java
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit inputSplit, TaskAttemptContext context) {
try {
return new XMLRecordReader(inputSplit, context.getConfiguration());
} catch (IOException e) {
return null;
}
}
Project: spark-util
File: ErrorHandlingAvroKeyRecordReader.java
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit)inputSplit;
if(fileSplit != null && fileSplit.getPath() != null && fileSplit.getPath().toString().endsWith(TEMP_FILE_SUFFIX)){
LOG.info("Not processing Avro tmp file {}", fileSplit.getPath());
}else {
super.initialize(inputSplit, context);
}
}
Project: circus-train
File: UniformSizeInputFormat.java
/**
* Implementation of InputFormat::getSplits(). Returns a list of InputSplits such that the number of bytes to be
* copied for each split is approximately equal.
*
* @param context JobContext for the job.
* @return The list of uniformly-distributed input-splits.
* @throws IOException: On failure.
* @throws InterruptedException
*/
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
int numSplits = ConfigurationUtil.getInt(configuration, MRJobConfig.NUM_MAPS);
if (numSplits == 0) {
return new ArrayList<>();
}
return getSplits(configuration, numSplits,
ConfigurationUtil.getLong(configuration, S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
}
Project: hadoop
File: CompositeRecordReader.java
@SuppressWarnings("unchecked")
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
if (kids != null) {
for (int i = 0; i < kids.length; ++i) {
kids[i].initialize(((CompositeInputSplit)split).get(i), context);
if (kids[i].key() == null) {
continue;
}
// get keyclass
if (keyclass == null) {
keyclass = kids[i].createKey().getClass().
asSubclass(WritableComparable.class);
}
// create priority queue
if (null == q) {
cmp = WritableComparator.get(keyclass, conf);
q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
new Comparator<ComposableRecordReader<K,?>>() {
public int compare(ComposableRecordReader<K,?> o1,
ComposableRecordReader<K,?> o2) {
return cmp.compare(o1.key(), o2.key());
}
});
}
// Explicit check for key class agreement
if (!keyclass.equals(kids[i].key().getClass())) {
throw new ClassCastException("Child key classes fail to agree");
}
// add the kid to priority queue if it has any elements
if (kids[i].hasNext()) {
q.add(kids[i]);
}
}
}
}
Project: hadoop
File: WrappedRecordReader.java
public void initialize(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
rr.initialize(split, context);
conf = context.getConfiguration();
nextKeyValue();
if (!empty) {
keyclass = key.getClass().asSubclass(WritableComparable.class);
valueclass = value.getClass();
if (cmp == null) {
cmp = WritableComparator.get(keyclass, conf);
}
}
}
Project: hadoop
File: BooleanSplitter.java
public List<InputSplit> split(Configuration conf, ResultSet results, String colName)
throws SQLException {
List<InputSplit> splits = new ArrayList<InputSplit>();
if (results.getString(1) == null && results.getString(2) == null) {
// Range is null to null. Return a null split accordingly.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
return splits;
}
boolean minVal = results.getBoolean(1);
boolean maxVal = results.getBoolean(2);
// Use one or two splits.
if (!minVal) {
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " = FALSE", colName + " = FALSE"));
}
if (maxVal) {
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " = TRUE", colName + " = TRUE"));
}
if (results.getString(1) == null || results.getString(2) == null) {
// Include a null value.
splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
colName + " IS NULL", colName + " IS NULL"));
}
return splits;
}
Project: hadoop
File: FixedLengthRecordReader.java
@Override
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
final Path file = split.getPath();
initialize(job, split.getStart(), split.getLength(), file);
}
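A minimal driver sketch for supplying the record length this reader expects, assuming the stock org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat; the 20-byte record length, job name, and paths are examples only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;

public class FixedLengthJobSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Every record is exactly 20 bytes (hypothetical length).
    FixedLengthInputFormat.setRecordLength(conf, 20);
    Job job = Job.getInstance(conf, "fixed-length-read");
    job.setInputFormatClass(FixedLengthInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/fixed-records.bin"));
    // ... set mapper/reducer and output path, then job.waitForCompletion(true)
  }
}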
Project: hadoop
File: JobSplitWriter.java
@SuppressWarnings("unchecked")
private static <T extends InputSplit>
SplitMetaInfo[] writeNewSplits(Configuration conf,
T[] array, FSDataOutputStream out)
throws IOException, InterruptedException {
SplitMetaInfo[] info = new SplitMetaInfo[array.length];
if (array.length != 0) {
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
long offset = out.getPos();
for(T split: array) {
long prevCount = out.getPos();
Text.writeString(out, split.getClass().getName());
Serializer<T> serializer =
factory.getSerializer((Class<T>) split.getClass());
serializer.open(out);
serializer.serialize(split);
long currCount = out.getPos();
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations, maxBlockLocations);
}
info[i++] =
new JobSplit.SplitMetaInfo(
locations, offset,
split.getLength());
offset += currCount - prevCount;
}
}
return info;
}
Project: aliyun-maxcompute-data-collectors
File: MySQLDumpInputFormat.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
DataDrivenDBInputFormat.DataDrivenDBInputSplit dbSplit =
(DataDrivenDBInputFormat.DataDrivenDBInputSplit) split;
this.clause = "(" + dbSplit.getLowerClause() + ") AND ("
+ dbSplit.getUpperClause() + ")";
}
Project: hadoop
File: JobSplitWriter.java
public static void createSplitFiles(Path jobSubmitDir,
Configuration conf, FileSystem fs,
org.apache.hadoop.mapred.InputSplit[] splits)
throws IOException {
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
out.close();
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
info);
}
Project: aliyun-maxcompute-data-collectors
File: MainframeDatasetFTPRecordReader.java
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
super.initialize(inputSplit, taskAttemptContext);
Configuration conf = getConfiguration();
ftp = MainframeFTPClientUtils.getFTPConnection(conf);
if (ftp != null) {
String dsName
= conf.get(MainframeConfiguration.MAINFRAME_INPUT_DATASET_NAME);
ftp.changeWorkingDirectory("'" + dsName + "'");
}
}