Example source code for Java class org.apache.hadoop.mapreduce.jobhistory.EventReader
Project: hadoop
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
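For orientation, here is a minimal self-contained sketch of the same EventReader iteration pattern outside the MRAppMaster context. The class name, the main-method wrapper, and the fallback history-file path are illustrative assumptions; only the EventReader and HistoryEvent calls mirror the snippet above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

// Sketch: dump the event types in a job history file. The default path is a
// hypothetical placeholder; pass a real .jhist file as the first argument.
public class DumpHistoryEvents {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path historyFile =
        new Path(args.length > 0 ? args[0] : "/tmp/job_1_0001.jhist");
    // FSDataInputStream extends DataInputStream, so it can feed EventReader
    // directly, exactly as readJustAMInfos() does above.
    FSDataInputStream in = fs.open(historyFile);
    EventReader reader = new EventReader(in);
    try {
      HistoryEvent event;
      while ((event = reader.getNextEvent()) != null) { // null marks EOF
        System.out.println(event.getEventType());
      }
    } finally {
      reader.close(); // closes the underlying stream as well
    }
  }
}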
Project: hadoop
File: TestJobHistoryParsing.java
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
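As an aside: when the scripted sequence is fixed, the same stubbing can be written more tersely with Mockito's varargs thenReturn instead of a custom Answer. A sketch under that assumption, reusing jid, tids, taskType, and reader from the test above; the local event variable names are illustrative.

// Sketch: one stubbed return per call, in order; the trailing null ends
// the parse loop, just like the Answer-based version above.
HistoryEvent start0 = new TaskStartedEvent(tids[0], 0, taskType, "");
HistoryEvent start1 = new TaskStartedEvent(tids[1], 0, taskType, "");
HistoryEvent fail0 = new TaskFailedEvent(tids[0], 0, taskType,
    "failed", "FAILED", null, new Counters());
HistoryEvent fail1 = new TaskFailedEvent(tids[1], 0, taskType,
    "failed", "FAILED", null, new Counters());
HistoryEvent jobFailed = new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
    "JOB_FAILED", Collections.singletonList("Task failed: " + tids[0]));
Mockito.when(reader.getNextEvent()).thenReturn(
    start0, start1, fail0, fail1, jobFailed, null);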
Project: hadoop
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
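This constructor only adapts a raw InputStream to the DataInputStream that EventReader's constructor expects. Below is a minimal sketch of the complete wrapper pattern; the class name and the nextEvent()/close() delegation are assumptions about how such a parser is typically filled out, not the actual CurrentJHParser source.

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

// Sketch: wrap any InputStream and hand out one HistoryEvent at a time.
public class SimpleJHParser {
  private final EventReader reader;

  public SimpleJHParser(InputStream input) throws IOException {
    reader = new EventReader(new DataInputStream(input));
  }

  public HistoryEvent nextEvent() throws IOException {
    return reader.getNextEvent(); // null once the stream is exhausted
  }

  public void close() throws IOException {
    reader.close();
  }
}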
Project: aliyun-oss-hadoop-fs
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: aliyun-oss-hadoop-fs
File: TestJobHistoryParsing.java
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
Project: aliyun-oss-hadoop-fs
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: big-c
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: big-c
File: TestJobHistoryParsing.java
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
Project: big-c
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: hadoop-2.6.0-cdh5.4.3
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: hadoop-2.6.0-cdh5.4.3
File: TestJobHistoryParsing.java
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
Project: hadoop-2.6.0-cdh5.4.3
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: hadoop-plus
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: hadoop-plus
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: FlexMap
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: FlexMap
File: TestJobHistoryParsing.java
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
Project: hops
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: hops
File: TestJobHistoryParsing.java
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
Project: hops
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: hadoop-TCP
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: hadoop-TCP
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: hardfs
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: hardfs
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: hadoop-on-lustre2
File: MRAppMaster.java
private List<AMInfo> readJustAMInfos() {
  List<AMInfo> amInfos = new ArrayList<AMInfo>();
  FSDataInputStream inputStream = null;
  try {
    inputStream = getPreviousJobHistoryStream(getConfig(), appAttemptID);
    EventReader jobHistoryEventReader = new EventReader(inputStream);
    // All AMInfos are contiguous. Track when the first AMStartedEvent
    // appears.
    boolean amStartedEventsBegan = false;
    HistoryEvent event;
    while ((event = jobHistoryEventReader.getNextEvent()) != null) {
      if (event.getEventType() == EventType.AM_STARTED) {
        if (!amStartedEventsBegan) {
          // First AMStartedEvent.
          amStartedEventsBegan = true;
        }
        AMStartedEvent amStartedEvent = (AMStartedEvent) event;
        amInfos.add(MRBuilderUtils.newAMInfo(
            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
            amStartedEvent.getContainerId(),
            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
            amStartedEvent.getNodeManagerPort(),
            amStartedEvent.getNodeManagerHttpPort()));
      } else if (amStartedEventsBegan) {
        // This means AMStartedEvents began and this event is a
        // non-AMStarted event.
        // No need to continue reading all the other events.
        break;
      }
    }
  } catch (IOException e) {
    LOG.warn("Could not parse the old history file. "
        + "Will not have old AMinfos ", e);
  } finally {
    if (inputStream != null) {
      IOUtils.closeQuietly(inputStream);
    }
  }
  return amInfos;
}
Project: hadoop-on-lustre2
File: TestJobHistoryParsing.java
@Test
public void testMultipleFailedTasks() throws Exception {
  JobHistoryParser parser =
      new JobHistoryParser(Mockito.mock(FSDataInputStream.class));
  EventReader reader = Mockito.mock(EventReader.class);
  final AtomicInteger numEventsRead = new AtomicInteger(0); // Hack!
  final org.apache.hadoop.mapreduce.TaskType taskType =
      org.apache.hadoop.mapreduce.TaskType.MAP;
  final TaskID[] tids = new TaskID[2];
  final JobID jid = new JobID("1", 1);
  tids[0] = new TaskID(jid, taskType, 0);
  tids[1] = new TaskID(jid, taskType, 1);
  Mockito.when(reader.getNextEvent()).thenAnswer(
      new Answer<HistoryEvent>() {
        public HistoryEvent answer(InvocationOnMock invocation)
            throws IOException {
          // send two task start and two task fail events for tasks 0 and 1
          int eventId = numEventsRead.getAndIncrement();
          TaskID tid = tids[eventId & 0x1];
          if (eventId < 2) {
            return new TaskStartedEvent(tid, 0, taskType, "");
          }
          if (eventId < 4) {
            TaskFailedEvent tfe = new TaskFailedEvent(tid, 0, taskType,
                "failed", "FAILED", null, new Counters());
            tfe.setDatum(tfe.getDatum());
            return tfe;
          }
          if (eventId < 5) {
            JobUnsuccessfulCompletionEvent juce =
                new JobUnsuccessfulCompletionEvent(jid, 100L, 2, 0,
                    "JOB_FAILED", Collections.singletonList(
                        "Task failed: " + tids[0].toString()));
            return juce;
          }
          return null;
        }
      });
  JobInfo info = parser.parse(reader);
  assertTrue("Task 0 not implicated",
      info.getErrorInfo().contains(tids[0].toString()));
}
Project: hadoop-on-lustre2
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}
Project: mapreduce-fork
File: CurrentJHParser.java
public CurrentJHParser(InputStream input) throws IOException {
  reader = new EventReader(new DataInputStream(input));
}