private void logEntity(TimelineEntityGroupId groupId, TimelineEntity entity, String domainId) {
  if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
    historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
  }

  try {
    TimelinePutResponse response = timelineClient.putEntities(
        appContext.getApplicationAttemptId(), groupId, entity);
    if (response != null && !response.getErrors().isEmpty()) {
      int count = response.getErrors().size();
      for (int i = 0; i < count; ++i) {
        TimelinePutError err = response.getErrors().get(i);
        if (err.getErrorCode() != 0) {
          LOG.warn("Could not post history event to ATS"
              + ", atsPutError=" + err.getErrorCode()
              + ", entityId=" + err.getEntityId());
        }
      }
    }
    // Do nothing additional, ATS client library should handle throttling
    // or auto-disable as needed
  } catch (Exception e) {
    LOG.warn("Could not handle history events", e);
  }
}
@Test
public void testTimelinePutErrors() throws Exception {
  TimelinePutResponse putResponse = new TimelinePutResponse();
  TimelinePutError error1 = new TimelinePutError();
  error1.setEntityId("entity id 1");
  error1.setEntityType("entity type 1");
  error1.setErrorCode(TimelinePutError.NO_START_TIME);
  putResponse.addError(error1);

  List<TimelinePutError> errorList = new ArrayList<TimelinePutError>();
  errorList.add(error1);
  TimelinePutError error2 = new TimelinePutError();
  error2.setEntityId("entity id 2");
  error2.setEntityType("entity type 2");
  error2.setErrorCode(TimelinePutError.IO_EXCEPTION);
  errorList.add(error2);
  putResponse.addErrors(errorList);

  LOG.info("Errors in JSON:");
  LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(putResponse, true));

  // error1 is added once on its own and again as part of errorList, so the
  // response should contain three errors: error1, error1, error2.
  Assert.assertEquals(3, putResponse.getErrors().size());
  TimelinePutError e = putResponse.getErrors().get(0);
  Assert.assertEquals(error1.getEntityId(), e.getEntityId());
  Assert.assertEquals(error1.getEntityType(), e.getEntityType());
  Assert.assertEquals(error1.getErrorCode(), e.getErrorCode());

  e = putResponse.getErrors().get(1);
  Assert.assertEquals(error1.getEntityId(), e.getEntityId());
  Assert.assertEquals(error1.getEntityType(), e.getEntityType());
  Assert.assertEquals(error1.getErrorCode(), e.getErrorCode());

  e = putResponse.getErrors().get(2);
  Assert.assertEquals(error2.getEntityId(), e.getEntityId());
  Assert.assertEquals(error2.getEntityType(), e.getEntityType());
  Assert.assertEquals(error2.getErrorCode(), e.getErrorCode());
}
/**
 * Create a {@link TimelinePutError} for the given entity with the given error
 * code and add it to the put response.
 */
private static void handleError(TimelineEntity entity, TimelinePutResponse response,
    final int errorCode) {
  TimelinePutError error = new TimelinePutError();
  error.setEntityId(entity.getEntityId());
  error.setEntityType(entity.getEntityType());
  error.setErrorCode(errorCode);
  response.addError(error);
}
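// A minimal sketch (not from the source) of how handleError() could be used by a
// put path that validates entities before storing them. The method name
// putEntitiesSketch and the single validation rule shown here are assumptions for
// illustration; TimelinePutError.NO_START_TIME is an existing constant.
private static TimelinePutResponse putEntitiesSketch(List<TimelineEntity> entities) {
  TimelinePutResponse response = new TimelinePutResponse();
  for (TimelineEntity entity : entities) {
    // An entity without a start time cannot be indexed, so record NO_START_TIME for it.
    if (entity.getStartTime() == null) {
      handleError(entity, response, TimelinePutError.NO_START_TIME);
      continue;
    }
    // ... persist the entity here ...
  }
  return response;
}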
private void handleEvent(DAGHistoryEvent event) {
  HistoryEventType eventType = event.getHistoryEvent().getEventType();
  TezDAGID dagId = event.getDagID();

  if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
    DAGSubmittedEvent dagSubmittedEvent = (DAGSubmittedEvent) event.getHistoryEvent();
    String dagName = dagSubmittedEvent.getDAGName();
    if (dagName != null
        && dagName.startsWith(TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
      // Skip recording pre-warm DAG events
      skippedDAGs.add(dagId);
      return;
    }
  }
  if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
    // Remove from set to keep size small
    // No more events should be seen after this point.
    if (skippedDAGs.remove(dagId)) {
      return;
    }
  }

  if (dagId != null && skippedDAGs.contains(dagId)) {
    // Skip pre-warm DAGs
    return;
  }

  try {
    TimelinePutResponse response = timelineClient.putEntities(
        HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent()));
    if (response != null && !response.getErrors().isEmpty()) {
      TimelinePutError err = response.getErrors().get(0);
      if (err.getErrorCode() != 0) {
        LOG.warn("Could not post history event to ATS, eventType=" + eventType
            + ", atsPutError=" + err.getErrorCode());
      }
    }
    // Do nothing additional, ATS client library should handle throttling
    // or auto-disable as needed
  } catch (Exception e) {
    LOG.warn("Could not handle history event, eventType=" + eventType, e);
  }
}
private void handleEvents(List<DAGHistoryEvent> events) {
  List<TimelineEntity> entities = new ArrayList<>(events.size());
  for (DAGHistoryEvent event : events) {
    String domainId = getDomainForEvent(event);
    // skippedDAGs is updated in the above call, so check again.
    if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) {
      continue;
    }
    List<TimelineEntity> eventEntities =
        HistoryEventTimelineConversion.convertToTimelineEntities(event.getHistoryEvent());
    entities.addAll(eventEntities);
    if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) {
      for (TimelineEntity entity : eventEntities) {
        historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId);
      }
    }
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Sending event batch to Timeline, batchSize=" + events.size());
  }

  try {
    TimelinePutResponse response =
        timelineClient.putEntities(entities.toArray(new TimelineEntity[entities.size()]));
    if (response != null && !response.getErrors().isEmpty()) {
      int count = response.getErrors().size();
      for (int i = 0; i < count; ++i) {
        TimelinePutError err = response.getErrors().get(i);
        if (err.getErrorCode() != 0) {
          LOG.warn("Could not post history event to ATS"
              + ", atsPutError=" + err.getErrorCode()
              + ", entityId=" + err.getEntityId());
        }
      }
    }
    // Do nothing additional, ATS client library should handle throttling
    // or auto-disable as needed
  } catch (Exception e) {
    LOG.warn("Could not handle history events", e);
  }
}