Java 类org.apache.hadoop.mapred.JvmManager.JvmManagerForType.JvmRunner 实例源码
项目:mammoth
文件:JvmManager.java
/**
 * Builds a {@code JvmRunner} for the given job, registers it under its JVM
 * id, reserves it for the supplied task and starts it on a daemon thread.
 *
 * @param jobId job the new JVM is created for
 * @param env   environment handed to the runner
 * @param t     task runner whose task becomes the first task of the new JVM
 */
private void spawnNewJvm(JobID jobId, JvmEnv env, TaskRunner t) {
  final JvmRunner runner = new JvmRunner(env, jobId, t.getTask());
  jvmIdToRunner.put(runner.jvmId, runner);

  // Each JVM is launched from its own thread. The thread itself is cheap
  // next to the cost of forking the process, and that cost is amortized
  // over every task the JVM goes on to run, so this keeps the code simple
  // without a meaningful penalty.
  runner.setDaemon(true);
  final String threadName = "JVM Runner " + runner.jvmId + " spawned.";
  runner.setName(threadName);

  // Reserve the JVM for the task before the runner thread is started.
  setRunningTaskForJvm(runner.jvmId, t);
  LOG.info(runner.getName());
  runner.start();
}
项目:mammoth
文件:JvmManager.java
/**
 * Hands the next pending task to the JVM identified by {@code jvmId}.
 *
 * The first pending task runner for the JVM is moved to the JVM's list of
 * running task runners, the JVM's runner is told about the task via
 * {@code taskGiven}, and the task's {@code TaskInProgress} is returned.
 *
 * @param jvmId id of the JVM asking for work
 * @return the task-in-progress given to the JVM, or {@code null} when the
 *         JVM has no pending-task entry or its pending list is empty
 * @throws IOException declared by the signature; nothing in this body
 *         throws it directly
 */
synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
    throws IOException {
  if (!jvmToPendingTasks.containsKey(jvmId)) {
    return null;
  }

  // With JVM reuse, tasks reach an already-launched JVM through this
  // method, one pending task at a time.
  final List<TaskRunner> pending = jvmToPendingTasks.get(jvmId);
  if (pending.isEmpty()) {
    return null;
  }

  final JvmRunner runner = jvmIdToRunner.get(jvmId);
  final TaskRunner next = pending.remove(0);

  // Create the running list for this JVM on first use, then record the task.
  if (!this.jvmToRunningTasks.containsKey(jvmId)) {
    this.jvmToRunningTasks.put(jvmId, new ArrayList<TaskRunner>());
  }
  this.jvmToRunningTasks.get(jvmId).add(next);

  final TaskInProgress given = next.getTaskInProgress();
  runner.taskGiven(given.getTask());
  return given;
}
项目:mammoth
文件:JvmManager.java
/**
 * Records that a task runner has finished: drops its task-to-JVM mapping,
 * removes it from the JVM's list of running task runners (discarding the
 * list once it is empty) and notifies the JVM's runner via {@code taskRan}.
 *
 * Does nothing when the task runner has no task-to-JVM mapping.
 *
 * @param tr the task runner that finished
 */
synchronized public void taskFinished(TaskRunner tr) {
  JVMId jvmId = runningTaskToJvm.remove(tr);
  if (jvmId == null) {
    return;
  }

  // The per-JVM running list is only created when a JVM actually picks a
  // task up (see getTaskForJvm), so it can be absent here. Guard against
  // that instead of failing with a NullPointerException, which would also
  // skip the taskRan notification below.
  List<TaskRunner> running = jvmToRunningTasks.get(jvmId);
  if (running != null) {
    running.remove(tr);
    if (running.isEmpty()) {
      jvmToRunningTasks.remove(jvmId);
    }
  }

  JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
  if (jvmRunner != null) {
    jvmRunner.taskRan(tr.getTask());
  }
}
项目:mammoth
文件:JvmManager.java
/**
 * Kills the JVM registered under {@code jvmId}, if there is one.
 *
 * @param jvmId id of the JVM to kill; ids with no registered runner are
 *              ignored
 * @throws IOException          propagated from {@code killJvmRunner}
 * @throws InterruptedException propagated from {@code killJvmRunner}
 */
synchronized public void killJvm(JVMId jvmId) throws IOException,
    InterruptedException {
  final JvmRunner target = jvmIdToRunner.get(jvmId);
  if (target == null) {
    return;
  }
  killJvmRunner(target);
}
项目:mammoth
文件:JvmManager.java
/**
 * Kills every JVM runner currently registered in {@code jvmIdToRunner}.
 *
 * @throws IOException          propagated from {@code killJvmRunner}
 * @throws InterruptedException propagated from {@code killJvmRunner}
 */
synchronized public void stop() throws IOException, InterruptedException {
  // Killing a runner removes its entry from jvmIdToRunner, so iterate over
  // a snapshot of the values; walking the live view would raise a
  // ConcurrentModificationException.
  final List<JvmRunner> snapshot =
      new ArrayList<JvmRunner>(jvmIdToRunner.values());
  for (JvmRunner runner : snapshot) {
    killJvmRunner(runner);
  }
}
项目:mammoth
文件:JvmManager.java
/**
 * Finds or creates a JVM for the given task.
 *
 * Does nothing if the task was killed in flight. Otherwise, while the number
 * of spawned JVMs for the task's type (map or reduce) is below the
 * corresponding maximum, the task is reserved on the first registered JVM
 * that belongs to the same job, or a new JVM is spawned when no such JVM
 * exists. If the limit has already been reached the state is treated as
 * inconsistent: a fatal message is logged and the process exits with
 * status -1.
 *
 * @param t   task runner that needs a JVM
 * @param env environment passed on when a new JVM is spawned
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
private synchronized void reapJvm(
    TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
  if (t.getTaskInProgress().wasKilled()) {
    // The task was killed in flight; there is nothing left to do.
    return;
  }

  final JobID jobId = t.getTask().getJobID();
  final boolean mapTask = t.getTask().isMapTask();
  final int spawned = getSpawnedJvmNum(mapTask);
  final boolean belowLimit =
      mapTask ? spawned < maxMapJvms : spawned < maxReduceJvms;

  if (!belowLimit) {
    LOG.fatal("Inconsistent state!!! " +
        "JVM Manager reached an unstable state " +
        "while reaping a JVM for task: " + t.getTask().getTaskID() +
        " " + getDetails() + ". Aborting. ");
    System.exit(-1);
    return;
  }

  // Prefer a JVM that already belongs to this job.
  for (JvmRunner candidate : jvmIdToRunner.values()) {
    if (candidate.jvmId.getJobId().equals(jobId)) {
      setRunningTaskForJvm(candidate.jvmId, t); // reserve the JVM
      LOG.info("No new JVM spawned for jobId/taskid: " +
          jobId + "/" + t.getTask().getTaskID() +
          ". Attempting to reuse: " + candidate.jvmId);
      return;
    }
  }

  // No JVM for this job yet: start one.
  spawnNewJvm(jobId, env, t);
}
项目:mammoth
文件:JvmManager.java
/**
 * Creates a runner for a JVM of the given job.
 *
 * The JVM id combines the job id with a value drawn from {@code rand}; the
 * number of tasks to run is read from the environment's configuration.
 *
 * @param env       environment for the JVM; its {@code conf} supplies the
 *                  number of tasks to execute per JVM
 * @param jobId     job the JVM belongs to
 * @param firstTask task stored as this runner's first task
 */
public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
  this.firstTask = firstTask;
  this.env = env;
  this.jvmId = new JVMId(jobId, rand.nextInt());
  this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
  LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
}
项目:mapreduce-fork
文件:TestJvmManager.java
/**
 * Regression test for MAPREDUCE-2224.
 *
 * Replaces the map-side JvmManager's internal maps with race-detecting maps,
 * then launches many tasks through a fixed-size thread pool and fails if any
 * launch throws.
 */
@Test
public void testForRaces() throws Exception {
  JvmManagerForType mapManager =
      jvmManager.getJvmManagerForType(TaskType.MAP);

  // Swap the plain HashMaps for maps that detect racy access.
  mapManager.jvmToRunningTask = new RaceHashMap<JVMId, TaskRunner>();
  mapManager.runningTaskToJvm = new RaceHashMap<TaskRunner, JVMId>();
  mapManager.jvmIdToRunner = new RaceHashMap<JVMId, JvmRunner>();

  // Submit five times as many launches as there are slots; the pool size
  // limits how many run at once to MAP_SLOTS.
  final ExecutorService exec = Executors.newFixedThreadPool(MAP_SLOTS);
  final AtomicReference<Throwable> failed =
      new AtomicReference<Throwable>();
  final int launches = MAP_SLOTS * 5;

  for (int i = 0; i < launches; i++) {
    JobConf conf = new JobConf(ttConf);
    TaskAttemptID attempt =
        new TaskAttemptID("test", 0, TaskType.MAP, i, 0);
    Task mapTask = new MapTask(null, attempt, i, null, 1);
    mapTask.setConf(conf);
    TaskInProgress tip = tt.new TaskInProgress(mapTask, conf);
    File pidFile = new File(TEST_DIR, "pid_" + i);
    final TaskRunner taskRunner = mapTask.createRunner(tt, tip);

    // The "JVM" command line is a single script whose body is just
    // "echo hi".
    final Vector<String> vargs = new Vector<String>(2);
    File script = writeScript("script_" + i, "echo hi\n", pidFile);
    vargs.add(script.getAbsolutePath());

    final File workDir = new File(TEST_DIR, "work_" + i);
    workDir.mkdir();
    final File stdout = new File(TEST_DIR, "stdout_" + i);
    final File stderr = new File(TEST_DIR, "stderr_" + i);

    // Launch the process from a pool thread and wait there for it to end.
    // The first failure is remembered and stops the remaining launches.
    exec.submit(new Runnable() {
      public void run() {
        try {
          taskRunner.launchJvmAndWait(null, vargs, stdout, stderr, 100,
              workDir, null);
        } catch (Throwable t) {
          failed.compareAndSet(null, t);
          exec.shutdownNow();
        }
      }
    });
  }

  exec.shutdown();
  exec.awaitTermination(3, TimeUnit.MINUTES);

  Throwable firstFailure = failed.get();
  if (firstFailure != null) {
    throw new RuntimeException(firstFailure);
  }
}
项目:mammoth
文件:JvmManager.java
/**
 * Kills the given JVM runner and then removes its JVM from this manager's
 * bookkeeping via {@code removeJvm}.
 *
 * @param jvmRunner runner to kill
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
private synchronized void killJvmRunner(JvmRunner jvmRunner)
    throws IOException, InterruptedException {
  // Kill first; the bookkeeping entry is dropped only after kill() returns.
  jvmRunner.kill();
  removeJvm(jvmRunner.jvmId);
}
项目:mammoth
文件:JvmManager.java
/**
 * Finds, frees up or creates a JVM for the given task.
 *
 * Does nothing if the task was killed in flight. Otherwise exactly one of
 * the following happens, in this order of preference:
 * <ol>
 *   <li>a new JVM is spawned, when the number of spawned JVMs for the task's
 *       type (map or reduce) is below the matching maximum;</li>
 *   <li>the task is reserved on an idle JVM of the same job that has not yet
 *       run all of its tasks;</li>
 *   <li>a JVM is killed (one of the same job that ran all its tasks, or an
 *       idle one of another job) and a new JVM is spawned in its place.</li>
 * </ol>
 * If none of these is possible the state is treated as inconsistent: a fatal
 * message is logged and the process exits with status -1.
 *
 * @param t   task runner that needs a JVM
 * @param env environment passed on when a new JVM is spawned
 * @throws IOException          propagated from {@code killJvmRunner}
 * @throws InterruptedException propagated from {@code killJvmRunner}
 */
private synchronized void oldReapJvm(
    TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
  if (t.getTaskInProgress().wasKilled()) {
    //the task was killed in-flight
    //no need to do the rest of the operations
    return;
  }
  boolean spawnNewJvm = false;
  JobID jobId = t.getTask().getJobID();
  boolean isMapTask = t.getTask().isMapTask();
  int numJvmsSpawned = getSpawnedJvmNum(isMapTask);
  JvmRunner runnerToKill = null;
  // Map tasks are limited by maxMapJvms and reduce tasks by maxReduceJvms.
  // The reduce clause must test !isMapTask; testing isMapTask twice would
  // leave reduce tasks unlimited and compare map tasks to the reduce limit.
  if ((isMapTask && numJvmsSpawned >= maxMapJvms) ||
      (!isMapTask && numJvmsSpawned >= maxReduceJvms)) {
    //go through the list of JVMs for all jobs.
    Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter =
        jvmIdToRunner.entrySet().iterator();
    while (jvmIter.hasNext()) {
      JvmRunner jvmRunner = jvmIter.next().getValue();
      JobID jId = jvmRunner.jvmId.getJobId();
      //look for a free JVM for this job; if one exists then just return
      if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()) {
        setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
        LOG.info("No new JVM spawned for jobId/taskid: " +
            jobId + "/" + t.getTask().getTaskID() +
            ". Attempting to reuse: " + jvmRunner.jvmId);
        return;
      }
      //Cases when a JVM is killed:
      // (1) the JVM under consideration belongs to the same job
      //     (passed in the argument). In this case, kill only when
      //     the JVM ran all the tasks it was scheduled to run (in terms
      //     of count).
      // (2) the JVM under consideration belongs to a different job and is
      //     currently not busy
      //But in both the above cases, we see if we can assign the current
      //task to an idle JVM (hence we continue the loop even on a match)
      if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
          (!jId.equals(jobId) && !jvmRunner.isBusy())) {
        runnerToKill = jvmRunner;
        spawnNewJvm = true;
      }
    }
  } else {
    // Below the limit for this task type: simply start another JVM.
    spawnNewJvm = true;
  }
  if (spawnNewJvm) {
    if (runnerToKill != null) {
      LOG.info("Killing JVM: " + runnerToKill.jvmId);
      killJvmRunner(runnerToKill);
    }
    spawnNewJvm(jobId, env, t);
    return;
  }
  //*MUST* never reach this
  LOG.fatal("Inconsistent state!!! " +
      "JVM Manager reached an unstable state " +
      "while reaping a JVM for task: " + t.getTask().getTaskID() +
      " " + getDetails() + ". Aborting. ");
  System.exit(-1);
}