Java class org.apache.hadoop.mapreduce.JobContext instance source code
Project: hadoop
File: GenerateData.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  final JobClient client =
      new JobClient(new JobConf(jobCtxt.getConfiguration()));
  ClusterStatus stat = client.getClusterStatus(true);
  final long toGen =
      jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
  if (toGen < 0) {
    throw new IOException("Invalid/missing generation bytes: " + toGen);
  }
  final int nTrackers = stat.getTaskTrackers();
  final long bytesPerTracker = toGen / nTrackers;
  final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers);
  final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
  final Matcher m = trackerPattern.matcher("");
  for (String tracker : stat.getActiveTrackerNames()) {
    m.reset(tracker);
    if (!m.find()) {
      System.err.println("Skipping node: " + tracker);
      continue;
    }
    final String name = m.group(1);
    splits.add(new GenSplit(bytesPerTracker, new String[] { name }));
  }
  return splits;
}
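A minimal driver sketch for exercising a getSplits(JobContext) implementation like the one above outside a submitted job. JobContextImpl and JobID are real mapreduce classes; the gridmix.gen.bytes key and the GenDataFormat inner class are assumptions here, so check the Gridmix source for the exact names. Note the split math divides by the live tracker count, so it presumes at least one active tracker:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;

Configuration conf = new Configuration();
conf.setLong("gridmix.gen.bytes", 1L << 30); // total bytes to generate (assumed key)
JobContext ctx = new JobContextImpl(conf, new JobID("local", 1));
List<InputSplit> splits = new GenerateData.GenDataFormat().getSplits(ctx); // assumed inner class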
Project: circus-train
File: DynamicInputFormat.java
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
  int numMaps = getNumMapTasks(jobContext.getConfiguration());
  final int nSplits = Math.min(numMaps, chunks.size());
  List<InputSplit> splits = new ArrayList<>(nSplits);
  for (int i = 0; i < nSplits; ++i) {
    TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
    chunks.get(i).assignTo(taskId);
    splits.add(new FileSplit(chunks.get(i).getPath(), 0,
        // Setting non-zero length for FileSplit size, to avoid a possible
        // future when 0-sized file-splits are considered "empty" and skipped
        // over.
        getMinRecordsPerChunk(jobContext.getConfiguration()), null));
  }
  ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
  return splits;
}
Project: easyhbase
File: WdTableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
  List<InputSplit> allSplits = new ArrayList<InputSplit>();
  Scan originalScan = getScan();
  Scan[] scans = rowKeyDistributor.getDistributedScans(originalScan);
  for (Scan scan : scans) {
    // Internally super.getSplits(...) uses the scan object stored in a private
    // variable; to reuse the superclass code we temporarily swap in each of
    // the distributed scans.
    setScan(scan);
    List<InputSplit> splits = super.getSplits(context);
    allSplits.addAll(splits);
  }
  // Restore the original scan
  setScan(originalScan);
  return allSplits;
}
Project: aliyun-maxcompute-data-collectors
File: OraOopDataDrivenDBInputFormat.java
private int getDesiredNumberOfMappers(JobContext jobContext) {
  int desiredNumberOfMappers =
      jobContext.getConfiguration().getInt(
          OraOopConstants.ORAOOP_DESIRED_NUMBER_OF_MAPPERS, -1);
  int minMappersAcceptedByOraOop =
      OraOopUtilities.getMinNumberOfImportMappersAcceptedByOraOop(jobContext
          .getConfiguration());
  if (desiredNumberOfMappers < minMappersAcceptedByOraOop) {
    LOG.warn(String.format("%s should not be used to perform a sqoop import "
        + "when the number of mappers is %d\n "
        + "i.e. OraOopManagerFactory.accept() should only accept jobs "
        + "where the number of mappers is at least %d",
        OraOopConstants.ORAOOP_PRODUCT_NAME, desiredNumberOfMappers,
        minMappersAcceptedByOraOop));
  }
  return desiredNumberOfMappers;
}
Project: hadoop
File: FileOutputFormat.java
public void checkOutputSpecs(JobContext job)
    throws FileAlreadyExistsException, IOException {
  // Ensure that the output directory is set and not already there
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");
  }
  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, job.getConfiguration());
  if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    throw new FileAlreadyExistsException("Output directory " + outDir +
        " already exists");
  }
}
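A usage sketch for the check above (output path illustrative). Job implements JobContext, so a configured Job can be passed directly; the call throws FileAlreadyExistsException when the directory already exists:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, new Path("/out")); // illustrative path
new TextOutputFormat<LongWritable, Text>().checkOutputSpecs(job); // throws if /out exists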
Project: aliyun-maxcompute-data-collectors
File: UpdateOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  DBConfiguration dbConf = new DBConfiguration(conf);
  // Sanity check all the configuration values we need.
  if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
    throw new IOException("Database connection URL is not set.");
  } else if (null == dbConf.getOutputTableName()) {
    throw new IOException("Table name is not set for export.");
  } else if (null == dbConf.getOutputFieldNames()) {
    throw new IOException(
        "Output field names are null.");
  } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
    throw new IOException("Update key column is not set for export.");
  }
}
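A sketch of a configuration that would satisfy these checks, keyed by the same DBConfiguration/ExportJobBase constants the code reads (the connection URL, table name, column list, and update key are all illustrative values):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.set(DBConfiguration.URL_PROPERTY, "jdbc:mysql://db.example.com/sales"); // illustrative URL
conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, "orders");              // read by getOutputTableName()
conf.set(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "id,qty,price");       // read by getOutputFieldNames()
conf.set(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY, "id");                   // update key column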
Project: aliyun-maxcompute-data-collectors
File: ExportCallOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  DBConfiguration dbConf = new DBConfiguration(conf);
  // Sanity check all the configuration values we need.
  if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
    throw new IOException("Database connection URL is not set.");
  } else if (null == dbConf.getOutputTableName()) {
    throw new IOException("Procedure name is not set for export");
  } else if (null == dbConf.getOutputFieldNames()
      && 0 == dbConf.getOutputFieldCount()) {
    throw new IOException(
        "Output field names are null and zero output field count set.");
  }
}
Project: aliyun-maxcompute-data-collectors
File: SQLServerResilientUpdateOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
    throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  DBConfiguration dbConf = new DBConfiguration(conf);
  // Sanity check all the configuration values we need.
  if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
    throw new IOException("Database connection URL is not set.");
  } else if (null == dbConf.getOutputTableName()) {
    throw new IOException("Table name is not set for export.");
  } else if (null == dbConf.getOutputFieldNames()) {
    throw new IOException(
        "Output field names are null.");
  } else if (null == conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY)) {
    throw new IOException("Update key column is not set for export.");
  }
}
Project: hadoop
File: BaileyBorweinPlouffe.java
/** {@inheritDoc} */
public List<InputSplit> getSplits(JobContext context) {
  // get the property values
  final int startDigit = context.getConfiguration().getInt(
      DIGIT_START_PROPERTY, 1);
  final int nDigits = context.getConfiguration().getInt(
      DIGIT_SIZE_PROPERTY, 100);
  final int nMaps = context.getConfiguration().getInt(
      DIGIT_PARTS_PROPERTY, 1);
  // create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nMaps);
  final int[] parts = partition(startDigit - 1, nDigits, nMaps);
  for (int i = 0; i < parts.length; ++i) {
    final int k = i < parts.length - 1 ? parts[i + 1] : nDigits + startDigit - 1;
    splits.add(new BbpSplit(i, parts[i], k - parts[i]));
  }
  return splits;
}
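A worked example: with DIGIT_START_PROPERTY = 1, DIGIT_SIZE_PROPERTY = 100 and DIGIT_PARTS_PROPERTY = 4, partition(0, 100, 4) would (assuming an even partition) return offsets {0, 25, 50, 75}, yielding splits over digit offsets [0, 25), [25, 50), [50, 75) and [75, 100); the final split's end index is pinned to nDigits + startDigit - 1 = 100.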
项目:aliyun-maxcompute-data-collectors
文件:SQLServerResilientExportOutputFormat.java
@Override
/** {@inheritDoc} */
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
DBConfiguration dbConf = new DBConfiguration(conf);
// Sanity check all the configuration values we need.
if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
throw new IOException("Database connection URL is not set.");
} else if (null == dbConf.getOutputTableName()) {
throw new IOException("Table name is not set for export");
} else if (null == dbConf.getOutputFieldNames()
&& 0 == dbConf.getOutputFieldCount()) {
throw new IOException(
"Output field names are null and zero output field count set.");
}
}
Project: hadoop
File: FileOutputCommitter.java
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
  if (hasOutputPath()) {
    Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
    FileSystem fs = pendingJobAttemptsPath
        .getFileSystem(context.getConfiguration());
    fs.delete(pendingJobAttemptsPath, true);
  } else {
    LOG.warn("Output Path is null in cleanupJob()");
  }
}
Project: hadoop
File: YarnOutputFiles.java
/**
 * Create a local map output index file name on the same volume.
 */
public Path getOutputIndexFileForWriteInVolume(Path existing) {
  Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
  Path attemptOutputDir = new Path(outputDir,
      conf.get(JobContext.TASK_ATTEMPT_ID));
  return new Path(attemptOutputDir, MAP_OUTPUT_FILENAME_STRING +
      MAP_OUTPUT_INDEX_SUFFIX_STRING);
}
Project: hadoop
File: FileOutputCommitter.java
/**
 * The job has completed, so move all committed tasks to the final output dir.
 * Delete the temporary directory, including all of the work directories.
 * Create a _SUCCESS file to mark the job as successful.
 * @param context the job's context
 */
public void commitJob(JobContext context) throws IOException {
  if (hasOutputPath()) {
    Path finalOutput = getOutputPath();
    FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
    if (algorithmVersion == 1) {
      for (FileStatus stat : getAllCommittedTaskPaths(context)) {
        mergePaths(fs, stat, finalOutput);
      }
    }
    // delete the _temporary folder and create a _done file in the o/p folder
    cleanupJob(context);
    // True if the job requires an output dir marker on successful completion.
    // Note that by default it is set to true.
    if (context.getConfiguration().getBoolean(
        SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
      Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
      fs.create(markerPath).close();
    }
  } else {
    LOG.warn("Output Path is null in commitJob()");
  }
}
Project: ditb
File: CompactionTool.java
/**
 * Returns a split for each store files directory using the block location
 * of each file as locality reference.
 */
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  Text key = new Text();
  for (FileStatus file : files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    LineReader reader = new LineReader(fs.open(path));
    long pos = 0;
    int n;
    try {
      while ((n = reader.readLine(key)) > 0) {
        String[] hosts = getStoreDirHosts(fs, path);
        splits.add(new FileSplit(path, pos, n, hosts));
        pos += n;
      }
    } finally {
      reader.close();
    }
  }
  return splits;
}
Project: hadoop
File: TestJobImpl.java
@Test
public void testTransitionsAtFailed() throws IOException {
  Configuration conf = new Configuration();
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = mock(OutputCommitter.class);
  doThrow(new IOException("forcefail"))
      .when(committer).setupJob(any(JobContext.class));
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();
  AppContext mockContext = mock(AppContext.class);
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(false);
  JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(jobId));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
  assertJobState(job, JobStateInternal.FAILED);
  job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
  assertJobState(job, JobStateInternal.FAILED);
  Assert.assertEquals(JobState.RUNNING, job.getState());
  when(mockContext.hasSuccessfullyUnregistered()).thenReturn(true);
  Assert.assertEquals(JobState.FAILED, job.getState());
  dispatcher.stop();
  commitHandler.stop();
}
Project: hadoop
File: TestCompressionEmulationUtils.java
@Override
public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
  // get the total data to be generated
  long toGen =
      jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1);
  if (toGen < 0) {
    throw new IOException("Invalid/missing generation bytes: " + toGen);
  }
  // get the total number of mappers configured
  int totalMappersConfigured =
      jobCtxt.getConfiguration().getInt(MRJobConfig.NUM_MAPS, -1);
  if (totalMappersConfigured < 0) {
    throw new IOException("Invalid/missing num mappers: "
        + totalMappersConfigured);
  }
  final long bytesPerTracker = toGen / totalMappersConfigured;
  final ArrayList<InputSplit> splits =
      new ArrayList<InputSplit>(totalMappersConfigured);
  for (int i = 0; i < totalMappersConfigured; ++i) {
    splits.add(new GenSplit(bytesPerTracker,
        new String[] { "tracker_local" }));
  }
  return splits;
}
Project: hadoop
File: TestMRCJCFileOutputCommitter.java
public void testEmptyOutput() throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);
  // do not write any output, then commit
  committer.commitTask(tContext);
  committer.commitJob(jContext);
  FileUtil.fullyDelete(new File(outDir.toString()));
}
Project: hadoop
File: DistSum.java
/** Partitions the summation into parts and then returns them as splits. */
@Override
public List<InputSplit> getSplits(JobContext context) {
  // read sigma from conf
  final Configuration conf = context.getConfiguration();
  final Summation sigma = SummationWritable.read(DistSum.class, conf);
  final int nParts = conf.getInt(N_PARTS, 0);
  // create splits
  final List<InputSplit> splits = new ArrayList<InputSplit>(nParts);
  final Summation[] parts = sigma.partition(nParts);
  for (int i = 0; i < parts.length; ++i) {
    splits.add(new SummationSplit(parts[i]));
    //LOG.info("parts[" + i + "] = " + parts[i]);
  }
  return splits;
}
Project: hadoop
File: FileOutputFormat.java
/**
 * Get the {@link CompressionCodec} for compressing the job outputs.
 * @param job the {@link Job} to look in
 * @param defaultValue the {@link CompressionCodec} to return if not set
 * @return the {@link CompressionCodec} to be used to compress the
 *         job outputs
 * @throws IllegalArgumentException if the class was specified, but not found
 */
public static Class<? extends CompressionCodec>
    getOutputCompressorClass(JobContext job,
        Class<? extends CompressionCodec> defaultValue) {
  Class<? extends CompressionCodec> codecClass = defaultValue;
  Configuration conf = job.getConfiguration();
  String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
  if (name != null) {
    try {
      codecClass =
          conf.getClassByName(name).asSubclass(CompressionCodec.class);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Compression codec " + name +
          " was not found.", e);
    }
  }
  return codecClass;
}
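A usage sketch: set the codec through the companion setters, then resolve it with the getter above; DefaultCodec only applies when no codec was configured:

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Job job = Job.getInstance();
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// Resolves to GzipCodec.class; DefaultCodec is only the fallback.
Class<? extends CompressionCodec> codec =
    FileOutputFormat.getOutputCompressorClass(job, DefaultCodec.class);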
Project: hadoop
File: MapTask.java
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
    JobConf job,
    TaskUmbilicalProtocol umbilical,
    TaskReporter reporter) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
        ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // With a single reducer, every record goes to partition 0
    // (partitions - 1), so the configured partitioner is bypassed.
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
Project: ditb
File: ExportSnapshot.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
  FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
  List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir);
  int mappers = conf.getInt(CONF_NUM_SPLITS, 0);
  if (mappers == 0 && snapshotFiles.size() > 0) {
    mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10));
    mappers = Math.min(mappers, snapshotFiles.size());
    conf.setInt(CONF_NUM_SPLITS, mappers);
    conf.setInt(MR_NUM_MAPS, mappers);
  }
  List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers);
  List<InputSplit> splits = new ArrayList<InputSplit>(groups.size());
  for (List<Pair<SnapshotFileInfo, Long>> files : groups) {
    splits.add(new ExportSnapshotInputSplit(files));
  }
  return splits;
}
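A worked example of the mapper heuristic: with 45 snapshot files, CONF_NUM_SPLITS unset and CONF_MAP_GROUP left at its default of 10, mappers = 1 + 45 / 10 = 5 (integer division), which is below the 45-file cap, so getBalancedSplits distributes the files across five splits.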
Project: hadoop
File: FileOutputCommitter.java
/**
 * Create a file output committer
 * @param outputPath the job's output path, or null if you want the output
 *        committer to act as a noop.
 * @param context the task's context
 * @throws IOException
 */
@Private
public FileOutputCommitter(Path outputPath,
    JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();
  algorithmVersion =
      conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
          FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
  LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
  if (algorithmVersion != 1 && algorithmVersion != 2) {
    throw new IOException("Only 1 or 2 algorithm version is supported");
  }
  if (outputPath != null) {
    FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
    this.outputPath = fs.makeQualified(outputPath);
  }
}
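Broadly, algorithm version 1 defers moving task output into the final directory until commitJob (safer against task retries), while version 2 moves it during each commitTask (faster job commit). A sketch of selecting version 2 through the same constant the constructor reads:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

Configuration conf = new Configuration();
// i.e. "mapreduce.fileoutputcommitter.algorithm.version"
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);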
Project: hadoop
File: NLineInputFormat.java
/**
 * Logically splits the set of input files for the job, treating N lines
 * of the input as one split.
 *
 * @see FileInputFormat#getSplits(JobContext)
 */
public List<InputSplit> getSplits(JobContext job)
    throws IOException {
  List<InputSplit> splits = new ArrayList<InputSplit>();
  int numLinesPerSplit = getNumLinesPerSplit(job);
  for (FileStatus status : listStatus(job)) {
    splits.addAll(getSplitsForFile(status,
        job.getConfiguration(), numLinesPerSplit));
  }
  return splits;
}
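A usage sketch wiring this format into a job (input path illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

Job job = Job.getInstance();
job.setInputFormatClass(NLineInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/input/records.txt")); // illustrative path
NLineInputFormat.setNumLinesPerSplit(job, 1000); // each split carries 1000 input lines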
Project: hadoop
File: FileInputFormat.java
/**
 * Get the list of input {@link Path}s for the map-reduce job.
 *
 * @param context The job
 * @return the list of input {@link Path}s for the map-reduce job.
 */
public static Path[] getInputPaths(JobContext context) {
  String dirs = context.getConfiguration().get(INPUT_DIR, "");
  String[] list = StringUtils.split(dirs);
  Path[] result = new Path[list.length];
  for (int i = 0; i < list.length; i++) {
    result[i] = new Path(StringUtils.unEscapeString(list[i]));
  }
  return result;
}
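A usage sketch (directories illustrative); addInputPath appends to the INPUT_DIR property that the getter above parses back into Path objects:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path("/data/2024")); // illustrative paths
FileInputFormat.addInputPath(job, new Path("/data/2025"));
Path[] inputs = FileInputFormat.getInputPaths(job); // {/data/2024, /data/2025}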
Project: hadoop
File: FileInputFormat.java
private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
    PathFilter inputFilter, boolean recursive) throws IOException {
  List<FileStatus> result = new ArrayList<FileStatus>();
  List<IOException> errors = new ArrayList<IOException>();
  for (int i = 0; i < dirs.length; ++i) {
    Path p = dirs[i];
    FileSystem fs = p.getFileSystem(job.getConfiguration());
    FileStatus[] matches = fs.globStatus(p, inputFilter);
    if (matches == null) {
      errors.add(new IOException("Input path does not exist: " + p));
    } else if (matches.length == 0) {
      errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
    } else {
      for (FileStatus globStat : matches) {
        if (globStat.isDirectory()) {
          RemoteIterator<LocatedFileStatus> iter =
              fs.listLocatedStatus(globStat.getPath());
          while (iter.hasNext()) {
            LocatedFileStatus stat = iter.next();
            if (inputFilter.accept(stat.getPath())) {
              if (recursive && stat.isDirectory()) {
                addInputPathRecursively(result, fs, stat.getPath(),
                    inputFilter);
              } else {
                result.add(stat);
              }
            }
          }
        } else {
          result.add(globStat);
        }
      }
    }
  }
  if (!errors.isEmpty()) {
    throw new InvalidInputException(errors);
  }
  return result;
}
Project: hadoop
File: CompositeInputFormat.java
/**
 * Build a CompositeInputSplit from the child InputFormats by assigning the
 * ith split from each child to the ith composite split.
 */
@SuppressWarnings("unchecked")
public List<InputSplit> getSplits(JobContext job)
    throws IOException, InterruptedException {
  setFormat(job.getConfiguration());
  job.getConfiguration().setLong(
      "mapreduce.input.fileinputformat.split.minsize", Long.MAX_VALUE);
  return root.getSplits(job);
}
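A sketch of building the join expression this InputFormat expects (table paths illustrative); compose(...) produces the mapreduce.join.expr value that setFormat(...) parses into the join tree:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;

Job job = Job.getInstance();
job.setInputFormatClass(CompositeInputFormat.class);
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
    CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class,
        new Path("/tbl/left"), new Path("/tbl/right"))); // illustrative paths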
Project: circus-train
File: CopyCommitter.java
/** {@inheritDoc} */
@Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
  try {
    super.abortJob(jobContext, state);
  } finally {
    cleanup(jobContext.getConfiguration());
  }
}
Project: circus-train
File: UniformSizeInputFormat.java
/**
 * Implementation of InputFormat::getSplits(). Returns a list of InputSplits such that the number of bytes to be
 * copied for all the splits is approximately equal.
 *
 * @param context JobContext for the job.
 * @return The list of uniformly-distributed input-splits.
 * @throws IOException on failure.
 * @throws InterruptedException
 */
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
  Configuration configuration = context.getConfiguration();
  int numSplits = ConfigurationUtil.getInt(configuration, MRJobConfig.NUM_MAPS);
  if (numSplits == 0) {
    return new ArrayList<>();
  }
  return getSplits(configuration, numSplits,
      ConfigurationUtil.getLong(configuration, S3MapReduceCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED));
}
Project: circus-train
File: CopyOutputFormat.java
/** {@inheritDoc} */
@Override
public void checkOutputSpecs(JobContext context) throws IOException {
  Configuration conf = context.getConfiguration();
  Path workingPath = getCommitDirectory(conf);
  if (workingPath == null) {
    throw new IllegalStateException("Commit directory not configured");
  }
  // get delegation tokens for the commit directory's file system
  TokenCache.obtainTokensForNamenodes(context.getCredentials(), new Path[] { workingPath }, conf);
}
Project: circus-train
File: DynamicInputFormatTest.java
@Test
public void getSplits() throws Exception {
  S3MapReduceCpOptions options = getOptions();
  Configuration configuration = new Configuration();
  configuration.set("mapred.map.tasks", String.valueOf(options.getMaxMaps()));
  CopyListing.getCopyListing(configuration, CREDENTIALS, options).buildListing(
      new Path(cluster.getFileSystem().getUri().toString() + "/tmp/testDynInputFormat/fileList.seq"), options);
  JobContext jobContext = new JobContextImpl(configuration, new JobID());
  DynamicInputFormat<Text, CopyListingFileStatus> inputFormat = new DynamicInputFormat<>();
  List<InputSplit> splits = inputFormat.getSplits(jobContext);
  int nFiles = 0;
  int taskId = 0;
  for (InputSplit split : splits) {
    RecordReader<Text, CopyListingFileStatus> recordReader = inputFormat.createRecordReader(split, null);
    StubContext stubContext = new StubContext(jobContext.getConfiguration(), recordReader, taskId);
    final TaskAttemptContext taskAttemptContext = stubContext.getContext();
    recordReader.initialize(splits.get(0), taskAttemptContext);
    float previousProgressValue = 0f;
    while (recordReader.nextKeyValue()) {
      CopyListingFileStatus fileStatus = recordReader.getCurrentValue();
      String source = fileStatus.getPath().toString();
      assertTrue(expectedFilePaths.contains(source));
      final float progress = recordReader.getProgress();
      assertTrue(progress >= previousProgressValue);
      assertTrue(progress >= 0.0f);
      assertTrue(progress <= 1.0f);
      previousProgressValue = progress;
      ++nFiles;
    }
    assertTrue(recordReader.getProgress() == 1.0f);
    ++taskId;
  }
  Assert.assertEquals(expectedFilePaths.size(), nFiles);
}
Project: ditb
File: MultiTableSnapshotInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext jobContext)
    throws IOException, InterruptedException {
  List<TableSnapshotInputFormatImpl.InputSplit> splits =
      delegate.getSplits(jobContext.getConfiguration());
  List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
  for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
    rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
  }
  return rtn;
}
Project: aliyun-maxcompute-data-collectors
File: NetezzaExternalTableInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
    InterruptedException {
  int targetNumTasks = ConfigurationHelper.getJobNumMaps(context);
  List<InputSplit> splits = new ArrayList<InputSplit>(targetNumTasks);
  for (int i = 0; i < targetNumTasks; ++i) {
    splits.add(new NetezzaExternalTableInputSplit(i));
  }
  return splits;
}
Project: hadoop
File: TeraGen.java
/**
 * Create the desired number of splits, dividing the number of rows
 * between the mappers.
 */
public List<InputSplit> getSplits(JobContext job) {
  long totalRows = getNumberOfRows(job);
  int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  LOG.info("Generating " + totalRows + " using " + numSplits);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  long currentRow = 0;
  for (int split = 0; split < numSplits; ++split) {
    long goal =
        (long) Math.ceil(totalRows * (double) (split + 1) / numSplits);
    splits.add(new RangeInputSplit(currentRow, goal - currentRow));
    currentRow = goal;
  }
  return splits;
}
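A worked example of the range math: with totalRows = 1000 and numSplits = 3, the goals are ceil(1000·1/3) = 334, ceil(1000·2/3) = 667 and 1000, so the mappers receive row ranges [0, 334), [334, 667) and [667, 1000) of 334, 333 and 333 rows respectively.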
Project: hadoop
File: TestDistCacheEmulation.java
/**
 * Reset the config properties related to Distributed Cache in the given job
 * configuration <code>jobConf</code>.
 *
 * @param jobConf job configuration
 */
private void resetDistCacheConfigProperties(Configuration jobConf) {
  // reset current/latest property names
  jobConf.setStrings(MRJobConfig.CACHE_FILES, "");
  jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "");
  jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "");
  jobConf.setStrings(JobContext.CACHE_FILE_VISIBILITIES, "");
  // reset old property names
  jobConf.setStrings("mapred.cache.files", "");
  jobConf.setStrings("mapred.cache.files.filesizes", "");
  jobConf.setStrings("mapred.cache.files.visibilities", "");
  jobConf.setStrings("mapred.cache.files.timestamps", "");
}
Project: aliyun-maxcompute-data-collectors
File: ConfigurationHelper.java
/**
 * Get the (hinted) number of map tasks for a job.
 */
public static int getJobNumMaps(JobContext job) {
  if (isLocalJobTracker(job.getConfiguration())) {
    return numLocalModeMaps;
  } else {
    return job.getConfiguration().getInt(
        ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
  }
}
Project: hadoop
File: TeraOutputFormat.java
@Override
public void checkOutputSpecs(JobContext job)
    throws InvalidJobConfException, IOException {
  // Ensure that the output directory is set
  Path outDir = getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  }
  final Configuration jobConf = job.getConfiguration();
  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, jobConf);
  final FileSystem fs = outDir.getFileSystem(jobConf);
  if (fs.exists(outDir)) {
    // An existing output dir is considered empty iff its only content is the
    // partition file.
    final FileStatus[] outDirKids = fs.listStatus(outDir);
    boolean empty = false;
    if (outDirKids != null && outDirKids.length == 1) {
      final FileStatus st = outDirKids[0];
      final String fname = st.getPath().getName();
      empty =
          !st.isDirectory() && TeraInputFormat.PARTITION_FILENAME.equals(fname);
    }
    if (TeraSort.getUseSimplePartitioner(job) || !empty) {
      throw new FileAlreadyExistsException("Output directory " + outDir
          + " already exists");
    }
  }
}
Project: ditb
File: NMapInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
    InterruptedException {
  int count = getNumMapTasks(context.getConfiguration());
  List<InputSplit> splits = new ArrayList<InputSplit>(count);
  for (int i = 0; i < count; i++) {
    splits.add(new NullInputSplit());
  }
  return splits;
}
Project: hadoop
File: CombineFileInputFormat.java
@Override
protected boolean isSplitable(JobContext context, Path file) {
  final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  if (null == codec) {
    return true;
  }
  return codec instanceof SplittableCompressionCodec;
}
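A sketch of how the codec lookup above resolves for two common suffixes (file names illustrative); uncompressed files and splittable codecs yield true, while gzip forces a single split:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

Configuration conf = new Configuration();
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec gzip = factory.getCodec(new Path("part-00000.gz"));   // GzipCodec
CompressionCodec bzip2 = factory.getCodec(new Path("part-00000.bz2")); // BZip2Codec
// GzipCodec does not implement SplittableCompressionCodec, BZip2Codec does,
// so .gz input is never split while .bz2 input may be.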
Project: hadoop
File: TestJobImpl.java
@Test
public void testAbortJobCalledAfterKillingTasks() throws IOException {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
  InlineDispatcher dispatcher = new InlineDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = Mockito.mock(OutputCommitter.class);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();
  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
  // Fail one task. This should land the JobImpl in the FAIL_WAIT state
  job.handle(new JobTaskEvent(
      MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
      TaskState.FAILED));
  // Verify abortJob hasn't been called yet
  Mockito.verify(committer, Mockito.never())
      .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
  assertJobState(job, JobStateInternal.FAIL_WAIT);
  // Verify abortJob is called once and the job failed
  Mockito.verify(committer, Mockito.timeout(2000).times(1))
      .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
  assertJobState(job, JobStateInternal.FAILED);
  dispatcher.stop();
}
Project: ditb
File: TableInputFormat.java
@Override
protected void initialize(JobContext context) throws IOException {
  // Do we have to worry about mismatches between the Configuration from
  // setConf and the one in this context?
  TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
  try {
    initializeTable(ConnectionFactory.createConnection(new Configuration(conf)), tableName);
  } catch (Exception e) {
    LOG.error(StringUtils.stringifyException(e));
  }
}