Java 类org.apache.hadoop.mapreduce.v2.api.records.Locality 实例源码

项目:hadoop    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:big-c    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:hadoop-plus    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:FlexMap    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:hops    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:hadoop-TCP    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:hardfs    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:hadoop-on-lustre2    文件:TaskAttemptImpl.java   
private void computeRackAndLocality() {
  NodeId containerNodeId = container.getNodeId();
  nodeRackName = RackResolver.resolve(
      containerNodeId.getHost()).getNetworkLocation();

  locality = Locality.OFF_SWITCH;
  if (dataLocalHosts.size() > 0) {
    String cHost = resolveHost(containerNodeId.getHost());
    if (dataLocalHosts.contains(cHost)) {
      locality = Locality.NODE_LOCAL;
    }
  }
  if (locality == Locality.OFF_SWITCH) {
    if (dataLocalRacks.contains(nodeRackName)) {
      locality = Locality.RACK_LOCAL;
    }
  }
}
项目:hadoop    文件:TaskAttemptImpl.java   
public TaskAttemptImpl(TaskId taskId, int i, 
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  oldJobId = TypeConverter.fromYarn(taskId.getJobId());
  this.conf = conf;
  this.clock = clock;
  attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
  attemptId.setTaskId(taskId);
  attemptId.setId(i);
  this.taskAttemptListener = taskAttemptListener;
  this.appContext = appContext;

  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);

  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();

  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;

  //TODO:create the resource reqt for this Task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setGpuCores(
      getGpuRequired(conf, taskId.getTaskType()));

  this.dataLocalHosts = resolveHosts(dataLocalHosts);
  RackResolver.init(conf);
  this.dataLocalRacks = new HashSet<String>(); 
  for (String host : this.dataLocalHosts) {
    this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
  }

  locality = Locality.OFF_SWITCH;
  avataar = Avataar.VIRGIN;

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  stateMachine = stateMachineFactory.make(this);
}
项目:hadoop    文件:TaskAttemptImpl.java   
public Locality getLocality() {
  return locality;
}
项目:hadoop    文件:TaskAttemptImpl.java   
public void setLocality(Locality locality) {
  this.locality = locality;
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testLaunchFailedWhileKilling() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), null);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
  assertFalse(eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local node", 
      Locality.NODE_LOCAL, taImpl.getLocality());
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
项目:hadoop    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));

  assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
      TaskAttemptState.COMMIT_PENDING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
      taImpl.getLocality());
}
项目:aliyun-oss-hadoop-fs    文件:TaskAttemptImpl.java   
public TaskAttemptImpl(TaskId taskId, int i, 
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  oldJobId = TypeConverter.fromYarn(taskId.getJobId());
  this.conf = conf;
  this.clock = clock;
  attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
  attemptId.setTaskId(taskId);
  attemptId.setId(i);
  this.taskAttemptListener = taskAttemptListener;
  this.appContext = appContext;

  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);

  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();

  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;

  //TODO:create the resource reqt for this Task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));

  this.dataLocalHosts = resolveHosts(dataLocalHosts);
  RackResolver.init(conf);
  this.dataLocalRacks = new HashSet<String>(); 
  for (String host : this.dataLocalHosts) {
    this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
  }

  locality = Locality.OFF_SWITCH;
  avataar = Avataar.VIRGIN;

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  stateMachine = stateMachineFactory.make(this);
}
项目:aliyun-oss-hadoop-fs    文件:TaskAttemptImpl.java   
public Locality getLocality() {
  return locality;
}
项目:aliyun-oss-hadoop-fs    文件:TaskAttemptImpl.java   
public void setLocality(Locality locality) {
  this.locality = locality;
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testLaunchFailedWhileKilling() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), null);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
  assertFalse(eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local node", 
      Locality.NODE_LOCAL, taImpl.getLocality());
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
项目:aliyun-oss-hadoop-fs    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);
  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));

  assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
      TaskAttemptState.COMMIT_PENDING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
      taImpl.getLocality());
}
项目:big-c    文件:TaskAttemptImpl.java   
public TaskAttemptImpl(TaskId taskId, int i, 
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  oldJobId = TypeConverter.fromYarn(taskId.getJobId());
  this.conf = conf;
  this.clock = clock;
  attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
  attemptId.setTaskId(taskId);
  attemptId.setId(i);
  this.taskAttemptListener = taskAttemptListener;
  this.appContext = appContext;

  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);

  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();

  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;

  //TODO:create the resource reqt for this Task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));

  this.dataLocalHosts = resolveHosts(dataLocalHosts);
  RackResolver.init(conf);
  this.dataLocalRacks = new HashSet<String>(); 
  for (String host : this.dataLocalHosts) {
    this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
  }

  locality = Locality.OFF_SWITCH;
  avataar = Avataar.VIRGIN;

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  stateMachine = stateMachineFactory.make(this);
}
项目:big-c    文件:TaskAttemptImpl.java   
public Locality getLocality() {
  return locality;
}
项目:big-c    文件:TaskAttemptImpl.java   
public void setLocality(Locality locality) {
  this.locality = locality;
}
项目:big-c    文件:TestTaskAttempt.java   
@Test
public void testLaunchFailedWhileKilling() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), null);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
  assertFalse(eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local node", 
      Locality.NODE_LOCAL, taImpl.getLocality());
}
项目:big-c    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
项目:big-c    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));

  assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
      TaskAttemptState.COMMIT_PENDING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
      taImpl.getLocality());
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TaskAttemptImpl.java   
public TaskAttemptImpl(TaskId taskId, int i, 
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  oldJobId = TypeConverter.fromYarn(taskId.getJobId());
  this.conf = conf;
  this.clock = clock;
  attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
  attemptId.setTaskId(taskId);
  attemptId.setId(i);
  this.taskAttemptListener = taskAttemptListener;
  this.appContext = appContext;

  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);

  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();

  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;

  //TODO:create the resource reqt for this Task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));

  this.dataLocalHosts = resolveHosts(dataLocalHosts);
  RackResolver.init(conf);
  this.dataLocalRacks = new HashSet<String>(); 
  for (String host : this.dataLocalHosts) {
    this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
  }

  locality = Locality.OFF_SWITCH;
  avataar = Avataar.VIRGIN;

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  stateMachine = stateMachineFactory.make(this);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TaskAttemptImpl.java   
public Locality getLocality() {
  return locality;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TaskAttemptImpl.java   
public void setLocality(Locality locality) {
  this.locality = locality;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestTaskAttempt.java   
@Test
public void testLaunchFailedWhileKilling() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), null);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
  assertFalse(eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local node", 
      Locality.NODE_LOCAL, taImpl.getLocality());
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        new Token(), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));

  assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
      TaskAttemptState.COMMIT_PENDING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
      taImpl.getLocality());
}
项目:hadoop-plus    文件:TaskAttemptImpl.java   
public TaskAttemptImpl(TaskId taskId, int i, 
    EventHandler eventHandler,
    TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
    JobConf conf, String[] dataLocalHosts,
    Token<JobTokenIdentifier> jobToken,
    Credentials credentials, Clock clock,
    AppContext appContext) {
  oldJobId = TypeConverter.fromYarn(taskId.getJobId());
  this.conf = conf;
  this.clock = clock;
  attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
  attemptId.setTaskId(taskId);
  attemptId.setId(i);
  this.taskAttemptListener = taskAttemptListener;
  this.appContext = appContext;

  // Initialize reportedStatus
  reportedStatus = new TaskAttemptStatus();
  initTaskAttemptStatus(reportedStatus);

  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  readLock = readWriteLock.readLock();
  writeLock = readWriteLock.writeLock();

  this.credentials = credentials;
  this.jobToken = jobToken;
  this.eventHandler = eventHandler;
  this.jobFile = jobFile;
  this.partition = partition;

  //TODO:create the resource reqt for this Task attempt
  this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
  this.resourceCapability.setMemory(
      getMemoryRequired(conf, taskId.getTaskType()));
  this.resourceCapability.setVirtualCores(
      getCpuRequired(conf, taskId.getTaskType()));
  /*
   * @author Tim and Lism
   * @date 2013-10-16
   * @description Set GPU resource requirement from configuration file
   */
  this.resourceCapability.setGPUCores(getGpuRequired(conf,taskId.getTaskType()));
  this.resourceCapability.setGPUId(null);

  this.dataLocalHosts = resolveHosts(dataLocalHosts);
  RackResolver.init(conf);
  this.dataLocalRacks = new HashSet<String>(); 
  for (String host : this.dataLocalHosts) {
    this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
  }

  locality = Locality.OFF_SWITCH;
  avataar = Avataar.VIRGIN;

  // This "this leak" is okay because the retained pointer is in an
  //  instance variable.
  stateMachine = stateMachineFactory.make(this);
}
项目:hadoop-plus    文件:TaskAttemptImpl.java   
public Locality getLocality() {
  return locality;
}
项目:hadoop-plus    文件:TaskAttemptImpl.java   
public void setLocality(Locality locality) {
  this.locality = locality;
}
项目:hadoop-plus    文件:TestTaskAttempt.java   
@Test
public void testLaunchFailedWhileKilling() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        mock(Token.class), new Credentials(),
        new SystemClock(), null);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_KILL));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
  assertFalse(eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local node", 
      Locality.NODE_LOCAL, taImpl.getLocality());
}
项目:hadoop-plus    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileRunning() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        mock(Token.class), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.2", 0);
  ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  assertEquals("Task attempt is not in running state", taImpl.getState(),
      TaskAttemptState.RUNNING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is not assigned on the local rack",
      Locality.RACK_LOCAL, taImpl.getLocality());
}
项目:hadoop-plus    文件:TestTaskAttempt.java   
@Test
public void testContainerCleanedWhileCommitting() throws Exception {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  ApplicationAttemptId appAttemptId =
    ApplicationAttemptId.newInstance(appId, 0);
  JobId jobId = MRBuilderUtils.newJobId(appId, 1);
  TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
  Path jobFile = mock(Path.class);

  MockEventHandler eventHandler = new MockEventHandler();
  TaskAttemptListener taListener = mock(TaskAttemptListener.class);
  when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));

  JobConf jobConf = new JobConf();
  jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
  jobConf.setBoolean("fs.file.impl.disable.cache", true);
  jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
  jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");

  TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
  when(splits.getLocations()).thenReturn(new String[] {});

  AppContext appCtx = mock(AppContext.class);
  ClusterInfo clusterInfo = mock(ClusterInfo.class);
  Resource resource = mock(Resource.class);
  when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
  when(resource.getMemory()).thenReturn(1024);

  TaskAttemptImpl taImpl =
    new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
        splits, jobConf, taListener,
        mock(Token.class), new Credentials(),
        new SystemClock(), appCtx);

  NodeId nid = NodeId.newInstance("127.0.0.1", 0);
  ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
  Container container = mock(Container.class);
  when(container.getId()).thenReturn(contId);
  when(container.getNodeId()).thenReturn(nid);
  when(container.getNodeHttpAddress()).thenReturn("localhost:0");

  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_SCHEDULE));
  taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
      container, mock(Map.class)));
  taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_COMMIT_PENDING));

  assertEquals("Task attempt is not in commit pending state", taImpl.getState(),
      TaskAttemptState.COMMIT_PENDING);
  taImpl.handle(new TaskAttemptEvent(attemptId,
      TaskAttemptEventType.TA_CONTAINER_CLEANED));
  assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
      eventHandler.internalError);
  assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
      taImpl.getLocality());
}