/**
 * Fetches the events of entity {@code id_1} (type {@code type_1}) through
 * the {@code ws/v1/timeline} REST path and checks the JSON response: one
 * entity group holding two events, {@code end_event} (timestamp 456, one
 * info entry) followed by {@code start_event} (timestamp 123, no info).
 */
@Test
public void testGetEvents() throws Exception {
  ClientResponse response = resource()
      .path("ws").path("v1").path("timeline")
      .path("type_1").path("events")
      .queryParam("entityId", "id_1")
      .accept(MediaType.APPLICATION_JSON)
      .get(ClientResponse.class);
  assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());

  TimelineEvents fetched = response.getEntity(TimelineEvents.class);
  Assert.assertNotNull(fetched);
  Assert.assertEquals(1, fetched.getAllEvents().size());

  // Exactly one entity was requested, so only its group is inspected.
  TimelineEvents.EventsOfOneEntity group = fetched.getAllEvents().get(0);
  Assert.assertEquals(2, group.getEvents().size());

  TimelineEvent first = group.getEvents().get(0);
  Assert.assertEquals(456L, first.getTimestamp());
  Assert.assertEquals("end_event", first.getEventType());
  Assert.assertEquals(1, first.getEventInfo().size());

  TimelineEvent second = group.getEvents().get(1);
  Assert.assertEquals(123L, second.getTimestamp());
  Assert.assertEquals("start_event", second.getEventType());
  Assert.assertEquals(0, second.getEventInfo().size());
}
/**
 * Get the events whose entities the given user has access to, recording
 * metrics around the lookup. The meaning of each argument has been
 * documented with {@link TimelineReader#getEntityTimelines}.
 *
 * <p>The operation counter is incremented before the lookup, the number of
 * returned per-entity groups is added to the total on success, and the
 * elapsed time is recorded whether or not the lookup throws.
 *
 * @see TimelineReader#getEntityTimelines
 */
public TimelineEvents getEvents(
    String entityType,
    SortedSet<String> entityIds,
    SortedSet<String> eventTypes,
    Long windowStart,
    Long windowEnd,
    Long limit,
    UserGroupInformation callerUGI) throws YarnException, IOException {
  final long begin = Time.monotonicNow();
  metrics.incrGetEventsOps();
  try {
    final TimelineEvents result = doGetEvents(
        entityType, entityIds, eventTypes,
        windowStart, windowEnd, limit, callerUGI);
    final int groupCount = result.getAllEvents().size();
    metrics.incrGetEventsTotal(groupCount);
    return result;
  } finally {
    // Runs on both the normal and the exceptional path.
    final long elapsed = Time.monotonicNow() - begin;
    metrics.addGetEventsTime(elapsed);
  }
}
/**
 * Collects the events of the requested entities by asking, for every entity
 * id, each timeline store returned by {@code getTimelineStoresForRead} and
 * merging the per-store answers into a single result.
 *
 * @param entityType type shared by all requested entities
 * @param entityIds ids of the entities to look up; a {@code null} or empty
 *          set yields an empty result instead of failing
 * @param limit forwarded unchanged to each store
 * @param windowStart forwarded unchanged to each store
 * @param windowEnd forwarded unchanged to each store
 * @param eventTypes forwarded unchanged to each store
 * @return the merged events; never {@code null}
 * @throws IOException if resolving the stores or reading from one of them
 *           fails
 */
@Override
public TimelineEvents getEntityTimelines(String entityType,
    SortedSet<String> entityIds, Long limit, Long windowStart,
    Long windowEnd, Set<String> eventTypes) throws IOException {
  LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
  TimelineEvents returnEvents = new TimelineEvents();
  // Without ids there is nothing to look up; iterating a null set would
  // otherwise throw a NullPointerException.
  if (entityIds == null || entityIds.isEmpty()) {
    return returnEvents;
  }
  // Handed to getTimelineStoresForRead for every id; not read here afterwards.
  List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
  for (String entityId : entityIds) {
    LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
    List<TimelineStore> stores
        = getTimelineStoresForRead(entityId, entityType, relatedCacheItems);
    for (TimelineStore store : stores) {
      // Pass the store itself so its string form is only built when debug
      // logging is enabled.
      LOG.debug("Try timeline store {}:{} for the request",
          store.getName(), store);
      // Each store is queried for this single entity id only.
      SortedSet<String> entityIdSet = new TreeSet<>();
      entityIdSet.add(entityId);
      TimelineEvents events
          = store.getEntityTimelines(entityType, entityIdSet, limit,
              windowStart, windowEnd, eventTypes);
      // A store may answer null; such answers are skipped.
      if (events != null) {
        returnEvents.addEvents(events.getAllEvents());
      }
    }
  }
  return returnEvents;
}
/**
 * Returns, for every requested entity found in the {@code entities} map, the
 * events that fall inside the time window and match the requested types.
 *
 * <p>A {@code null} id set yields an empty result. A {@code null} limit
 * falls back to {@code DEFAULT_LIMIT}; a {@code null} window bound leaves
 * that side of the window open. The window is exclusive at
 * {@code windowStart} and inclusive at {@code windowEnd}. Entities that are
 * not present are skipped; entities that are present contribute a group even
 * when none of their events match.
 */
@Override
public synchronized TimelineEvents getEntityTimelines(String entityType,
    SortedSet<String> entityIds, Long limit, Long windowStart,
    Long windowEnd, Set<String> eventTypes) {
  final TimelineEvents result = new TimelineEvents();
  if (entityIds == null) {
    return result;
  }

  // Resolve the optional arguments to concrete values once, up front.
  final long maxEvents = (limit != null) ? limit : DEFAULT_LIMIT;
  final long lowerBound = (windowStart != null) ? windowStart : Long.MIN_VALUE;
  final long upperBound = (windowEnd != null) ? windowEnd : Long.MAX_VALUE;

  for (String entityId : entityIds) {
    TimelineEntity entity =
        entities.get(new EntityIdentifier(entityId, entityType));
    if (entity != null) {
      EventsOfOneEntity matched = new EventsOfOneEntity();
      matched.setEntityId(entityId);
      matched.setEntityType(entityType);
      for (TimelineEvent candidate : entity.getEvents()) {
        // Stop scanning as soon as the per-entity limit is reached.
        if (matched.getEvents().size() >= maxEvents) {
          break;
        }
        long timestamp = candidate.getTimestamp();
        if (timestamp > lowerBound && timestamp <= upperBound
            && (eventTypes == null
                || eventTypes.contains(candidate.getEventType()))) {
          matched.addEvent(candidate);
        }
      }
      result.addEvent(matched);
    }
  }
  return result;
}
/** * Get the events whose entities the given user has access to. The meaning of * each argument has been documented with * {@link TimelineReader#getEntityTimelines}. * * @see TimelineReader#getEntityTimelines */ public TimelineEvents getEvents( String entityType, SortedSet<String> entityIds, SortedSet<String> eventTypes, Long windowStart, Long windowEnd, Long limit, UserGroupInformation callerUGI) throws YarnException, IOException { TimelineEvents events = null; events = store.getEntityTimelines( entityType, entityIds, limit, windowStart, windowEnd, eventTypes); if (events != null) { Iterator<TimelineEvents.EventsOfOneEntity> eventsItr = events.getAllEvents().iterator(); while (eventsItr.hasNext()) { TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next(); try { TimelineEntity entity = store.getEntity( eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityType(), EnumSet.of(Field.PRIMARY_FILTERS)); addDefaultDomainIdIfAbsent(entity); // check ACLs if (!timelineACLsManager.checkAccess( callerUGI, ApplicationAccessType.VIEW_APP, entity)) { eventsItr.remove(); } } catch (Exception e) { LOG.error("Error when verifying access for user " + callerUGI + " on the events of the timeline entity " + new EntityIdentifier(eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityType()), e); eventsItr.remove(); } } } if (events == null) { return new TimelineEvents(); } return events; }
private TimelineEvents doGetEvents( String entityType, SortedSet<String> entityIds, SortedSet<String> eventTypes, Long windowStart, Long windowEnd, Long limit, UserGroupInformation callerUGI) throws YarnException, IOException { TimelineEvents events = null; events = store.getEntityTimelines( entityType, entityIds, limit, windowStart, windowEnd, eventTypes); if (events != null) { Iterator<TimelineEvents.EventsOfOneEntity> eventsItr = events.getAllEvents().iterator(); while (eventsItr.hasNext()) { TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next(); try { TimelineEntity entity = store.getEntity( eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityType(), EnumSet.of(Field.PRIMARY_FILTERS)); addDefaultDomainIdIfAbsent(entity); // check ACLs if (!timelineACLsManager.checkAccess( callerUGI, ApplicationAccessType.VIEW_APP, entity)) { eventsItr.remove(); } } catch (Exception e) { LOG.warn("Error when verifying access for user " + callerUGI + " on the events of the timeline entity " + new EntityIdentifier(eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityType()), e); eventsItr.remove(); } } } if (events == null) { return new TimelineEvents(); } return events; }
/**
 * Returns, for every requested entity found in the {@code entities} map, the
 * events that fall inside the time window and match the requested types.
 *
 * <p>Returns {@code null} when {@code getServiceStopped()} reports that the
 * service is stopped. Otherwise a {@code null} id set yields an empty
 * result, a {@code null} limit falls back to {@code DEFAULT_LIMIT}, and a
 * {@code null} window bound leaves that side of the window open. The window
 * is exclusive at {@code windowStart} and inclusive at {@code windowEnd}.
 * Entities that are not present are skipped; entities that are present
 * contribute a group even when none of their events match.
 */
@Override
public synchronized TimelineEvents getEntityTimelines(String entityType,
    SortedSet<String> entityIds, Long limit, Long windowStart,
    Long windowEnd, Set<String> eventTypes) {
  if (getServiceStopped()) {
    LOG.info("Service stopped, return null for the storage");
    return null;
  }

  final TimelineEvents result = new TimelineEvents();
  if (entityIds == null) {
    return result;
  }

  // Resolve the optional arguments to concrete values once, up front.
  final long maxEvents = (limit != null) ? limit : DEFAULT_LIMIT;
  final long lowerBound = (windowStart != null) ? windowStart : Long.MIN_VALUE;
  final long upperBound = (windowEnd != null) ? windowEnd : Long.MAX_VALUE;

  for (String entityId : entityIds) {
    TimelineEntity entity =
        entities.get(new EntityIdentifier(entityId, entityType));
    if (entity != null) {
      EventsOfOneEntity matched = new EventsOfOneEntity();
      matched.setEntityId(entityId);
      matched.setEntityType(entityType);
      for (TimelineEvent candidate : entity.getEvents()) {
        // Stop scanning as soon as the per-entity limit is reached.
        if (matched.getEvents().size() >= maxEvents) {
          break;
        }
        long timestamp = candidate.getTimestamp();
        if (timestamp > lowerBound && timestamp <= upperBound
            && (eventTypes == null
                || eventTypes.contains(candidate.getEventType()))) {
          matched.addEvent(candidate);
        }
      }
      result.addEvent(matched);
    }
  }
  return result;
}
/**
 * Builds a {@link TimelineEvents} record holding two entity groups with two
 * events each, dumps it as JSON to the log, and verifies that ids, types,
 * event types and event-info sizes read back as they were set.
 */
@Test
public void testEvents() throws Exception {
  final int entityCount = 2;
  final int eventsPerEntity = 2;

  TimelineEvents events = new TimelineEvents();
  for (int j = 0; j < entityCount; ++j) {
    TimelineEvents.EventsOfOneEntity group =
        new TimelineEvents.EventsOfOneEntity();
    group.setEntityId("entity id " + j);
    group.setEntityType("entity type " + j);
    for (int i = 0; i < eventsPerEntity; ++i) {
      TimelineEvent created = new TimelineEvent();
      created.setTimestamp(System.currentTimeMillis());
      created.setEventType("event type " + i);
      created.addEventInfo("key1", "val1");
      created.addEventInfo("key2", "val2");
      group.addEvent(created);
    }
    events.addEvent(group);
  }

  LOG.info("Events in JSON:");
  LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(events, true));

  Assert.assertEquals(entityCount, events.getAllEvents().size());
  // Walk the groups and their events in insertion order.
  for (int j = 0; j < entityCount; ++j) {
    TimelineEvents.EventsOfOneEntity group = events.getAllEvents().get(j);
    Assert.assertEquals("entity id " + j, group.getEntityId());
    Assert.assertEquals("entity type " + j, group.getEntityType());
    Assert.assertEquals(eventsPerEntity, group.getEvents().size());
    for (int i = 0; i < eventsPerEntity; ++i) {
      TimelineEvent stored = group.getEvents().get(i);
      Assert.assertEquals("event type " + i, stored.getEventType());
      Assert.assertEquals(2, stored.getEventInfo().size());
    }
  }
}
/**
 * Returns, for every requested entity found in the {@code entities} map, the
 * events that fall inside the time window and match the requested types.
 *
 * <p>A {@code null} id set yields an empty result. A {@code null} limit
 * falls back to {@code DEFAULT_LIMIT}; a {@code null} window bound leaves
 * that side of the window open. The window is exclusive at
 * {@code windowStart} and inclusive at {@code windowEnd}. Entities that are
 * not present are skipped; entities that are present contribute a group even
 * when none of their events match.
 */
@Override
public TimelineEvents getEntityTimelines(String entityType,
    SortedSet<String> entityIds, Long limit, Long windowStart,
    Long windowEnd, Set<String> eventTypes) {
  final TimelineEvents result = new TimelineEvents();
  if (entityIds == null) {
    return result;
  }

  // Resolve the optional arguments to concrete values once, up front.
  final long maxEvents = (limit != null) ? limit : DEFAULT_LIMIT;
  final long lowerBound = (windowStart != null) ? windowStart : Long.MIN_VALUE;
  final long upperBound = (windowEnd != null) ? windowEnd : Long.MAX_VALUE;

  for (String entityId : entityIds) {
    TimelineEntity entity =
        entities.get(new EntityIdentifier(entityId, entityType));
    if (entity != null) {
      EventsOfOneEntity matched = new EventsOfOneEntity();
      matched.setEntityId(entityId);
      matched.setEntityType(entityType);
      for (TimelineEvent candidate : entity.getEvents()) {
        // Stop scanning as soon as the per-entity limit is reached.
        if (matched.getEvents().size() >= maxEvents) {
          break;
        }
        long timestamp = candidate.getTimestamp();
        if (timestamp > lowerBound && timestamp <= upperBound
            && (eventTypes == null
                || eventTypes.contains(candidate.getEventType()))) {
          matched.addEvent(candidate);
        }
      }
      result.addEvent(matched);
    }
  }
  return result;
}
/**
 * This method retrieves the events for a list of entities all of the same
 * entity type. The events for each entity are sorted in order of their
 * timestamps, descending.
 *
 * @param entityType
 *          The type of entities to retrieve events for.
 * @param entityIds
 *          The entity IDs to retrieve events for.
 * @param limit
 *          A limit on the number of events to return for each entity. If
 *          null, defaults to {@link #DEFAULT_LIMIT} events per entity.
 * @param windowStart
 *          If not null, retrieves only events later than the given time
 *          (exclusive)
 * @param windowEnd
 *          If not null, retrieves only events earlier than the given time
 *          (inclusive)
 * @param eventTypes
 *          Restricts the events returned to the given types. If null, events
 *          of all types will be returned.
 * @return A {@link TimelineEvents} object holding the retrieved events.
 * @throws IOException
 *           if the events cannot be retrieved. NOTE(review): the exact
 *           failure conditions depend on the implementation - confirm.
 */
TimelineEvents getEntityTimelines(String entityType,
    SortedSet<String> entityIds, Long limit, Long windowStart,
    Long windowEnd, Set<String> eventTypes) throws IOException;