private static TimelineEntity generateEntity() { TimelineEntity entity = new TimelineEntity(); entity.setEntityId("entity id"); entity.setEntityType("entity type"); entity.setStartTime(System.currentTimeMillis()); for (int i = 0; i < 2; ++i) { TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType("test event type " + i); event.addEventInfo("key1", "val1"); event.addEventInfo("key2", "val2"); entity.addEvent(event); } entity.addRelatedEntity("test ref type 1", "test ref id 1"); entity.addRelatedEntity("test ref type 2", "test ref id 2"); entity.addPrimaryFilter("pkey1", "pval1"); entity.addPrimaryFilter("pkey2", "pval2"); entity.addOtherInfo("okey1", "oval1"); entity.addOtherInfo("okey2", "oval2"); entity.setDomainId("domain id 1"); return entity; }
private void publishApplicationCreatedEvent(ApplicationCreatedEvent event) { TimelineEntity entity = createApplicationEntity(event.getApplicationId()); Map<String, Object> entityInfo = new HashMap<String, Object>(); entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, event.getApplicationName()); entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO, event.getApplicationType()); entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, event.getUser()); entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, event.getQueue()); entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO, event.getSubmittedTime()); entity.setOtherInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType( ApplicationMetricsConstants.CREATED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); entity.addEvent(tEvent); putEntity(entity); }
private void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) { TimelineEntity entity = createAppAttemptEntity(event.getApplicationAttemptId()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType( AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put( AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, event.getTrackingUrl()); eventInfo.put( AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, event.getOriginalTrackingURL()); eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost()); eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO, event.getRpcPort()); eventInfo.put( AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event.getMasterContainerId().toString()); tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); putEntity(entity); }
private void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) { TimelineEntity entity = createAppAttemptEntity(event.getApplicationAttemptId()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put( AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO, event.getTrackingUrl()); eventInfo.put( AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO, event.getOriginalTrackingURL()); eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, event.getDiagnosticsInfo()); eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event.getFinalApplicationStatus().toString()); eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event.getYarnApplicationAttemptState().toString()); tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); putEntity(entity); }
private void publishContainerCreatedEvent(ContainerCreatedEvent event) { TimelineEntity entity = createContainerEntity(event.getContainerId()); Map<String, Object> entityInfo = new HashMap<String, Object>(); entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event.getAllocatedResource().getMemory()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event.getAllocatedResource().getVirtualCores()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_GCORE_ENTITY_INFO, event.getAllocatedResource().getGpuCores()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event.getAllocatedNode().getHost()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event.getAllocatedNode().getPort()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, event.getAllocatedPriority().getPriority()); entityInfo.put( ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, event.getNodeHttpAddress()); entity.setOtherInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); entity.addEvent(tEvent); putEntity(entity); }
private void publishContainerFinishedEvent(ContainerFinishedEvent event) { TimelineEntity entity = createContainerEntity(event.getContainerId()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, event.getDiagnosticsInfo()); eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO, event.getContainerExitStatus()); eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event.getContainerState().toString()); tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); putEntity(entity); }
@Test public void testGetEvents() throws Exception { WebResource r = resource(); ClientResponse response = r.path("ws").path("v1").path("timeline") .path("type_1").path("events") .queryParam("entityId", "id_1") .accept(MediaType.APPLICATION_JSON) .get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); TimelineEvents events = response.getEntity(TimelineEvents.class); Assert.assertNotNull(events); Assert.assertEquals(1, events.getAllEvents().size()); TimelineEvents.EventsOfOneEntity partEvents = events.getAllEvents().get(0); Assert.assertEquals(2, partEvents.getEvents().size()); TimelineEvent event1 = partEvents.getEvents().get(0); Assert.assertEquals(456l, event1.getTimestamp()); Assert.assertEquals("end_event", event1.getEventType()); Assert.assertEquals(1, event1.getEventInfo().size()); TimelineEvent event2 = partEvents.getEvents().get(1); Assert.assertEquals(123l, event2.getTimestamp()); Assert.assertEquals("start_event", event2.getEventType()); Assert.assertEquals(0, event2.getEventInfo().size()); }
/** * Create a test entity */ protected static TimelineEntity createEntity(String entityId, String entityType, Long startTime, List<TimelineEvent> events, Map<String, Set<String>> relatedEntities, Map<String, Set<Object>> primaryFilters, Map<String, Object> otherInfo, String domainId) { TimelineEntity entity = new TimelineEntity(); entity.setEntityId(entityId); entity.setEntityType(entityType); entity.setStartTime(startTime); entity.setEvents(events); if (relatedEntities != null) { for (Entry<String, Set<String>> e : relatedEntities.entrySet()) { for (String v : e.getValue()) { entity.addRelatedEntity(e.getKey(), v); } } } else { entity.setRelatedEntities(null); } entity.setPrimaryFilters(primaryFilters); entity.setOtherInfo(otherInfo); entity.setDomainId(domainId); return entity; }
private static void publishContainerEndEvent( final TimelineClient timelineClient, ContainerStatus container, String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(container.getContainerId().toString()); entity.setEntityType(DSEntity.DS_CONTAINER.toString()); entity.setDomainId(domainId); entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setTimestamp(System.currentTimeMillis()); event.setEventType(DSEvent.DS_CONTAINER_END.toString()); event.addEventInfo("State", container.getState().name()); event.addEventInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); try { timelineClient.putEntities(entity); } catch (YarnException | IOException e) { LOG.error("Container end event could not be published for " + container.getContainerId().toString(), e); } }
private static void publishApplicationAttemptEvent( final TimelineClient timelineClient, String appAttemptId, DSEvent appEvent, String domainId, UserGroupInformation ugi) { final TimelineEntity entity = new TimelineEntity(); entity.setEntityId(appAttemptId); entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); entity.setDomainId(domainId); entity.addPrimaryFilter("user", ugi.getShortUserName()); TimelineEvent event = new TimelineEvent(); event.setEventType(appEvent.toString()); event.setTimestamp(System.currentTimeMillis()); entity.addEvent(event); try { timelineClient.putEntities(entity); } catch (YarnException | IOException e) { LOG.error("App Attempt " + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") + " event could not be published for " + appAttemptId.toString(), e); } }
private void publishContainerCreatedEvent(ContainerCreatedEvent event) { TimelineEntity entity = createContainerEntity(event.getContainerId()); Map<String, Object> entityInfo = new HashMap<String, Object>(); entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event.getAllocatedResource().getMemory()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event.getAllocatedResource().getVirtualCores()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event.getAllocatedNode().getHost()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event.getAllocatedNode().getPort()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, event.getAllocatedPriority().getPriority()); entityInfo.put( ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, event.getNodeHttpAddress()); entity.setOtherInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); entity.addEvent(tEvent); putEntity(entity); }
/** * Get the unique start time for a given entity as a byte array that sorts the * timestamps in reverse order (see * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start * time doesn't exist, set it based on the information provided. Should only * be called when a lock has been obtained on the entity. * * @param entityId * The id of the entity * @param entityType * The type of the entity * @param startTime * The start time of the entity, or null * @param events * A list of events for the entity, or null * @return A StartAndInsertTime * @throws IOException */ private Long getAndSetStartTime(String entityId, String entityType, Long startTime, List<TimelineEvent> events) throws IOException { EntityIdentifier entity = new EntityIdentifier(entityId, entityType); Long time = startTimeWriteCache.get(entity); if (time != null) { // return the value in the cache return time; } if (startTime == null && events != null) { // calculate best guess start time based on lowest event time startTime = Long.MAX_VALUE; for (TimelineEvent e : events) { if (e.getTimestamp() < startTime) { startTime = e.getTimestamp(); } } } // check the provided start time matches the db return checkStartTimeInDb(entity, startTime); }
/** * Creates an event object from the given key, offset, and value. If the event * type is not contained in the specified set of event types, returns null. */ private static TimelineEvent getEntityEvent(Set<String> eventTypes, byte[] key, int offset, byte[] value) throws IOException { KeyParser kp = new KeyParser(key, offset); long ts = kp.getNextLong(); String tstype = kp.getNextString(); if (eventTypes == null || eventTypes.contains(tstype)) { TimelineEvent event = new TimelineEvent(); event.setTimestamp(ts); event.setEventType(tstype); Object o = fstConf.asObject(value); if (o == null) { event.setEventInfo(null); } else if (o instanceof Map) { @SuppressWarnings("unchecked") Map<String, Object> m = (Map<String, Object>) o; event.setEventInfo(m); } else { throw new IOException("Couldn't deserialize event info map"); } return event; } return null; }
private void publishApplicationFinishedEvent(ApplicationFinishedEvent event) { TimelineEntity entity = createApplicationEntity(event.getApplicationId()); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType( ApplicationMetricsConstants.FINISHED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); Map<String, Object> eventInfo = new HashMap<String, Object>(); eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO, event.getDiagnosticsInfo()); eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event.getFinalApplicationStatus().toString()); eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event.getYarnApplicationState().toString()); if (event.getLatestApplicationAttemptId() != null) { eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO, event.getLatestApplicationAttemptId().toString()); } tEvent.setEventInfo(eventInfo); entity.addEvent(tEvent); putEntity(entity); }
private void publishContainerCreatedEvent(ContainerCreatedEvent event) { TimelineEntity entity = createContainerEntity(event.getContainerId()); Map<String, Object> entityInfo = new HashMap<String, Object>(); entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event.getAllocatedResource().getMemory()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event.getAllocatedResource().getVirtualCores()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event.getAllocatedNode().getHost()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event.getAllocatedNode().getPort()); entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, event.getAllocatedPriority().getPriority()); entity.setOtherInfo(entityInfo); TimelineEvent tEvent = new TimelineEvent(); tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE); tEvent.setTimestamp(event.getTimestamp()); entity.addEvent(tEvent); putEntity(entity); }
/** * Creates an event object from the given key, offset, and value. If the * event type is not contained in the specified set of event types, * returns null. */ private static TimelineEvent getEntityEvent(Set<String> eventTypes, byte[] key, int offset, byte[] value) throws IOException { KeyParser kp = new KeyParser(key, offset); long ts = kp.getNextLong(); String tstype = kp.getNextString(); if (eventTypes == null || eventTypes.contains(tstype)) { TimelineEvent event = new TimelineEvent(); event.setTimestamp(ts); event.setEventType(tstype); Object o = GenericObjectMapper.read(value); if (o == null) { event.setEventInfo(null); } else if (o instanceof Map) { @SuppressWarnings("unchecked") Map<String, Object> m = (Map<String, Object>) o; event.setEventInfo(m); } else { throw new IOException("Couldn't deserialize event info map"); } return event; } return null; }