Java 类org.apache.hadoop.mapreduce.jobhistory.HistoryEvent 实例源码
项目:hadoop
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
JobID jobID = JobID.forName(jobIDName);
if (jobIDName == null) {
return null;
}
String priority = line.get("JOB_PRIORITY");
if (priority != null) {
return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
}
return null;
}
项目:hadoop
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String status = line.get("JOB_STATUS");
if (status != null) {
return new JobStatusChangedEvent(jobID, status);
}
return null;
}
项目:hadoop
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
if (launchTime != null) {
Job20LineHistoryEventEmitter that =
(Job20LineHistoryEventEmitter) thatg;
return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
.parseLong(launchTime));
}
return null;
}
项目:hadoop
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("JOB_STATUS");
String finishedMaps = line.get("FINISHED_MAPS");
String finishedReduces = line.get("FINISHED_REDUCES");
if (status != null && !status.equalsIgnoreCase("success")
&& finishTime != null && finishedMaps != null
&& finishedReduces != null) {
return new JobUnsuccessfulCompletionEvent(jobID, Long
.parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
.parseInt(finishedReduces), status);
}
return null;
}
项目:hadoop
文件:ReduceAttempt20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
HistoryEventEmitter thatg) {
if (taskAttemptIDName == null) {
return null;
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("TASK_STATUS");
if (finishTime != null && status != null
&& status.equalsIgnoreCase("success")) {
String hostName = line.get("HOSTNAME");
String counters = line.get("COUNTERS");
String state = line.get("STATE_STRING");
String shuffleFinish = line.get("SHUFFLE_FINISHED");
String sortFinish = line.get("SORT_FINISHED");
if (shuffleFinish != null && sortFinish != null
&& "success".equalsIgnoreCase(status)) {
ReduceAttempt20LineHistoryEventEmitter that =
(ReduceAttempt20LineHistoryEventEmitter) thatg;
return new ReduceAttemptFinishedEvent
(taskAttemptID,
that.originalTaskType, status,
Long.parseLong(shuffleFinish),
Long.parseLong(sortFinish),
Long.parseLong(finishTime),
hostName, -1, null,
state, maybeParseCounters(counters),
null);
}
}
return null;
}
项目:hadoop
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String taskType = line.get("TASK_TYPE");
String startTime = line.get("START_TIME");
String splits = line.get("SPLITS");
if (startTime != null && taskType != null) {
Task20LineHistoryEventEmitter that =
(Task20LineHistoryEventEmitter) thatg;
that.originalStartTime = Long.parseLong(startTime);
that.originalTaskType =
Version20LogInterfaceUtils.get20TaskType(taskType);
return new TaskStartedEvent(taskID, that.originalStartTime,
that.originalTaskType, splits);
}
return null;
}
项目:hadoop
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String finishTime = line.get("FINISH_TIME");
if (finishTime != null) {
return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
}
return null;
}
项目:hadoop
文件:TopologyBuilder.java
/**
* Process one {@link HistoryEvent}
*
* @param event
* The {@link HistoryEvent} to be processed.
*/
public void process(HistoryEvent event) {
if (event instanceof TaskAttemptFinishedEvent) {
processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
} else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
} else if (event instanceof TaskStartedEvent) {
processTaskStartedEvent((TaskStartedEvent) event);
} else if (event instanceof MapAttemptFinishedEvent) {
processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
} else if (event instanceof ReduceAttemptFinishedEvent) {
processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
}
// I do NOT expect these if statements to be exhaustive.
}
项目:hadoop
文件:MapAttempt20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
HistoryEventEmitter thatg) {
if (taskAttemptIDName == null) {
return null;
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("TASK_STATUS");
if (finishTime != null && status != null
&& status.equalsIgnoreCase("success")) {
String hostName = line.get("HOSTNAME");
String counters = line.get("COUNTERS");
String state = line.get("STATE_STRING");
MapAttempt20LineHistoryEventEmitter that =
(MapAttempt20LineHistoryEventEmitter) thatg;
if ("success".equalsIgnoreCase(status)) {
return new MapAttemptFinishedEvent
(taskAttemptID,
that.originalTaskType, status,
Long.parseLong(finishTime),
Long.parseLong(finishTime),
hostName, -1, null, state, maybeParseCounters(counters),
null);
}
}
return null;
}
项目:hadoop
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
String status = line.get("JOB_STATUS");
String totalMaps = line.get("TOTAL_MAPS");
String totalReduces = line.get("TOTAL_REDUCES");
String uberized = line.get("UBERIZED");
if (launchTime != null && totalMaps != null && totalReduces != null) {
return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
.parseInt(totalMaps), Integer.parseInt(totalReduces), status,
Boolean.parseBoolean(uberized));
}
return null;
}
项目:hadoop
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("JOB_STATUS");
String finishedMaps = line.get("FINISHED_MAPS");
String finishedReduces = line.get("FINISHED_REDUCES");
String failedMaps = line.get("FAILED_MAPS");
String failedReduces = line.get("FAILED_REDUCES");
String counters = line.get("COUNTERS");
if (status != null && status.equalsIgnoreCase("success")
&& finishTime != null && finishedMaps != null
&& finishedReduces != null) {
return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
.parseInt(finishedMaps), Integer.parseInt(finishedReduces), Integer
.parseInt(failedMaps), Integer.parseInt(failedReduces), null, null,
maybeParseCounters(counters));
}
return null;
}
项目:hadoop
文件:TaskAttempt20LineEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
HistoryEventEmitter thatg) {
if (taskAttemptIDName == null) {
return null;
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
String startTime = line.get("START_TIME");
String taskType = line.get("TASK_TYPE");
String trackerName = line.get("TRACKER_NAME");
String httpPort = line.get("HTTP_PORT");
String locality = line.get("LOCALITY");
if (locality == null) {
locality = "";
}
String avataar = line.get("AVATAAR");
if (avataar == null) {
avataar = "";
}
if (startTime != null && taskType != null) {
TaskAttempt20LineEventEmitter that =
(TaskAttempt20LineEventEmitter) thatg;
that.originalStartTime = Long.parseLong(startTime);
that.originalTaskType =
Version20LogInterfaceUtils.get20TaskType(taskType);
int port =
httpPort.equals("") ? DEFAULT_HTTP_PORT : Integer
.parseInt(httpPort);
return new TaskAttemptStartedEvent(taskAttemptID,
that.originalTaskType, that.originalStartTime, trackerName, port, -1,
locality, avataar);
}
return null;
}
项目:hadoop
文件:TaskAttempt20LineEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
HistoryEventEmitter thatg) {
if (taskAttemptIDName == null) {
return null;
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("TASK_STATUS");
if (finishTime != null && status != null
&& status.equalsIgnoreCase("success")) {
String hostName = line.get("HOSTNAME");
String counters = line.get("COUNTERS");
String state = line.get("STATE_STRING");
TaskAttempt20LineEventEmitter that =
(TaskAttempt20LineEventEmitter) thatg;
ParsedHost pHost = ParsedHost.parse(hostName);
return new TaskAttemptFinishedEvent(taskAttemptID,
that.originalTaskType, status, Long.parseLong(finishTime),
pHost.getRackName(), pHost.getNodeName(), state,
maybeParseCounters(counters));
}
return null;
}
项目:hadoop
文件:TaskAttempt20LineEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
HistoryEventEmitter thatg) {
if (taskAttemptIDName == null) {
return null;
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("TASK_STATUS");
if (finishTime != null && status != null
&& !status.equalsIgnoreCase("success")) {
String hostName = line.get("HOSTNAME");
String error = line.get("ERROR");
TaskAttempt20LineEventEmitter that =
(TaskAttempt20LineEventEmitter) thatg;
ParsedHost pHost = ParsedHost.parse(hostName);
String rackName = null;
// Earlier versions of MR logged on hostnames (without rackname) for
// unsuccessful attempts
if (pHost != null) {
rackName = pHost.getRackName();
hostName = pHost.getNodeName();
}
return new TaskAttemptUnsuccessfulCompletionEvent
(taskAttemptID,
that.originalTaskType, status, Long.parseLong(finishTime),
hostName, -1, rackName, error, null);
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String taskType = line.get("TASK_TYPE");
String startTime = line.get("START_TIME");
String splits = line.get("SPLITS");
if (startTime != null && taskType != null) {
Task20LineHistoryEventEmitter that =
(Task20LineHistoryEventEmitter) thatg;
that.originalStartTime = Long.parseLong(startTime);
that.originalTaskType =
Version20LogInterfaceUtils.get20TaskType(taskType);
return new TaskStartedEvent(taskID, that.originalStartTime,
that.originalTaskType, splits);
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String finishTime = line.get("FINISH_TIME");
if (finishTime != null) {
return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:TopologyBuilder.java
/**
* Process one {@link HistoryEvent}
*
* @param event
* The {@link HistoryEvent} to be processed.
*/
public void process(HistoryEvent event) {
if (event instanceof TaskAttemptFinishedEvent) {
processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
} else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
} else if (event instanceof TaskStartedEvent) {
processTaskStartedEvent((TaskStartedEvent) event);
} else if (event instanceof MapAttemptFinishedEvent) {
processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
} else if (event instanceof ReduceAttemptFinishedEvent) {
processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
}
// I do NOT expect these if statements to be exhaustive.
}
项目:aliyun-oss-hadoop-fs
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
JobID jobID = JobID.forName(jobIDName);
if (jobIDName == null) {
return null;
}
String priority = line.get("JOB_PRIORITY");
if (priority != null) {
return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
String status = line.get("JOB_STATUS");
String totalMaps = line.get("TOTAL_MAPS");
String totalReduces = line.get("TOTAL_REDUCES");
String uberized = line.get("UBERIZED");
if (launchTime != null && totalMaps != null && totalReduces != null) {
return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
.parseInt(totalMaps), Integer.parseInt(totalReduces), status,
Boolean.parseBoolean(uberized));
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String status = line.get("JOB_STATUS");
if (status != null) {
return new JobStatusChangedEvent(jobID, status);
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
if (launchTime != null) {
Job20LineHistoryEventEmitter that =
(Job20LineHistoryEventEmitter) thatg;
return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
.parseLong(launchTime));
}
return null;
}
项目:aliyun-oss-hadoop-fs
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("JOB_STATUS");
String finishedMaps = line.get("FINISHED_MAPS");
String finishedReduces = line.get("FINISHED_REDUCES");
if (status != null && !status.equalsIgnoreCase("success")
&& finishTime != null && finishedMaps != null
&& finishedReduces != null) {
return new JobUnsuccessfulCompletionEvent(jobID, Long
.parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
.parseInt(finishedReduces), status);
}
return null;
}
项目:big-c
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String taskType = line.get("TASK_TYPE");
String startTime = line.get("START_TIME");
String splits = line.get("SPLITS");
if (startTime != null && taskType != null) {
Task20LineHistoryEventEmitter that =
(Task20LineHistoryEventEmitter) thatg;
that.originalStartTime = Long.parseLong(startTime);
that.originalTaskType =
Version20LogInterfaceUtils.get20TaskType(taskType);
return new TaskStartedEvent(taskID, that.originalStartTime,
that.originalTaskType, splits);
}
return null;
}
项目:big-c
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String finishTime = line.get("FINISH_TIME");
if (finishTime != null) {
return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
}
return null;
}
项目:big-c
文件:TopologyBuilder.java
/**
* Process one {@link HistoryEvent}
*
* @param event
* The {@link HistoryEvent} to be processed.
*/
public void process(HistoryEvent event) {
if (event instanceof TaskAttemptFinishedEvent) {
processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
} else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
} else if (event instanceof TaskStartedEvent) {
processTaskStartedEvent((TaskStartedEvent) event);
} else if (event instanceof MapAttemptFinishedEvent) {
processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
} else if (event instanceof ReduceAttemptFinishedEvent) {
processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
}
// I do NOT expect these if statements to be exhaustive.
}
项目:big-c
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
JobID jobID = JobID.forName(jobIDName);
if (jobIDName == null) {
return null;
}
String priority = line.get("JOB_PRIORITY");
if (priority != null) {
return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
}
return null;
}
项目:big-c
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
String status = line.get("JOB_STATUS");
String totalMaps = line.get("TOTAL_MAPS");
String totalReduces = line.get("TOTAL_REDUCES");
String uberized = line.get("UBERIZED");
if (launchTime != null && totalMaps != null && totalReduces != null) {
return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
.parseInt(totalMaps), Integer.parseInt(totalReduces), status,
Boolean.parseBoolean(uberized));
}
return null;
}
项目:big-c
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String status = line.get("JOB_STATUS");
if (status != null) {
return new JobStatusChangedEvent(jobID, status);
}
return null;
}
项目:big-c
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
if (launchTime != null) {
Job20LineHistoryEventEmitter that =
(Job20LineHistoryEventEmitter) thatg;
return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
.parseLong(launchTime));
}
return null;
}
项目:big-c
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("JOB_STATUS");
String finishedMaps = line.get("FINISHED_MAPS");
String finishedReduces = line.get("FINISHED_REDUCES");
if (status != null && !status.equalsIgnoreCase("success")
&& finishTime != null && finishedMaps != null
&& finishedReduces != null) {
return new JobUnsuccessfulCompletionEvent(jobID, Long
.parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
.parseInt(finishedReduces), status);
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String taskType = line.get("TASK_TYPE");
String startTime = line.get("START_TIME");
String splits = line.get("SPLITS");
if (startTime != null && taskType != null) {
Task20LineHistoryEventEmitter that =
(Task20LineHistoryEventEmitter) thatg;
that.originalStartTime = Long.parseLong(startTime);
that.originalTaskType =
Version20LogInterfaceUtils.get20TaskType(taskType);
return new TaskStartedEvent(taskID, that.originalStartTime,
that.originalTaskType, splits);
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String finishTime = line.get("FINISH_TIME");
if (finishTime != null) {
return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:TopologyBuilder.java
/**
* Process one {@link HistoryEvent}
*
* @param event
* The {@link HistoryEvent} to be processed.
*/
public void process(HistoryEvent event) {
if (event instanceof TaskAttemptFinishedEvent) {
processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
} else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
} else if (event instanceof TaskStartedEvent) {
processTaskStartedEvent((TaskStartedEvent) event);
} else if (event instanceof MapAttemptFinishedEvent) {
processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
} else if (event instanceof ReduceAttemptFinishedEvent) {
processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
}
// I do NOT expect these if statements to be exhaustive.
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
JobID jobID = JobID.forName(jobIDName);
if (jobIDName == null) {
return null;
}
String priority = line.get("JOB_PRIORITY");
if (priority != null) {
return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
String status = line.get("JOB_STATUS");
String totalMaps = line.get("TOTAL_MAPS");
String totalReduces = line.get("TOTAL_REDUCES");
String uberized = line.get("UBERIZED");
if (launchTime != null && totalMaps != null && totalReduces != null) {
return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
.parseInt(totalMaps), Integer.parseInt(totalReduces), status,
Boolean.parseBoolean(uberized));
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String status = line.get("JOB_STATUS");
if (status != null) {
return new JobStatusChangedEvent(jobID, status);
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String launchTime = line.get("LAUNCH_TIME");
if (launchTime != null) {
Job20LineHistoryEventEmitter that =
(Job20LineHistoryEventEmitter) thatg;
return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
.parseLong(launchTime));
}
return null;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
HistoryEventEmitter thatg) {
if (jobIDName == null) {
return null;
}
JobID jobID = JobID.forName(jobIDName);
String finishTime = line.get("FINISH_TIME");
String status = line.get("JOB_STATUS");
String finishedMaps = line.get("FINISHED_MAPS");
String finishedReduces = line.get("FINISHED_REDUCES");
if (status != null && !status.equalsIgnoreCase("success")
&& finishTime != null && finishedMaps != null
&& finishedReduces != null) {
return new JobUnsuccessfulCompletionEvent(jobID, Long
.parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
.parseInt(finishedReduces), status);
}
return null;
}
项目:hadoop-plus
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String taskType = line.get("TASK_TYPE");
String startTime = line.get("START_TIME");
String splits = line.get("SPLITS");
if (startTime != null && taskType != null) {
Task20LineHistoryEventEmitter that =
(Task20LineHistoryEventEmitter) thatg;
that.originalStartTime = Long.parseLong(startTime);
that.originalTaskType =
Version20LogInterfaceUtils.get20TaskType(taskType);
return new TaskStartedEvent(taskID, that.originalStartTime,
that.originalTaskType, splits);
}
return null;
}
项目:hadoop-plus
文件:Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
HistoryEventEmitter thatg) {
if (taskIDName == null) {
return null;
}
TaskID taskID = TaskID.forName(taskIDName);
String finishTime = line.get("FINISH_TIME");
if (finishTime != null) {
return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
}
return null;
}