Java 类org.apache.hadoop.mapred.TaskAttemptID 实例源码
项目:es-hadoop-v2.2.0
文件:HadoopCfgUtils.java
public static TaskID getTaskID(Configuration cfg) {
// first try with the attempt since some Hadoop versions mix the two
String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
if (StringUtils.hasText(taskAttemptId)) {
try {
return TaskAttemptID.forName(taskAttemptId).getTaskID();
} catch (IllegalArgumentException ex) {
// the task attempt is invalid (Tez in particular uses the wrong string - see #346)
// try to fallback to task id
return parseTaskIdFromTaskAttemptId(taskAttemptId);
}
}
String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
// double-check task id bug in Hadoop 2.5.x
if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
return TaskID.forName(taskIdProp);
}
return null;
}
项目:hadoop-EAR
文件:ReduceTask.java
/**
* Only get the locations that are fetchable (not copied or not made
* obsolete).
*
* @param copiedMapOutputs Synchronized set of already copied map outputs
* @param obsoleteMapIdsSet Synchronized set of obsolete map ids
* @return List of fetchable locations (could be empty)
*/
List<MapOutputLocation> getFetchableLocations(
Set<TaskID> copiedMapOutputs,
Set<TaskAttemptID> obsoleteMapIdsSet) {
List<MapOutputLocation> fetchableLocations =
new ArrayList<MapOutputLocation>(locations.size());
for (MapOutputLocation location : locations) {
// Check if we still need to copy the output from this location
if (copiedMapOutputs.contains(location.getTaskId())) {
location.errorType = CopyOutputErrorType.NO_ERROR;
location.sizeRead = CopyResult.OBSOLETE;
LOG.info("getFetchableLocations: Already " +
"copied - " + location + ", will not try again");
} else if (obsoleteMapIds.contains(location.getTaskAttemptId())) {
location.errorType = CopyOutputErrorType.NO_ERROR;
location.sizeRead = CopyResult.OBSOLETE;
LOG.info("getFetchableLocations: Obsolete - " + location + ", " +
"will not try now.");
} else {
fetchableLocations.add(location);
}
}
return fetchableLocations;
}
项目:hazelcast-jet
文件:WriteHdfsP.java
@Override @Nonnull
public List<Processor> get(int count) {
return processorList = range(0, count).mapToObj(i -> {
try {
String uuid = context.jetInstance().getCluster().getLocalMember().getUuid();
TaskAttemptID taskAttemptID = new TaskAttemptID("jet-node-" + uuid, jobContext.getJobID().getId(),
JOB_SETUP, i, 0);
jobConf.set("mapred.task.id", taskAttemptID.toString());
jobConf.setInt("mapred.task.partition", i);
TaskAttemptContextImpl taskAttemptContext = new TaskAttemptContextImpl(jobConf, taskAttemptID);
@SuppressWarnings("unchecked")
OutputFormat<K, V> outFormat = jobConf.getOutputFormat();
RecordWriter<K, V> recordWriter = outFormat.getRecordWriter(
null, jobConf, uuid + '-' + valueOf(i), Reporter.NULL);
return new WriteHdfsP<>(
recordWriter, taskAttemptContext, outputCommitter, extractKeyFn, extractValueFn);
} catch (IOException e) {
throw new JetException(e);
}
}).collect(toList());
}
项目:systemml
文件:MultipleOutputCommitter.java
@Override
public void commitTask(TaskAttemptContext context)
throws IOException
{
JobConf conf = context.getJobConf();
TaskAttemptID attemptId = context.getTaskAttemptID();
// get the mapping between index to output filename
outputs = MRJobConfiguration.getOutputs(conf);
// get temp task output path (compatible with hadoop1 and hadoop2)
Path taskOutPath = FileOutputFormat.getWorkOutputPath(conf);
FileSystem fs = taskOutPath.getFileSystem(conf);
if( !fs.exists(taskOutPath) )
throw new IOException("Task output path "+ taskOutPath.toString() + "does not exist.");
// move the task outputs to their final places
context.getProgressible().progress();
moveFinalTaskOutputs(context, fs, taskOutPath);
// delete the temporary task-specific output directory
if( !fs.delete(taskOutPath, true) )
LOG.debug("Failed to delete the temporary output directory of task: " + attemptId + " - " + taskOutPath);
}
项目:systemml
文件:MultipleOutputCommitter.java
private void moveFileToDestination(TaskAttemptContext context, FileSystem fs, Path file)
throws IOException
{
TaskAttemptID attemptId = context.getTaskAttemptID();
// get output index and final destination
String name = file.getName(); //e.g., 0-r-00000
int index = Integer.parseInt(name.substring(0, name.indexOf("-")));
Path dest = new Path(outputs[index], name); //e.g., outX/0-r-00000
// move file from 'file' to 'finalPath'
if( !fs.rename(file, dest) ) {
if (!fs.delete(dest, true))
throw new IOException("Failed to delete earlier output " + dest + " for rename of " + file + " in task " + attemptId);
if (!fs.rename(file, dest))
throw new IOException("Failed to save output " + dest + " for rename of " + file + " in task: " + attemptId);
}
}
项目:ignite
文件:HadoopV1OutputCollector.java
/**
* @param jobConf Job configuration.
* @param taskCtx Task context.
* @param directWrite Direct write flag.
* @param fileName File name.
* @throws IOException In case of IO exception.
*/
HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
@Nullable String fileName, TaskAttemptID attempt) throws IOException {
this.jobConf = jobConf;
this.taskCtx = taskCtx;
this.attempt = attempt;
if (directWrite) {
jobConf.set("mapreduce.task.attempt.id", attempt.toString());
OutputFormat outFormat = jobConf.getOutputFormat();
writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
}
else
writer = null;
}
项目:RecordServiceClient
文件:MapReduceTest.java
@Test
public void testCountStar() throws IOException, InterruptedException {
Configuration config = new Configuration();
TextInputFormat.TextRecordReader reader =
new TextInputFormat.TextRecordReader();
try {
RecordServiceConfig.setInputQuery(config, "select count(*) from tpch.nation");
List<InputSplit> splits = PlanUtil.getSplits(config, new Credentials()).splits;
int numRows = 0;
for (InputSplit split: splits) {
reader.initialize(split,
new TaskAttemptContextImpl(new JobConf(config), new TaskAttemptID()));
while (reader.nextKeyValue()) {
++numRows;
}
}
assertEquals(25, numRows);
} finally {
reader.close();
}
}
项目:vs.msc.ws14
文件:HadoopUtils.java
public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception {
try {
// for Hadoop 1.xx
Class<?> clazz = null;
if(!TaskAttemptContext.class.isInterface()) {
clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader());
}
// for Hadoop 2.xx
else {
clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader());
}
Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
// for Hadoop 1.xx
constructor.setAccessible(true);
TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
return context;
} catch(Exception e) {
throw new Exception("Could not create instance of TaskAttemptContext.", e);
}
}
项目:vs.msc.ws14
文件:HadoopFileOutputCommitter.java
public void commitTask(JobConf conf, TaskAttemptID taskAttemptID)
throws IOException {
Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
if (taskOutputPath != null) {
FileSystem fs = taskOutputPath.getFileSystem(conf);
if (fs.exists(taskOutputPath)) {
Path jobOutputPath = taskOutputPath.getParent().getParent();
// Move the task outputs to their final place
moveTaskOutputs(conf,taskAttemptID, fs, jobOutputPath, taskOutputPath);
// Delete the temporary task-specific output directory
if (!fs.delete(taskOutputPath, true)) {
LOG.info("Failed to delete the temporary output" +
" directory of task: " + taskAttemptID + " - " + taskOutputPath);
}
LOG.info("Saved output of task '" + taskAttemptID + "' to " +
jobOutputPath);
}
}
}
项目:vs.msc.ws14
文件:HadoopFileOutputCommitter.java
public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID)
throws IOException {
try {
Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
if (taskOutputPath != null) {
// Get the file-system for the task output directory
FileSystem fs = taskOutputPath.getFileSystem(conf);
// since task output path is created on demand,
// if it exists, task needs a commit
if (fs.exists(taskOutputPath)) {
return true;
}
}
} catch (IOException ioe) {
throw ioe;
}
return false;
}
项目:vs.msc.ws14
文件:HadoopFileOutputCommitter.java
public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
Path p = new Path(outputPath,
(FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
"_" + taskAttemptID.toString()));
try {
FileSystem fs = p.getFileSystem(conf);
return p.makeQualified(fs);
} catch (IOException ie) {
LOG.warn(StringUtils.stringifyException(ie));
return p;
}
}
return null;
}
项目:elasticsearch-hadoop
文件:HadoopCfgUtils.java
public static TaskID getTaskID(Configuration cfg) {
// first try with the attempt since some Hadoop versions mix the two
String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
if (StringUtils.hasText(taskAttemptId)) {
try {
return TaskAttemptID.forName(taskAttemptId).getTaskID();
} catch (IllegalArgumentException ex) {
// the task attempt is invalid (Tez in particular uses the wrong string - see #346)
// try to fallback to task id
return parseTaskIdFromTaskAttemptId(taskAttemptId);
}
}
String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
// double-check task id bug in Hadoop 2.5.x
if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
return TaskID.forName(taskIdProp);
}
return null;
}
项目:hadoop
文件:TestPipeApplication.java
/**
* clean previous std error and outs
*/
private void initStdOut(JobConf configuration) {
TaskAttemptID taskId = TaskAttemptID.forName(configuration
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
// prepare folder
if (!stdOut.getParentFile().exists()) {
stdOut.getParentFile().mkdirs();
} else { // clean logs
stdOut.deleteOnExit();
stdErr.deleteOnExit();
}
}
项目:hadoop
文件:TestPipeApplication.java
private String readStdOut(JobConf conf) throws Exception {
TaskAttemptID taskId = TaskAttemptID.forName(conf
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
return readFile(stdOut);
}
项目:hadoop
文件:TestShuffleScheduler.java
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
0, 0);
ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
reduceId, null, progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
scheduler.tipFailed(taskId1);
Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
0.0f);
Assert.assertFalse(scheduler.waitUntilDone(1));
TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
scheduler.tipFailed(taskId0);
Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
项目:hadoop
文件:TestEventFetcher.java
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
ArrayList<TaskCompletionEvent> tceList =
new ArrayList<TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
}
TaskCompletionEvent[] events = {};
return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
项目:hadoop
文件:TestStreamingStatus.java
void validateTaskStderr(StreamJob job, TaskType type)
throws IOException {
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
attemptId, false);
// trim() is called on expectedStderr here because the method
// MapReduceTestUtil.readTaskLog() returns trimmed String.
assertTrue(log.equals(expectedStderr.trim()));
}
项目:aliyun-oss-hadoop-fs
文件:TestPipeApplication.java
/**
* clean previous std error and outs
*/
private void initStdOut(JobConf configuration) {
TaskAttemptID taskId = TaskAttemptID.forName(configuration
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
// prepare folder
if (!stdOut.getParentFile().exists()) {
stdOut.getParentFile().mkdirs();
} else { // clean logs
stdOut.deleteOnExit();
stdErr.deleteOnExit();
}
}
项目:aliyun-oss-hadoop-fs
文件:TestPipeApplication.java
private String readStdOut(JobConf conf) throws Exception {
TaskAttemptID taskId = TaskAttemptID.forName(conf
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
return readFile(stdOut);
}
项目:aliyun-oss-hadoop-fs
文件:TaskContext.java
public TaskContext(JobConf conf, Class<?> iKClass, Class<?> iVClass,
Class<?> oKClass, Class<?> oVClass, TaskReporter reporter,
TaskAttemptID id) {
this.conf = conf;
this.iKClass = iKClass;
this.iVClass = iVClass;
this.oKClass = oKClass;
this.oVClass = oVClass;
this.reporter = reporter;
this.taskAttemptID = id;
}
项目:aliyun-oss-hadoop-fs
文件:NativeCollectorOnlyHandler.java
protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler,
BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException {
Configuration conf = context.getConf();
TaskAttemptID id = context.getTaskAttemptId();
if (null == id) {
this.output = OutputUtil.createNativeTaskOutput(conf, "");
} else {
this.output = OutputUtil.createNativeTaskOutput(context.getConf(), context.getTaskAttemptId()
.toString());
}
this.combinerHandler = combiner;
this.kvPusher = kvPusher;
this.nativeHandler = nativeHandler;
nativeHandler.setCommandDispatcher(this);
}
项目:aliyun-oss-hadoop-fs
文件:TestShuffleScheduler.java
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
0, 0);
ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
reduceId, null, progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
scheduler.tipFailed(taskId1);
Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
0.0f);
Assert.assertFalse(scheduler.waitUntilDone(1));
TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
scheduler.tipFailed(taskId0);
Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
项目:aliyun-oss-hadoop-fs
文件:TestEventFetcher.java
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
ArrayList<TaskCompletionEvent> tceList =
new ArrayList<TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
}
TaskCompletionEvent[] events = {};
return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
项目:aliyun-oss-hadoop-fs
文件:TestStreamingStatus.java
void validateTaskStderr(StreamJob job, TaskType type)
throws IOException {
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
attemptId, false);
// trim() is called on expectedStderr here because the method
// MapReduceTestUtil.readTaskLog() returns trimmed String.
assertTrue(log.equals(expectedStderr.trim()));
}
项目:big-c
文件:TestPipeApplication.java
/**
* clean previous std error and outs
*/
private void initStdOut(JobConf configuration) {
TaskAttemptID taskId = TaskAttemptID.forName(configuration
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
// prepare folder
if (!stdOut.getParentFile().exists()) {
stdOut.getParentFile().mkdirs();
} else { // clean logs
stdOut.deleteOnExit();
stdErr.deleteOnExit();
}
}
项目:big-c
文件:TestPipeApplication.java
private String readStdOut(JobConf conf) throws Exception {
TaskAttemptID taskId = TaskAttemptID.forName(conf
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
return readFile(stdOut);
}
项目:big-c
文件:TestShuffleScheduler.java
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
0, 0);
ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
reduceId, null, progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
scheduler.tipFailed(taskId1);
Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
0.0f);
Assert.assertFalse(scheduler.waitUntilDone(1));
TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
scheduler.tipFailed(taskId0);
Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
项目:big-c
文件:TestEventFetcher.java
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
ArrayList<TaskCompletionEvent> tceList =
new ArrayList<TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
}
TaskCompletionEvent[] events = {};
return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
项目:big-c
文件:TestStreamingStatus.java
void validateTaskStderr(StreamJob job, TaskType type)
throws IOException {
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
attemptId, false);
// trim() is called on expectedStderr here because the method
// MapReduceTestUtil.readTaskLog() returns trimmed String.
assertTrue(log.equals(expectedStderr.trim()));
}
项目:flink
文件:HadoopOutputFormatBase.java
/**
* create the temporary output file for hadoop RecordWriter.
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws java.io.IOException
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
// enforce sequential open() calls
synchronized (OPEN_MUTEX) {
if (Integer.toString(taskNumber + 1).length() > 6) {
throw new IOException("Task id too large.");
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+ Integer.toString(taskNumber + 1)
+ "_0");
this.jobConf.set("mapred.task.id", taskAttemptID.toString());
this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
// for hadoop 2.2
this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
this.outputCommitter = this.jobConf.getOutputCommitter();
JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
this.outputCommitter.setupJob(jobContext);
this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
}
项目:flink
文件:HadoopOutputFormatBase.java
/**
* create the temporary output file for hadoop RecordWriter.
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws java.io.IOException
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
// enforce sequential open() calls
synchronized (OPEN_MUTEX) {
if (Integer.toString(taskNumber + 1).length() > 6) {
throw new IOException("Task id too large.");
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+ Integer.toString(taskNumber + 1)
+ "_0");
this.jobConf.set("mapred.task.id", taskAttemptID.toString());
this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
// for hadoop 2.2
this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
this.outputCommitter = this.jobConf.getOutputCommitter();
JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
this.outputCommitter.setupJob(jobContext);
this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestPipeApplication.java
/**
* clean previous std error and outs
*/
private void initStdOut(JobConf configuration) {
TaskAttemptID taskId = TaskAttemptID.forName(configuration
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
// prepare folder
if (!stdOut.getParentFile().exists()) {
stdOut.getParentFile().mkdirs();
} else { // clean logs
stdOut.deleteOnExit();
stdErr.deleteOnExit();
}
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestPipeApplication.java
private String readStdOut(JobConf conf) throws Exception {
TaskAttemptID taskId = TaskAttemptID.forName(conf
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
return readFile(stdOut);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TaskContext.java
public TaskContext(JobConf conf, Class<?> iKClass, Class<?> iVClass,
Class<?> oKClass, Class<?> oVClass, TaskReporter reporter,
TaskAttemptID id) {
this.conf = conf;
this.iKClass = iKClass;
this.iVClass = iVClass;
this.oKClass = oKClass;
this.oVClass = oVClass;
this.reporter = reporter;
this.taskAttemptID = id;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:NativeCollectorOnlyHandler.java
protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler,
BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException {
Configuration conf = context.getConf();
TaskAttemptID id = context.getTaskAttemptId();
if (null == id) {
this.output = OutputUtil.createNativeTaskOutput(conf, "");
} else {
this.output = OutputUtil.createNativeTaskOutput(context.getConf(), context.getTaskAttemptId()
.toString());
}
this.combinerHandler = combiner;
this.kvPusher = kvPusher;
this.nativeHandler = nativeHandler;
nativeHandler.setCommandDispatcher(this);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestShuffleScheduler.java
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
0, 0);
ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
reduceId, null, progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
scheduler.tipFailed(taskId1);
Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
0.0f);
Assert.assertFalse(scheduler.waitUntilDone(1));
TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
scheduler.tipFailed(taskId0);
Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestEventFetcher.java
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
ArrayList<TaskCompletionEvent> tceList =
new ArrayList<TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
}
TaskCompletionEvent[] events = {};
return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TestStreamingStatus.java
void validateTaskStderr(StreamJob job, TaskType type)
throws IOException {
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
attemptId, false);
// trim() is called on expectedStderr here because the method
// MapReduceTestUtil.readTaskLog() returns trimmed String.
assertTrue(log.equals(expectedStderr.trim()));
}
项目:hadoop-EAR
文件:ReduceTask.java
public MapOutputLocation(TaskAttemptID taskAttemptId,
String ttHost, String httpTaskTracker) {
this.taskAttemptId = taskAttemptId;
this.taskId = this.taskAttemptId.getTaskID();
this.ttHost = ttHost;
this.httpTaskTracker = httpTaskTracker;
}
项目:hadoop-EAR
文件:ReduceTask.java
public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId,
Configuration conf, Path file, long size) {
this.mapId = mapId;
this.mapAttemptId = mapAttemptId;
this.conf = conf;
this.file = file;
this.compressedSize = size;
this.data = null;
this.inMemory = false;
}