/** * Verify timeline events */ private static void verifyEntityTimeline( EventsOfOneEntity retrievedEvents, String entityId, String entityType, TimelineEvent... actualEvents) { assertEquals(entityId, retrievedEvents.getEntityId()); assertEquals(entityType, retrievedEvents.getEntityType()); assertEquals(actualEvents.length, retrievedEvents.getEvents().size()); for (int i = 0; i < actualEvents.length; i++) { assertEquals(actualEvents[i], retrievedEvents.getEvents().get(i)); } }
@Override public synchronized TimelineEvents getEntityTimelines(String entityType, SortedSet<String> entityIds, Long limit, Long windowStart, Long windowEnd, Set<String> eventTypes) { TimelineEvents allEvents = new TimelineEvents(); if (entityIds == null) { return allEvents; } if (limit == null) { limit = DEFAULT_LIMIT; } if (windowStart == null) { windowStart = Long.MIN_VALUE; } if (windowEnd == null) { windowEnd = Long.MAX_VALUE; } for (String entityId : entityIds) { EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); TimelineEntity entity = entities.get(entityID); if (entity == null) { continue; } EventsOfOneEntity events = new EventsOfOneEntity(); events.setEntityId(entityId); events.setEntityType(entityType); for (TimelineEvent event : entity.getEvents()) { if (events.getEvents().size() >= limit) { break; } if (event.getTimestamp() <= windowStart) { continue; } if (event.getTimestamp() > windowEnd) { continue; } if (eventTypes != null && !eventTypes.contains(event.getEventType())) { continue; } events.addEvent(event); } allEvents.addEvent(events); } return allEvents; }
public void testGetEvents() throws IOException { // test getting entity timelines SortedSet<String> sortedSet = new TreeSet<String>(); sortedSet.add(entityId1); List<EventsOfOneEntity> timelines = store.getEntityTimelines(entityType1, sortedSet, null, null, null, null).getAllEvents(); assertEquals(1, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2, ev1); sortedSet.add(entityId1b); timelines = store.getEntityTimelines(entityType1, sortedSet, null, null, null, null).getAllEvents(); assertEquals(2, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2, ev1); verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2, ev1); timelines = store.getEntityTimelines(entityType1, sortedSet, 1l, null, null, null).getAllEvents(); assertEquals(2, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2); verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2); timelines = store.getEntityTimelines(entityType1, sortedSet, null, 345l, null, null).getAllEvents(); assertEquals(2, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2); verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2); timelines = store.getEntityTimelines(entityType1, sortedSet, null, 123l, null, null).getAllEvents(); assertEquals(2, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2); verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2); timelines = store.getEntityTimelines(entityType1, sortedSet, null, null, 345l, null).getAllEvents(); assertEquals(2, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev1); verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev1); timelines = store.getEntityTimelines(entityType1, sortedSet, null, null, 123l, null).getAllEvents(); assertEquals(2, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev1); verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev1); timelines = store.getEntityTimelines(entityType1, sortedSet, null, null, null, Collections.singleton("end_event")).getAllEvents(); assertEquals(2, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId1, entityType1, ev2); verifyEntityTimeline(timelines.get(1), entityId1b, entityType1, ev2); sortedSet.add(entityId2); timelines = store.getEntityTimelines(entityType2, sortedSet, null, null, null, null).getAllEvents(); assertEquals(1, timelines.size()); verifyEntityTimeline(timelines.get(0), entityId2, entityType2, ev3, ev4); }
@Override public synchronized TimelineEvents getEntityTimelines(String entityType, SortedSet<String> entityIds, Long limit, Long windowStart, Long windowEnd, Set<String> eventTypes) { if (getServiceStopped()) { LOG.info("Service stopped, return null for the storage"); return null; } TimelineEvents allEvents = new TimelineEvents(); if (entityIds == null) { return allEvents; } if (limit == null) { limit = DEFAULT_LIMIT; } if (windowStart == null) { windowStart = Long.MIN_VALUE; } if (windowEnd == null) { windowEnd = Long.MAX_VALUE; } for (String entityId : entityIds) { EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); TimelineEntity entity = entities.get(entityID); if (entity == null) { continue; } EventsOfOneEntity events = new EventsOfOneEntity(); events.setEntityId(entityId); events.setEntityType(entityType); for (TimelineEvent event : entity.getEvents()) { if (events.getEvents().size() >= limit) { break; } if (event.getTimestamp() <= windowStart) { continue; } if (event.getTimestamp() > windowEnd) { continue; } if (eventTypes != null && !eventTypes.contains(event.getEventType())) { continue; } events.addEvent(event); } allEvents.addEvent(events); } return allEvents; }
@Override public TimelineEvents getEntityTimelines(String entityType, SortedSet<String> entityIds, Long limit, Long windowStart, Long windowEnd, Set<String> eventTypes) { TimelineEvents allEvents = new TimelineEvents(); if (entityIds == null) { return allEvents; } if (limit == null) { limit = DEFAULT_LIMIT; } if (windowStart == null) { windowStart = Long.MIN_VALUE; } if (windowEnd == null) { windowEnd = Long.MAX_VALUE; } for (String entityId : entityIds) { EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); TimelineEntity entity = entities.get(entityID); if (entity == null) { continue; } EventsOfOneEntity events = new EventsOfOneEntity(); events.setEntityId(entityId); events.setEntityType(entityType); for (TimelineEvent event : entity.getEvents()) { if (events.getEvents().size() >= limit) { break; } if (event.getTimestamp() <= windowStart) { continue; } if (event.getTimestamp() > windowEnd) { continue; } if (eventTypes != null && !eventTypes.contains(event.getEventType())) { continue; } events.addEvent(event); } allEvents.addEvent(events); } return allEvents; }