/**
 * Polls the internal state of a task attempt until it matches the requested
 * state, then asserts that the match was reached.
 *
 * <p>The state is sampled once up front and then re-sampled after each
 * 500 ms sleep, for at most 20 sleeps. Every poll that has not yet reached
 * the target prints the current state, the target state and the progress
 * from the most recently fetched report to standard output.
 *
 * @param attempt    the task attempt whose internal state is observed
 * @param finalState the internal state to wait for
 * @throws Exception if the wait is interrupted, or whatever the attempt
 *                   accessors throw; an assertion failure is raised when the
 *                   state was not reached within the polling budget
 */
public void waitForInternalState(TaskAttemptImpl attempt,
    TaskAttemptStateInternal finalState) throws Exception {
  TaskAttemptReport latestReport = attempt.getReport();
  TaskAttemptStateInternal observed = attempt.getInternalState();

  // Target check first, poll budget second: same evaluation order as a
  // "while (!reached && polls++ < 20)" loop.
  for (int polls = 0; !finalState.equals(observed) && polls < 20; polls++) {
    String statusLine = "TaskAttempt Internal State is : " + observed
        + " Waiting for Internal state : " + finalState
        + " progress : " + latestReport.getProgress();
    System.out.println(statusLine);

    Thread.sleep(500);

    // Refresh the report before the state, matching the initial sampling.
    latestReport = attempt.getReport();
    observed = attempt.getInternalState();
  }

  System.out.println("TaskAttempt Internal State is : " + observed);
  Assert.assertEquals(
      "TaskAttempt Internal state is not correct (timedout)",
      finalState, observed);
}
@Test public void testContainerPassThrough() throws Exception { MRApp app = new MRApp(0, 1, true, this.getClass().getName(), true) { @Override protected ContainerLauncher createContainerLauncher(AppContext context) { return new MockContainerLauncher() { @Override public void handle(ContainerLauncherEvent event) { if (event instanceof ContainerRemoteLaunchEvent) { containerObtainedByContainerLauncher = ((ContainerRemoteLaunchEvent) event).getAllocatedContainer(); } super.handle(event); } }; }; }; Job job = app.submit(new Configuration()); app.waitForState(job, JobState.SUCCEEDED); app.verifyCompleted(); Collection<Task> tasks = job.getTasks().values(); Collection<TaskAttempt> taskAttempts = tasks.iterator().next().getAttempts().values(); TaskAttemptImpl taskAttempt = (TaskAttemptImpl) taskAttempts.iterator().next(); // Container from RM should pass through to the launcher. Container object // should be the same. Assert.assertTrue(taskAttempt.container == containerObtainedByContainerLauncher); }
@Test public void testTaskFailWithUnusedContainer() throws Exception { MRApp app = new MRAppWithFailingTaskAndUnusedContainer(); Configuration conf = new Configuration(); int maxAttempts = 1; conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); // disable uberization (requires entire job to be reattempted, so max for // subtask attempts is overridden to 1) conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); Job job = app.submit(conf); app.waitForState(job, JobState.RUNNING); Map<TaskId, Task> tasks = job.getTasks(); Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); Task task = tasks.values().iterator().next(); app.waitForState(task, TaskState.SCHEDULED); Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator() .next().getAttempts(); Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts .size()); TaskAttempt attempt = attempts.values().iterator().next(); app.waitForInternalState((TaskAttemptImpl) attempt, TaskAttemptStateInternal.ASSIGNED); app.getDispatcher().getEventHandler().handle( new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_CONTAINER_COMPLETED)); app.waitForState(job, JobState.FAILED); }
@Test(timeout = 15000) public void testSlowNM() throws Exception { conf = new Configuration(); int maxAttempts = 1; conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); // set timeout low for the test conf.setInt("yarn.rpc.nm-command-timeout", 3000); conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName()); YarnRPC rpc = YarnRPC.create(conf); String bindAddr = "localhost:0"; InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); NMTokenSecretManagerInNM tokenSecretManager = new NMTokenSecretManagerInNM(); MasterKey masterKey = Records.newRecord(MasterKey.class); masterKey.setBytes(ByteBuffer.wrap("key".getBytes())); tokenSecretManager.setMasterKey(masterKey); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "token"); server = rpc.getServer(ContainerManagementProtocol.class, new DummyContainerManager(), addr, conf, tokenSecretManager, 1); server.start(); MRApp app = new MRAppWithSlowNM(tokenSecretManager); try { Job job = app.submit(conf); app.waitForState(job, JobState.RUNNING); Map<TaskId, Task> tasks = job.getTasks(); Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); Task task = tasks.values().iterator().next(); app.waitForState(task, TaskState.SCHEDULED); Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator() .next().getAttempts(); Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts.size()); TaskAttempt attempt = attempts.values().iterator().next(); app.waitForInternalState((TaskAttemptImpl) attempt, TaskAttemptStateInternal.ASSIGNED); app.waitForState(job, JobState.FAILED); String diagnostics = attempt.getDiagnostics().toString(); LOG.info("attempt.getDiagnostics: " + diagnostics); Assert.assertTrue(diagnostics.contains("Container launch failed for " + "container_0_0000_01_000000 : ")); Assert .assertTrue(diagnostics .contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for 
channel")); } finally { server.stop(); app.stop(); } }