/**
 * Checks that a put response carrying a single error is handed back to the
 * caller with the error's entity id, entity type and error code intact.
 * The client is stubbed with status OK and {@code hasError == true}.
 */
@Test
public void testPostEntitiesWithError() throws Exception {
  mockEntityClientResponse(client, ClientResponse.Status.OK, true, false);
  try {
    TimelinePutResponse putResult = client.putEntities(generateEntity());
    Assert.assertEquals(1, putResult.getErrors().size());

    // Exactly one error is expected; inspect it field by field.
    TimelinePutResponse.TimelinePutError firstError =
        putResult.getErrors().get(0);
    Assert.assertEquals("test entity id", firstError.getEntityId());
    Assert.assertEquals("test entity type", firstError.getEntityType());
    Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
        firstError.getErrorCode());
  } catch (YarnException e) {
    Assert.fail("Exception is not expected");
  }
}
/**
 * Stubs {@code doPostingObject} on the given client and returns the mocked
 * {@link ClientResponse} that the stub is wired to.
 *
 * @param client the (spied) client whose {@code doPostingObject} is stubbed
 * @param status the status the mocked response reports
 * @param hasError whether the mocked put response carries one IO_EXCEPTION
 *          error for "test entity id" / "test entity type"
 * @param hasRuntimeError when true, {@code doPostingObject} is stubbed to
 *          throw a {@link ClientHandlerException} wrapping a
 *          {@link ConnectException}; the returned mock is then left unstubbed
 * @return the mocked response
 */
private static ClientResponse mockEntityClientResponse(
    TimelineClientImpl client, ClientResponse.Status status,
    boolean hasError, boolean hasRuntimeError) {
  ClientResponse mockedResponse = mock(ClientResponse.class);

  if (!hasRuntimeError) {
    doReturn(mockedResponse).when(client)
        .doPostingObject(any(TimelineEntities.class), any(String.class));
    when(mockedResponse.getClientResponseStatus()).thenReturn(status);

    TimelinePutResponse.TimelinePutError ioError =
        new TimelinePutResponse.TimelinePutError();
    ioError.setEntityId("test entity id");
    ioError.setEntityType("test entity type");
    ioError.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);

    TimelinePutResponse payload = new TimelinePutResponse();
    if (hasError) {
      payload.addError(ioError);
    }
    when(mockedResponse.getEntity(TimelinePutResponse.class))
        .thenReturn(payload);
  } else {
    // Simulate a connection failure while posting.
    ClientHandlerException connectFailure =
        new ClientHandlerException(new ConnectException());
    doThrow(connectFailure).when(client)
        .doPostingObject(any(TimelineEntities.class), any(String.class));
  }

  return mockedResponse;
}
/**
 * Store the given entities into the timeline store, and return the errors
 * that happen during storing.
 *
 * <p>The caller is resolved from the request via {@code getUser(req)}. When
 * no caller can be resolved a {@code ForbiddenException} is thrown. Any
 * exception raised while storing is logged and rethrown as a
 * {@code WebApplicationException} with status INTERNAL_SERVER_ERROR.
 */
@POST
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public TimelinePutResponse postEntities(
    @Context HttpServletRequest req,
    @Context HttpServletResponse res,
    TimelineEntities entities) {
  init(res);
  UserGroupInformation caller = getUser(req);
  if (caller != null) {
    try {
      return timelineDataManager.postEntities(entities, caller);
    } catch (Exception e) {
      LOG.error("Error putting entities", e);
      throw new WebApplicationException(e,
          Response.Status.INTERNAL_SERVER_ERROR);
    }
  }

  // No resolvable owner: reject the post.
  String rejection = "The owner of the posted timeline entities is not set";
  LOG.error(rejection);
  throw new ForbiddenException(rejection);
}
/**
 * Running as {@code HTTP_USER + "/localhost"}, puts one entity through a
 * freshly created timeline client and checks that the put reports no errors
 * and that the entity can be read back from the test server's store.
 */
@Test
public void testPutTimelineEntities() throws Exception {
  KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      TimelineClient client = createTimelineClientForUGI();

      final String entityType =
          TestTimelineAuthenticationFilter.class.getName();
      TimelineEntity stored = new TimelineEntity();
      stored.setEntityType(entityType);
      stored.setEntityId("entity1");
      stored.setStartTime(0L);

      TimelinePutResponse putResult = client.putEntities(stored);
      Assert.assertEquals(0, putResult.getErrors().size());

      // The entity must now be visible directly in the backing store.
      TimelineEntity readBack = testTimelineServer.getTimelineStore()
          .getEntity("entity1", entityType, null);
      Assert.assertNotNull(readBack);
      return null;
    }
  });
}
/**
 * Posts an entity whose primary filters contain the ENTITY_OWNER system
 * filter key (with an empty value set) as user "tester" and expects the put
 * response to contain no errors.
 */
@Test
public void testPostEntitiesWithPrimaryFilter() throws Exception {
  // Primary filter map that uses the reserved system-filter key.
  Map<String, Set<Object>> ownerFilter = new HashMap<String, Set<Object>>();
  ownerFilter.put(TimelineStore.SystemFilter.ENTITY_OWNER.toString(),
      new HashSet<Object>());

  TimelineEntity posted = new TimelineEntity();
  posted.setPrimaryFilters(ownerFilter);
  posted.setEntityId("test id 6");
  posted.setEntityType("test type 6");
  posted.setStartTime(System.currentTimeMillis());

  TimelineEntities batch = new TimelineEntities();
  batch.addEntity(posted);

  WebResource timelineResource = resource()
      .path("ws").path("v1").path("timeline")
      .queryParam("user.name", "tester");
  ClientResponse httpResponse = timelineResource
      .accept(MediaType.APPLICATION_JSON)
      .type(MediaType.APPLICATION_JSON)
      .post(ClientResponse.class, batch);

  TimelinePutResponse putResult =
      httpResponse.getEntity(TimelinePutResponse.class);
  Assert.assertEquals(0, putResult.getErrors().size());
}
/**
 * With the spied timeline writer stubbed to answer OK plus one error,
 * {@code putEntities} must return that error unchanged (entity id, entity
 * type and IO_EXCEPTION error code) without throwing a YarnException.
 */
@Test
public void testPostEntitiesWithError() throws Exception {
  mockEntityClientResponse(
      spyTimelineWriter, ClientResponse.Status.OK, true, false);
  try {
    TimelinePutResponse result = client.putEntities(generateEntity());
    Assert.assertEquals(1, result.getErrors().size());

    TimelinePutResponse.TimelinePutError reported =
        result.getErrors().get(0);
    Assert.assertEquals("test entity id", reported.getEntityId());
    Assert.assertEquals("test entity type", reported.getEntityType());
    Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
        reported.getErrorCode());
  } catch (YarnException e) {
    Assert.fail("Exception is not expected");
  }
}
/**
 * Stubs {@code doPostingObject} on the spied timeline writer.
 *
 * @param spyTimelineWriter writer spy whose {@code doPostingObject} is
 *          stubbed
 * @param status status reported by the mocked response
 * @param hasError when true, the mocked put response contains one
 *          IO_EXCEPTION error for "test entity id" / "test entity type"
 * @param hasRuntimeError when true, posting is stubbed to throw a
 *          {@link ClientHandlerException} caused by a
 *          {@link ConnectException} and no further stubbing is done
 * @return the mocked {@link ClientResponse}
 */
private static ClientResponse mockEntityClientResponse(
    TimelineWriter spyTimelineWriter, ClientResponse.Status status,
    boolean hasError, boolean hasRuntimeError) {
  ClientResponse stubbedResponse = mock(ClientResponse.class);

  if (hasRuntimeError) {
    ClientHandlerException connectFailure =
        new ClientHandlerException(new ConnectException());
    doThrow(connectFailure).when(spyTimelineWriter)
        .doPostingObject(any(TimelineEntities.class), any(String.class));
    return stubbedResponse;
  }

  doReturn(stubbedResponse).when(spyTimelineWriter)
      .doPostingObject(any(TimelineEntities.class), any(String.class));
  when(stubbedResponse.getClientResponseStatus()).thenReturn(status);

  // Canned error; only attached to the response when requested.
  TimelinePutResponse.TimelinePutError cannedError =
      new TimelinePutResponse.TimelinePutError();
  cannedError.setEntityId("test entity id");
  cannedError.setEntityType("test entity type");
  cannedError.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);

  TimelinePutResponse body = new TimelinePutResponse();
  if (hasError) {
    body.addError(cannedError);
  }
  when(stubbedResponse.getEntity(TimelinePutResponse.class))
      .thenReturn(body);
  return stubbedResponse;
}
/**
 * Store the given entities into the timeline store, and return the errors
 * that happen during storing.
 *
 * <p>Throws a {@code ForbiddenException} when {@code getUser(req)} yields no
 * caller. A {@code BadRequestException} raised while storing is propagated
 * unchanged; every other exception is logged and wrapped in a
 * {@code WebApplicationException} with status INTERNAL_SERVER_ERROR.
 */
@POST
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
public TimelinePutResponse postEntities(
    @Context HttpServletRequest req,
    @Context HttpServletResponse res,
    TimelineEntities entities) {
  init(res);

  final UserGroupInformation caller = getUser(req);
  if (caller == null) {
    final String rejection =
        "The owner of the posted timeline entities is not set";
    LOG.error(rejection);
    throw new ForbiddenException(rejection);
  }

  TimelinePutResponse stored;
  try {
    stored = timelineDataManager.postEntities(entities, caller);
  } catch (BadRequestException bre) {
    // Let the bad-request exception through as is instead of wrapping it.
    throw bre;
  } catch (Exception e) {
    LOG.error("Error putting entities", e);
    throw new WebApplicationException(e,
        Response.Status.INTERNAL_SERVER_ERROR);
  }
  return stored;
}
@Test public void testPostEntitiesTimelineServiceDefaultNotEnabled() throws Exception { YarnConfiguration conf = new YarnConfiguration(); // Unset the timeline service's enabled properties. // Make sure default value is pickup up conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED); TimelineClientImpl client = createTimelineClient(conf); mockEntityClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); try { TimelinePutResponse response = client.putEntities(generateEntity()); Assert.assertEquals(0, response.getErrors().size()); } catch (YarnException e) { Assert .fail("putEntities should already return before throwing the exception"); } }
/**
 * Running as {@code HTTP_USER + "/localhost"}, puts one entity through the
 * shared {@code client} and checks that the put reports no errors and that
 * the entity is present in the test server's timeline store.
 */
@Test
public void testPutTimelineEntities() throws Exception {
  KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      final String entityType =
          TestTimelineAuthenticationFilter.class.getName();

      TimelineEntity stored = new TimelineEntity();
      stored.setEntityType(entityType);
      stored.setEntityId("entity1");
      stored.setStartTime(0L);

      TimelinePutResponse putResult = client.putEntities(stored);
      Assert.assertEquals(0, putResult.getErrors().size());

      // Read the entity straight from the store to confirm it was written.
      TimelineEntity readBack = testTimelineServer.getTimelineStore()
          .getEntity("entity1", entityType, null);
      Assert.assertNotNull(readBack);
      return null;
    }
  });
}
/**
 * Stubs {@code doPostingObject} on the spied timeline writer and returns the
 * mocked response it is wired to.
 *
 * @param spyTimelineWriter writer spy to stub
 * @param status status the mocked response reports
 * @param hasError whether the mocked put response carries one IO_EXCEPTION
 *          error for "test entity id" / "test entity type"
 * @param hasRuntimeError when true, posting throws a
 *          {@link ClientHandlerException} wrapping a
 *          {@link ConnectException} and the returned mock stays unstubbed
 * @return the mocked {@link ClientResponse}
 */
public static ClientResponse mockEntityClientResponse(
    TimelineWriter spyTimelineWriter, ClientResponse.Status status,
    boolean hasError, boolean hasRuntimeError) {
  ClientResponse fakeResponse = mock(ClientResponse.class);

  if (!hasRuntimeError) {
    doReturn(fakeResponse).when(spyTimelineWriter)
        .doPostingObject(any(TimelineEntities.class), any(String.class));
    when(fakeResponse.getClientResponseStatus()).thenReturn(status);

    TimelinePutResponse.TimelinePutError sampleError =
        new TimelinePutResponse.TimelinePutError();
    sampleError.setEntityId("test entity id");
    sampleError.setEntityType("test entity type");
    sampleError.setErrorCode(
        TimelinePutResponse.TimelinePutError.IO_EXCEPTION);

    TimelinePutResponse putBody = new TimelinePutResponse();
    if (hasError) {
      putBody.addError(sampleError);
    }
    when(fakeResponse.getEntity(TimelinePutResponse.class))
        .thenReturn(putBody);
  } else {
    // Make the post itself blow up as if the connection failed.
    doThrow(new ClientHandlerException(new ConnectException()))
        .when(spyTimelineWriter)
        .doPostingObject(any(TimelineEntities.class), any(String.class));
  }

  return fakeResponse;
}
/**
 * Publishes a single entity through {@code client} and logs the outcome.
 * Errors returned in the put response are logged one by one; any exception
 * raised while publishing is caught and logged, never propagated.
 *
 * @param entity the entity to publish
 */
@Private
@VisibleForTesting
public void putEntity(TimelineEntity entity) {
  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Publishing the entity " + entity.getEntityId()
          + ", JSON-style content: "
          + TimelineUtils.dumpTimelineRecordtoJSON(entity));
    }

    List<TimelinePutResponse.TimelinePutError> putErrors =
        client.putEntities(entity).getErrors();
    if (putErrors.isEmpty()) {
      LOG.debug("Timeline entities are successfully put");
      return;
    }

    // Report every error the server returned for this put.
    for (TimelinePutResponse.TimelinePutError putError : putErrors) {
      LOG.error("Error when publishing entity ["
          + putError.getEntityType() + "," + putError.getEntityId()
          + "], server side error code: " + putError.getErrorCode());
    }
  } catch (Exception e) {
    LOG.error("Error when publishing entity [" + entity.getEntityType()
        + "," + entity.getEntityId() + "]", e);
  }
}
/**
 * Builds a DS_APP_ATTEMPT entity carrying one event of the given type
 * (timestamped with the current time) and puts it through the timeline
 * client. Response errors are passed to
 * {@code processTimelineResponseErrors}; a failure to publish is logged
 * and not propagated.
 *
 * @param timelineClient client used to put the entity
 * @param appAttemptId id used as the entity id
 * @param appEvent event type to record
 * @param domainId domain id set on the entity
 * @param ugi user whose short name is stored as a primary filter
 */
private void publishApplicationAttemptEvent(
    final TimelineClient timelineClient, String appAttemptId,
    DSEvent appEvent, String domainId, UserGroupInformation ugi) {
  final TimelineEntity attemptEntity = new TimelineEntity();
  attemptEntity.setEntityId(appAttemptId);
  attemptEntity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
  attemptEntity.setDomainId(domainId);
  attemptEntity.addPrimaryFilter(
      USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());

  TimelineEvent attemptEvent = new TimelineEvent();
  attemptEvent.setEventType(appEvent.toString());
  attemptEvent.setTimestamp(System.currentTimeMillis());
  attemptEntity.addEvent(attemptEvent);

  try {
    processTimelineResponseErrors(timelineClient.putEntities(attemptEntity));
  } catch (YarnException | IOException | ClientHandlerException e) {
    // Anything other than the attempt-start event is reported as "end".
    String phase =
        appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end";
    LOG.error("App Attempt " + phase
        + " event could not be published for "
        + appAttemptId.toString(), e);
  }
}
/**
 * With the client stubbed (via {@code mockClientResponse}) to answer OK
 * plus one error, {@code putEntities} must hand that error back with its
 * entity id, entity type and IO_EXCEPTION code, without throwing a
 * YarnException.
 */
@Test
public void testPostEntitiesWithError() throws Exception {
  mockClientResponse(client, ClientResponse.Status.OK, true, false);
  try {
    TimelinePutResponse outcome = client.putEntities(generateEntity());
    Assert.assertEquals(1, outcome.getErrors().size());

    TimelinePutResponse.TimelinePutError onlyError =
        outcome.getErrors().get(0);
    Assert.assertEquals("test entity id", onlyError.getEntityId());
    Assert.assertEquals("test entity type", onlyError.getEntityType());
    Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
        onlyError.getErrorCode());
  } catch (YarnException e) {
    Assert.fail("Exception is not expected");
  }
}
/**
 * Stubs {@code doPostingEntities} on the given client and returns the mocked
 * {@link ClientResponse} it is wired to.
 *
 * @param client the (spied) client whose {@code doPostingEntities} is
 *          stubbed
 * @param status status the mocked response reports
 * @param hasError whether the mocked put response carries one IO_EXCEPTION
 *          error for "test entity id" / "test entity type"
 * @param hasRuntimeError when true, posting throws a
 *          {@link ClientHandlerException} wrapping a
 *          {@link ConnectException} and nothing else is stubbed
 * @return the mocked response
 */
private static ClientResponse mockClientResponse(TimelineClientImpl client,
    ClientResponse.Status status, boolean hasError,
    boolean hasRuntimeError) {
  ClientResponse mockedResponse = mock(ClientResponse.class);

  if (hasRuntimeError) {
    ClientHandlerException connectFailure =
        new ClientHandlerException(new ConnectException());
    doThrow(connectFailure).when(client)
        .doPostingEntities(any(TimelineEntities.class));
    return mockedResponse;
  }

  doReturn(mockedResponse).when(client)
      .doPostingEntities(any(TimelineEntities.class));
  when(mockedResponse.getClientResponseStatus()).thenReturn(status);

  TimelinePutResponse.TimelinePutError ioError =
      new TimelinePutResponse.TimelinePutError();
  ioError.setEntityId("test entity id");
  ioError.setEntityType("test entity type");
  ioError.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);

  // The error is attached only on request; otherwise the response is clean.
  TimelinePutResponse payload = new TimelinePutResponse();
  if (hasError) {
    payload.addError(ioError);
  }
  when(mockedResponse.getEntity(TimelinePutResponse.class))
      .thenReturn(payload);
  return mockedResponse;
}
private void logEntity(TimelineEntityGroupId groupId, TimelineEntity entity, String domainId) { if (historyACLPolicyManager != null && domainId != null && !domainId.isEmpty()) { historyACLPolicyManager.updateTimelineEntityDomain(entity, domainId); } try { TimelinePutResponse response = timelineClient.putEntities( appContext.getApplicationAttemptId(), groupId, entity); if (response != null && !response.getErrors().isEmpty()) { int count = response.getErrors().size(); for (int i = 0; i < count; ++i) { TimelinePutError err = response.getErrors().get(i); if (err.getErrorCode() != 0) { LOG.warn("Could not post history event to ATS" + ", atsPutError=" + err.getErrorCode() + ", entityId=" + err.getEntityId()); } } } // Do nothing additional, ATS client library should handle throttling // or auto-disable as needed } catch (Exception e) { LOG.warn("Could not handle history events", e); } }
/**
 * Wraps the given entities in a {@link TimelineEntities} container, posts
 * it via {@code doPosting} (with a null second argument) and returns the
 * {@link TimelinePutResponse} read from the HTTP response.
 *
 * @param entities the entities to put
 * @return the put response extracted from the client response
 * @throws IOException declared by the signature
 * @throws YarnException declared by the signature
 */
@Override
public TimelinePutResponse putEntities(
    TimelineEntity... entities) throws IOException, YarnException {
  TimelineEntities batch = new TimelineEntities();
  batch.addEntities(Arrays.asList(entities));

  ClientResponse httpResponse = doPosting(batch, null);
  TimelinePutResponse putResult =
      httpResponse.getEntity(TimelinePutResponse.class);
  return putResult;
}
/**
 * With the client stubbed to answer OK and no errors, {@code putEntities}
 * must return a response whose error list is empty and must not throw a
 * YarnException.
 */
@Test
public void testPostEntities() throws Exception {
  mockEntityClientResponse(client, ClientResponse.Status.OK, false, false);
  try {
    TimelinePutResponse putResult = client.putEntities(generateEntity());
    int errorCount = putResult.getErrors().size();
    Assert.assertEquals(0, errorCount);
  } catch (YarnException e) {
    Assert.fail("Exception is not expected");
  }
}
/**
 * Starts a {@code TestTimelineClient}, puts one entity with a single event,
 * and checks that the put reports no errors, that the recorded client
 * response mentions "https", and that the entity can be read back from the
 * store with matching id and type. The client is stopped and closed
 * afterwards in all cases.
 */
@Test
public void testPutEntities() throws Exception {
  TestTimelineClient client = new TestTimelineClient();
  try {
    client.init(conf);
    client.start();

    TimelineEntity expected = new TimelineEntity();
    expected.setEntityType("test entity type");
    expected.setEntityId("test entity id");
    expected.setDomainId("test domain id");

    TimelineEvent singleEvent = new TimelineEvent();
    singleEvent.setEventType("test event type");
    singleEvent.setTimestamp(0L);
    expected.addEvent(singleEvent);

    TimelinePutResponse putResult = client.putEntities(expected);
    Assert.assertEquals(0, putResult.getErrors().size());
    // The response captured by the test client should mention https.
    Assert.assertTrue(client.resp.toString().contains("https"));

    TimelineEntity actual = store.getEntity(
        expected.getEntityId(), expected.getEntityType(),
        EnumSet.allOf(Field.class));
    Assert.assertNotNull(actual);
    Assert.assertEquals(expected.getEntityId(), actual.getEntityId());
    Assert.assertEquals(expected.getEntityType(), actual.getEntityType());
  } finally {
    client.stop();
    client.close();
  }
}
@Test public void testPostEntities() throws Exception { TimelineEntities entities = new TimelineEntities(); TimelineEntity entity = new TimelineEntity(); entity.setEntityId("test id 1"); entity.setEntityType("test type 1"); entity.setStartTime(System.currentTimeMillis()); entity.setDomainId("domain_id_1"); entities.addEntity(entity); WebResource r = resource(); // No owner, will be rejected ClientResponse response = r.path("ws").path("v1").path("timeline") .accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .post(ClientResponse.class, entities); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); assertEquals(ClientResponse.Status.FORBIDDEN, response.getClientResponseStatus()); response = r.path("ws").path("v1").path("timeline") .queryParam("user.name", "tester") .accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .post(ClientResponse.class, entities); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); TimelinePutResponse putResposne = response.getEntity(TimelinePutResponse.class); Assert.assertNotNull(putResposne); Assert.assertEquals(0, putResposne.getErrors().size()); // verify the entity exists in the store response = r.path("ws").path("v1").path("timeline") .path("test type 1").path("test id 1") .accept(MediaType.APPLICATION_JSON) .get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); entity = response.getEntity(TimelineEntity.class); Assert.assertNotNull(entity); Assert.assertEquals("test id 1", entity.getEntityId()); Assert.assertEquals("test type 1", entity.getEntityType()); }
@Test public void testPostEntitiesToDefaultDomain() throws Exception { AdminACLsManager oldAdminACLsManager = timelineACLsManager.setAdminACLsManager(adminACLsManager); try { TimelineEntities entities = new TimelineEntities(); TimelineEntity entity = new TimelineEntity(); entity.setEntityId("test id 7"); entity.setEntityType("test type 7"); entity.setStartTime(System.currentTimeMillis()); entities.addEntity(entity); WebResource r = resource(); ClientResponse response = r.path("ws").path("v1").path("timeline") .queryParam("user.name", "anybody_1") .accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .post(ClientResponse.class, entities); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); TimelinePutResponse putResposne = response.getEntity(TimelinePutResponse.class); Assert.assertNotNull(putResposne); Assert.assertEquals(0, putResposne.getErrors().size()); // verify the entity exists in the store response = r.path("ws").path("v1").path("timeline") .path("test type 7").path("test id 7") .queryParam("user.name", "any_body_2") .accept(MediaType.APPLICATION_JSON) .get(ClientResponse.class); assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType()); entity = response.getEntity(TimelineEntity.class); Assert.assertNotNull(entity); Assert.assertEquals("test id 7", entity.getEntityId()); Assert.assertEquals("test type 7", entity.getEntityType()); Assert.assertEquals(TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId()); } finally { timelineACLsManager.setAdminACLsManager(oldAdminACLsManager); } }
/**
 * Builds a DS_CONTAINER entity for the given container with a single
 * DS_CONTAINER_START event (recording the node and resources) and puts it
 * through the timeline client inside {@code ugi.doAs}. Failures are logged
 * and not propagated.
 *
 * @param timelineClient client used to put the entity
 * @param container container whose start is being recorded
 * @param domainId domain id set on the entity
 * @param ugi user the put runs as; its short name is stored as the "user"
 *          primary filter
 */
private static void publishContainerStartEvent(
    final TimelineClient timelineClient, Container container,
    String domainId, UserGroupInformation ugi) {
  final TimelineEntity containerEntity = new TimelineEntity();
  containerEntity.setEntityId(container.getId().toString());
  containerEntity.setEntityType(DSEntity.DS_CONTAINER.toString());
  containerEntity.setDomainId(domainId);
  containerEntity.addPrimaryFilter("user", ugi.getShortUserName());

  TimelineEvent startEvent = new TimelineEvent();
  startEvent.setTimestamp(System.currentTimeMillis());
  startEvent.setEventType(DSEvent.DS_CONTAINER_START.toString());
  startEvent.addEventInfo("Node", container.getNodeId().toString());
  startEvent.addEventInfo("Resources", container.getResource().toString());
  containerEntity.addEvent(startEvent);

  try {
    ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
      @Override
      public TimelinePutResponse run() throws Exception {
        return timelineClient.putEntities(containerEntity);
      }
    });
  } catch (Exception e) {
    // For an UndeclaredThrowableException, log its cause instead.
    Throwable reported;
    if (e instanceof UndeclaredThrowableException) {
      reported = e.getCause();
    } else {
      reported = e;
    }
    LOG.error("Container start event could not be published for "
        + container.getId().toString(), reported);
  }
}
/**
 * Validates and posts the given entities. Each entity must have both an
 * entity id and an entity type; the first one that does not causes a
 * {@link YarnException} before anything is posted.
 *
 * @param entities the entities to put
 * @return the put response extracted from the client response
 * @throws IOException declared by the signature
 * @throws YarnException if an entity lacks an id or a type
 */
public TimelinePutResponse putEntities(
    TimelineEntity... entities) throws IOException, YarnException {
  TimelineEntities batch = new TimelineEntities();
  for (TimelineEntity candidate : entities) {
    boolean complete = candidate.getEntityId() != null
        && candidate.getEntityType() != null;
    if (!complete) {
      throw new YarnException("Incomplete entity without entity id/type");
    }
    batch.addEntity(candidate);
  }

  ClientResponse httpResponse = doPosting(batch, null);
  return httpResponse.getEntity(TimelinePutResponse.class);
}
/**
 * Puts entities under an entity group for the given application attempt by
 * delegating to {@code timelineWriter}. This variant is only available when
 * {@code timelineServiceVersion} compares equal to 1.5.
 *
 * @param appAttemptId application attempt the entities belong to
 * @param groupId entity group to write under
 * @param entities the entities to put
 * @return the response returned by the timeline writer
 * @throws IOException declared by the signature
 * @throws YarnException if the configured timeline service version is not
 *           1.5
 */
@Override
public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
    TimelineEntityGroupId groupId, TimelineEntity... entities)
    throws IOException, YarnException {
  boolean isVersionOneFive =
      Float.compare(this.timelineServiceVersion, 1.5f) == 0;
  if (isVersionOneFive) {
    return timelineWriter.putEntities(appAttemptId, groupId, entities);
  }
  throw new YarnException(
      "This API is not supported under current Timeline Service Version: "
          + timelineServiceVersion);
}
/**
 * With the spied timeline writer stubbed to answer OK and no errors,
 * {@code putEntities} must return a response with an empty error list and
 * must not throw a YarnException.
 */
@Test
public void testPostEntities() throws Exception {
  mockEntityClientResponse(
      spyTimelineWriter, ClientResponse.Status.OK, false, false);
  try {
    TimelinePutResponse result = client.putEntities(generateEntity());
    int errorCount = result.getErrors().size();
    Assert.assertEquals(0, errorCount);
  } catch (YarnException e) {
    Assert.fail("Exception is not expected");
  }
}