@Test public void testFromTs() throws Exception { WebResource r = resource(); ClientResponse response = r.path("ws").path("v1").path("timeline") .path("type_1").queryParam("fromTs", Long.toString(beforeTime)) .accept(MediaType.APPLICATION_JSON) .get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(0, response.getEntity(TimelineEntities.class).getEntities() .size()); response = r.path("ws").path("v1").path("timeline") .path("type_1").queryParam("fromTs", Long.toString( System.currentTimeMillis())) .accept(MediaType.APPLICATION_JSON) .get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(3, response.getEntity(TimelineEntities.class).getEntities() .size()); }
private static ClientResponse mockEntityClientResponse( TimelineClientImpl client, ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) { ClientResponse response = mock(ClientResponse.class); if (hasRuntimeError) { doThrow(new ClientHandlerException(new ConnectException())).when(client) .doPostingObject(any(TimelineEntities.class), any(String.class)); return response; } doReturn(response).when(client) .doPostingObject(any(TimelineEntities.class), any(String.class)); when(response.getClientResponseStatus()).thenReturn(status); TimelinePutResponse.TimelinePutError error = new TimelinePutResponse.TimelinePutError(); error.setEntityId("test entity id"); error.setEntityType("test entity type"); error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION); TimelinePutResponse putResponse = new TimelinePutResponse(); if (hasError) { putResponse.addError(error); } when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse); return response; }
@Override public Map<ApplicationId, ApplicationReport> getAllApplications() throws YarnException, IOException { TimelineEntities entities = timelineDataManager.getEntities( ApplicationMetricsConstants.ENTITY_TYPE, null, null, null, null, null, null, Long.MAX_VALUE, EnumSet.allOf(Field.class), UserGroupInformation.getLoginUser()); Map<ApplicationId, ApplicationReport> apps = new LinkedHashMap<ApplicationId, ApplicationReport>(); if (entities != null && entities.getEntities() != null) { for (TimelineEntity entity : entities.getEntities()) { try { ApplicationReportExt app = generateApplicationReport(entity, ApplicationReportField.ALL); apps.put(app.appReport.getApplicationId(), app.appReport); } catch (Exception e) { LOG.error("Error on generating application report for " + entity.getEntityId(), e); } } } return apps; }
@Override public Map<ApplicationAttemptId, ApplicationAttemptReport> getApplicationAttempts(ApplicationId appId) throws YarnException, IOException { ApplicationReportExt app = getApplication( appId, ApplicationReportField.USER_AND_ACLS); checkAccess(app); TimelineEntities entities = timelineDataManager.getEntities( AppAttemptMetricsConstants.ENTITY_TYPE, new NameValuePair( AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER, appId .toString()), null, null, null, null, null, Long.MAX_VALUE, EnumSet.allOf(Field.class), UserGroupInformation.getLoginUser()); Map<ApplicationAttemptId, ApplicationAttemptReport> appAttempts = new LinkedHashMap<ApplicationAttemptId, ApplicationAttemptReport>(); for (TimelineEntity entity : entities.getEntities()) { ApplicationAttemptReport appAttempt = convertToApplicationAttemptReport(entity); appAttempts.put(appAttempt.getApplicationAttemptId(), appAttempt); } return appAttempts; }
@Override public Map<ContainerId, ContainerReport> getContainers( ApplicationAttemptId appAttemptId) throws YarnException, IOException { ApplicationReportExt app = getApplication( appAttemptId.getApplicationId(), ApplicationReportField.USER_AND_ACLS); checkAccess(app); TimelineEntities entities = timelineDataManager.getEntities( ContainerMetricsConstants.ENTITY_TYPE, new NameValuePair( ContainerMetricsConstants.PARENT_PRIMARIY_FILTER, appAttemptId.toString()), null, null, null, null, null, Long.MAX_VALUE, EnumSet.allOf(Field.class), UserGroupInformation.getLoginUser()); Map<ContainerId, ContainerReport> containers = new LinkedHashMap<ContainerId, ContainerReport>(); if (entities != null && entities.getEntities() != null) { for (TimelineEntity entity : entities.getEntities()) { ContainerReport container = convertToContainerReport( entity, serverHttpAddress, app.appReport.getUser()); containers.put(container.getContainerId(), container); } } return containers; }
/** * Store the given entities into the timeline store, and return the errors * that happen during storing. */ @POST @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) public TimelinePutResponse postEntities( @Context HttpServletRequest req, @Context HttpServletResponse res, TimelineEntities entities) { init(res); UserGroupInformation callerUGI = getUser(req); if (callerUGI == null) { String msg = "The owner of the posted timeline entities is not set"; LOG.error(msg); throw new ForbiddenException(msg); } try { return timelineDataManager.postEntities(entities, callerUGI); } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } }
@Test public void testGetOldEntitiesWithOutDomainId() throws Exception { TimelineEntities entities = dataManaer.getEntities( "OLD_ENTITY_TYPE_1", null, null, null, null, null, null, null, null, UserGroupInformation.getCurrentUser()); Assert.assertEquals(2, entities.getEntities().size()); Assert.assertEquals("OLD_ENTITY_ID_2", entities.getEntities().get(0).getEntityId()); Assert.assertEquals("OLD_ENTITY_TYPE_1", entities.getEntities().get(0).getEntityType()); Assert.assertEquals(TimelineDataManager.DEFAULT_DOMAIN_ID, entities.getEntities().get(0).getDomainId()); Assert.assertEquals("OLD_ENTITY_ID_1", entities.getEntities().get(1).getEntityId()); Assert.assertEquals("OLD_ENTITY_TYPE_1", entities.getEntities().get(1).getEntityType()); Assert.assertEquals(TimelineDataManager.DEFAULT_DOMAIN_ID, entities.getEntities().get(1).getDomainId()); }
@Test public void testFromId() throws Exception { WebResource r = resource(); ClientResponse response = r.path("ws").path("v1").path("timeline") .path("type_1").queryParam("fromId", "id_2") .accept(MediaType.APPLICATION_JSON) .get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(2, response.getEntity(TimelineEntities.class).getEntities() .size()); response = r.path("ws").path("v1").path("timeline") .path("type_1").queryParam("fromId", "id_1") .accept(MediaType.APPLICATION_JSON) .get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(3, response.getEntity(TimelineEntities.class).getEntities() .size()); }
@Test public void testPostEntitiesWithPrimaryFilter() throws Exception { TimelineEntities entities = new TimelineEntities(); TimelineEntity entity = new TimelineEntity(); Map<String, Set<Object>> filters = new HashMap<String, Set<Object>>(); filters.put(TimelineStore.SystemFilter.ENTITY_OWNER.toString(), new HashSet<Object>()); entity.setPrimaryFilters(filters); entity.setEntityId("test id 6"); entity.setEntityType("test type 6"); entity.setStartTime(System.currentTimeMillis()); entities.addEntity(entity); WebResource r = resource(); ClientResponse response = r.path("ws").path("v1").path("timeline") .queryParam("user.name", "tester") .accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .post(ClientResponse.class, entities); TimelinePutResponse putResposne = response.getEntity(TimelinePutResponse.class); Assert.assertEquals(0, putResposne.getErrors().size()); }
@Test public void testRelatingToNonExistingEntity() throws IOException { TimelineEntity entityToStore = new TimelineEntity(); entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); entityToStore.setEntityId("TEST_ENTITY_ID_1"); entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); TimelineEntities entities = new TimelineEntities(); entities.addEntity(entityToStore); store.put(entities); TimelineEntity entityToGet = store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); Assert.assertNotNull(entityToGet); Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); Assert.assertEquals("TEST_ENTITY_TYPE_1", entityToGet.getRelatedEntities().keySet().iterator().next()); Assert.assertEquals("TEST_ENTITY_ID_1", entityToGet.getRelatedEntities().values().iterator().next() .iterator().next()); }
private static ClientResponse mockEntityClientResponse( TimelineWriter spyTimelineWriter, ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) { ClientResponse response = mock(ClientResponse.class); if (hasRuntimeError) { doThrow(new ClientHandlerException(new ConnectException())).when( spyTimelineWriter).doPostingObject( any(TimelineEntities.class), any(String.class)); return response; } doReturn(response).when(spyTimelineWriter) .doPostingObject(any(TimelineEntities.class), any(String.class)); when(response.getClientResponseStatus()).thenReturn(status); TimelinePutResponse.TimelinePutError error = new TimelinePutResponse.TimelinePutError(); error.setEntityId("test entity id"); error.setEntityType("test entity type"); error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION); TimelinePutResponse putResponse = new TimelinePutResponse(); if (hasError) { putResponse.addError(error); } when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse); return response; }
@Override public Map<ApplicationId, ApplicationReport> getApplications(long appsNum, long appStartedTimeBegin, long appStartedTimeEnd) throws YarnException, IOException { TimelineEntities entities = timelineDataManager.getEntities( ApplicationMetricsConstants.ENTITY_TYPE, null, null, appStartedTimeBegin, appStartedTimeEnd, null, null, appsNum == Long.MAX_VALUE ? this.maxLoadedApplications : appsNum, EnumSet.allOf(Field.class), UserGroupInformation.getLoginUser()); Map<ApplicationId, ApplicationReport> apps = new LinkedHashMap<ApplicationId, ApplicationReport>(); if (entities != null && entities.getEntities() != null) { for (TimelineEntity entity : entities.getEntities()) { try { ApplicationReportExt app = generateApplicationReport(entity, ApplicationReportField.ALL); apps.put(app.appReport.getApplicationId(), app.appReport); } catch (Exception e) { LOG.error("Error on generating application report for " + entity.getEntityId(), e); } } } return apps; }
@Override public TimelineEntities getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields, CheckAcl checkAcl) throws IOException { if (primaryFilter == null) { // if no primary filter is specified, prefix the lookup with // ENTITY_ENTRY_PREFIX return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false); } else { // if a primary filter is specified, prefix the lookup with // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + // ENTITY_ENTRY_PREFIX byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName()) .add(fstConf.asByteArray(primaryFilter.getValue()), true) .getBytesForLookup(); return getEntityByTime(base, entityType, limit, windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, true); } }
/** * Store the given entities into the timeline store, and return the errors * that happen during storing. */ @POST @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) public TimelinePutResponse postEntities( @Context HttpServletRequest req, @Context HttpServletResponse res, TimelineEntities entities) { init(res); UserGroupInformation callerUGI = getUser(req); if (callerUGI == null) { String msg = "The owner of the posted timeline entities is not set"; LOG.error(msg); throw new ForbiddenException(msg); } try { return timelineDataManager.postEntities(entities, callerUGI); } catch (BadRequestException bre) { throw bre; } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); } }
@Test public void testPostIncompleteEntities() throws Exception { TimelineEntities entities = new TimelineEntities(); TimelineEntity entity1 = new TimelineEntity(); entity1.setEntityId("test id 1"); entity1.setEntityType("test type 1"); entity1.setStartTime(System.currentTimeMillis()); entity1.setDomainId("domain_id_1"); entities.addEntity(entity1); // Add an entity with no id or type. entities.addEntity(new TimelineEntity()); WebResource r = resource(); // One of the entities has no id or type. HTTP 400 will be returned ClientResponse response = r.path("ws").path("v1").path("timeline") .queryParam("user.name", "tester").accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON).post(ClientResponse.class, entities); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(ClientResponse.Status.BAD_REQUEST, response.getClientResponseStatus()); }
@Test public void testRelatingToEntityInSamePut() throws IOException { TimelineEntity entityToRelate = new TimelineEntity(); entityToRelate.setEntityType("TEST_ENTITY_TYPE_2"); entityToRelate.setEntityId("TEST_ENTITY_ID_2"); entityToRelate.setDomainId("TEST_DOMAIN"); TimelineEntity entityToStore = new TimelineEntity(); entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); entityToStore.setEntityId("TEST_ENTITY_ID_1"); entityToStore.setDomainId("TEST_DOMAIN"); entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); TimelineEntities entities = new TimelineEntities(); entities.addEntity(entityToStore); entities.addEntity(entityToRelate); store.put(entities); TimelineEntity entityToGet = store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); Assert.assertNotNull(entityToGet); Assert.assertEquals("TEST_DOMAIN", entityToGet.getDomainId()); Assert.assertEquals("TEST_ENTITY_TYPE_1", entityToGet.getRelatedEntities().keySet().iterator().next()); Assert.assertEquals("TEST_ENTITY_ID_1", entityToGet.getRelatedEntities().values().iterator().next() .iterator().next()); }
@Override public TimelineEntities getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) throws IOException { if (primaryFilter == null) { // if no primary filter is specified, prefix the lookup with // ENTITY_ENTRY_PREFIX return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit, windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields); } else { // if a primary filter is specified, prefix the lookup with // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + // ENTITY_ENTRY_PREFIX byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) .add(primaryFilter.getName()) .add(GenericObjectMapper.write(primaryFilter.getValue()), true) .add(ENTITY_ENTRY_PREFIX).getBytesForLookup(); return getEntityByTime(base, entityType, limit, windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields); } }