@Override
protected Task createRemoteTask() {
  TaskSplitIndex[] splitIndex = new TaskSplitIndex[splitInfos.length];
  int i = 0;
  for (TaskSplitMetaInfo splitInfo : splitInfos) {
    splitIndex[i] = splitInfo.getSplitIndex();
    i++;
  }
  // YARN doesn't have the concept of slots per task, set it as 1.
  MapTask mapTask = new MultiMapTask("", TypeConverter.fromYarn(getID()),
      partition, splitIndex, 1);
  mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
  mapTask.setConf(conf);
  mapTask.setTaskType(TaskType.MULTI_MAP);
  return mapTask;
}
@Override
public void readFields(DataInput in) throws IOException {
  super.readFields(in);
  if (isMapOrReduce()) {
    int splitLength = in.readInt();
    LOG.info("Deserializing " + splitLength + " split indices");
    splitMetaInfos = new TaskSplitIndex[splitLength];
    for (int i = 0; i < splitLength; i++) {
      splitMetaInfos[i] = new TaskSplitIndex();
      splitMetaInfos[i].readFields(in);
      LOG.info("Read split index: " + splitMetaInfos[i]);
    }
  }
}
@Override
public void write(DataOutput out) throws IOException {
  super.write(out);
  if (isMapOrReduce()) {
    if (splitMetaInfo != null) {
      splitMetaInfo.write(out);
    } else {
      new TaskSplitIndex().write(out);
    }
    // TODO: do we really need to set this to null?
    splitMetaInfo = null;
  }
}
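/*
 * A minimal, self-contained sketch (not part of the original sources) of the
 * write()/readFields() round trip the two methods above rely on: TaskSplitIndex is just a
 * (split file location, start offset) pair with Writable-style serialization. The location
 * string and offset used here are placeholder values.
 */
import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;

public class TaskSplitIndexRoundTrip {
  public static void main(String[] args) throws IOException {
    // Serialize a TaskSplitIndex into an in-memory buffer.
    TaskSplitIndex original = new TaskSplitIndex("file:/tmp/job.split", 1024L);
    DataOutputBuffer out = new DataOutputBuffer();
    original.write(out);

    // Deserialize it back, exactly as readFields() does for each array element above.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    TaskSplitIndex copy = new TaskSplitIndex();
    copy.readFields(in);

    System.out.println(copy.getSplitLocation() + " @ " + copy.getStartOffset());
  }
}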
public MultiMapTask(String jobFile, TaskAttemptID taskId, int partition,
    TaskSplitIndex[] splitIndex, int numSlotsRequired) {
  super(jobFile, taskId, partition, splitIndex[0], numSlotsRequired);
  this.splitMetaInfos = splitIndex;
}
@SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, InterruptedException, ClassNotFoundException { InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); updateJobWithSplit(job, inputSplit); reporter.setInputSplit(inputSplit); RecordReader<INKEY,INVALUE> in = isSkipping() ? new SkippingRecordReader<INKEY,INVALUE>(inputSplit, umbilical, reporter) : new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, reporter); job.setBoolean("mapred.skip.on", isSkipping()); int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); MapOutputCollector collector = null; if (numReduceTasks > 0) { collector = new MapOutputBuffer(umbilical, job, reporter); } else { collector = new DirectMapOutputCollector(umbilical, job, reporter); } MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); try { runner.run(in, new OldOutputCollector(collector, conf), reporter); collector.flush(); } finally { //close in.close(); // close input collector.close(); } }
@SuppressWarnings("unchecked") public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk( TaskSplitIndex splitMetaInfo, JobConf jobConf, TezCounter splitBytesCounter) throws IOException { Path file = new Path(splitMetaInfo.getSplitLocation()); long offset = splitMetaInfo.getStartOffset(); // Split information read from local filesystem. FileSystem fs = FileSystem.getLocal(jobConf); file = fs.makeQualified(file); LOG.info("Reading input split file from : " + file); FSDataInputStream inFile = fs.open(file); inFile.seek(offset); String className = Text.readString(inFile); Class<org.apache.hadoop.mapreduce.InputSplit> cls; try { cls = (Class<org.apache.hadoop.mapreduce.InputSplit>) jobConf.getClassByName(className); } catch (ClassNotFoundException ce) { IOException wrap = new IOException("Split class " + className + " not found"); wrap.initCause(ce); throw wrap; } SerializationFactory factory = new SerializationFactory(jobConf); Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) factory .getDeserializer(cls); deserializer.open(inFile); org.apache.hadoop.mapreduce.InputSplit split = deserializer.deserialize(null); long pos = inFile.getPos(); if (splitBytesCounter != null) { splitBytesCounter.increment(pos - offset); } inFile.close(); return split; }
@SuppressWarnings("unchecked") public static InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo, JobConf jobConf, TezCounter splitBytesCounter) throws IOException { Path file = new Path(splitMetaInfo.getSplitLocation()); FileSystem fs = FileSystem.getLocal(jobConf); file = fs.makeQualified(file); LOG.info("Reading input split file from : " + file); long offset = splitMetaInfo.getStartOffset(); FSDataInputStream inFile = fs.open(file); inFile.seek(offset); String className = Text.readString(inFile); Class<org.apache.hadoop.mapred.InputSplit> cls; try { cls = (Class<org.apache.hadoop.mapred.InputSplit>) jobConf.getClassByName(className); } catch (ClassNotFoundException ce) { IOException wrap = new IOException("Split class " + className + " not found"); wrap.initCause(ce); throw wrap; } SerializationFactory factory = new SerializationFactory(jobConf); Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapred.InputSplit>) factory .getDeserializer(cls); deserializer.open(inFile); org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null); long pos = inFile.getPos(); if (splitBytesCounter != null) { splitBytesCounter.increment(pos - offset); } inFile.close(); return split; }
@Private
void initializeInternal() throws IOException {
  // Primarily for visibility
  rrLock.lock();
  try {
    if (splitInfoViaEvents) {
      if (useNewApi) {
        mrReader = new MRReaderMapReduce(jobConf, getContext().getCounters(),
            inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
            getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
            getContext().getTaskIndex(), getContext().getTaskAttemptNumber());
      } else {
        mrReader = new MRReaderMapred(jobConf, getContext().getCounters(),
            inputRecordCounter);
      }
    } else {
      TaskSplitMetaInfo[] allMetaInfo = MRInputUtils.readSplits(jobConf);
      TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[getContext().getTaskIndex()];
      TaskSplitIndex splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
          thisTaskMetaInfo.getStartOffset());
      if (useNewApi) {
        org.apache.hadoop.mapreduce.InputSplit newInputSplit =
            MRInputUtils.getNewSplitDetailsFromDisk(splitMetaInfo, jobConf,
                getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES));
        mrReader = new MRReaderMapReduce(jobConf, newInputSplit, getContext().getCounters(),
            inputRecordCounter, getContext().getApplicationId().getClusterTimestamp(),
            getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(),
            getContext().getTaskIndex(), getContext().getTaskAttemptNumber());
      } else {
        org.apache.hadoop.mapred.InputSplit oldInputSplit =
            MRInputUtils.getOldSplitDetailsFromDisk(splitMetaInfo, jobConf,
                getContext().getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES));
        mrReader = new MRReaderMapred(jobConf, oldInputSplit, getContext().getCounters(),
            inputRecordCounter);
      }
    }
  } finally {
    rrLock.unlock();
  }
  LOG.info("Initialized MRInput: " + getContext().getSourceVertexName());
}
public TestMapTask(String jobFile, TaskAttemptID taskId, int partition,
    TaskSplitIndex splitIndex, int numSlotsRequired) {
  super(jobFile, taskId, partition, splitIndex, numSlotsRequired);
}
public MapTask(String jobFile, TaskAttemptID taskId, int partition,
    TaskSplitIndex splitIndex, int numSlotsRequired) {
  super(jobFile, taskId, partition, numSlotsRequired);
  this.splitMetaInfo = splitIndex;
}
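/*
 * Minimal sketch (not from the original sources) of using this constructor directly:
 * build a MapTask from a TaskSplitIndex whose location/offset point at a serialized split
 * on the local disk, the same pattern the IsolationRunner.run() method below follows.
 * The job file path, attempt id, and split values are placeholders, and the task is not
 * actually run here.
 */
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;

public class MapTaskConstructionSketch {
  public static MapTask buildMapTask(JobConf conf) {
    TaskAttemptID taskId =
        TaskAttemptID.forName("attempt_200707121733_0003_m_000005_0"); // placeholder id
    TaskSplitIndex splitIndex =
        new TaskSplitIndex("file:/tmp/job.split", 1024L);              // placeholder index
    // One slot, as in createRemoteTask() above: YARN has no notion of per-task slots.
    MapTask task = new MapTask("job.xml", taskId, 0 /* partition */, splitIndex, 1);
    task.setConf(conf);
    return task;
  }
}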
@SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, InterruptedException, ClassNotFoundException { InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); updateJobWithSplit(job, inputSplit); reporter.setInputSplit(inputSplit); RecordReader<INKEY,INVALUE> in = isSkipping() ? new SkippingRecordReader<INKEY,INVALUE>(umbilical, reporter, job) : new TrackedRecordReader<INKEY,INVALUE>(reporter, job); job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); MapOutputCollector<OUTKEY, OUTVALUE> collector = null; if (numReduceTasks > 0) { collector = createSortingCollector(job, reporter); } else { collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>(); MapOutputCollector.Context context = new MapOutputCollector.Context(this, job, reporter); collector.init(context); } MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); try { runner.run(in, new OldOutputCollector(collector, conf), reporter); mapPhase.complete(); // start the sort phase only if there are reducers if (numReduceTasks > 0) { setPhase(TaskStatus.Phase.SORT); } statusUpdate(umbilical); collector.flush(); in.close(); in = null; collector.close(); collector = null; } finally { closeQuietly(in); closeQuietly(collector); } }
@SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext); job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null; // get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); } org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( mapContext); try { input.initialize(split, mapperContext); mapper.run(mapperContext); mapPhase.complete(); setPhase(TaskStatus.Phase.SORT); statusUpdate(umbilical); input.close(); input = null; output.close(mapperContext); output = null; } finally { closeQuietly(input); closeQuietly(output, mapperContext); } }
/**
 * Main method.
 */
boolean run(String[] args)
    throws ClassNotFoundException, IOException, InterruptedException {
  if (args.length < 1) {
    System.out.println("Usage: IsolationRunner <path>/job.xml <optional-user-name>");
    return false;
  }
  File jobFilename = new File(args[0]);
  if (!jobFilename.exists() || !jobFilename.isFile()) {
    System.out.println(jobFilename + " is not a valid job file.");
    return false;
  }
  String user;
  if (args.length > 1) {
    user = args[1];
  } else {
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  }
  JobConf conf = new JobConf(new Path(jobFilename.toString()));
  conf.setUser(user);
  TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
  if (taskId == null) {
    System.out.println("mapred.task.id not found in configuration;"
        + " job.xml is not a task config");
    return false;
  }
  boolean isMap = conf.getBoolean("mapred.task.is.map", true);
  if (!isMap) {
    System.out.println("Only map tasks are supported.");
    return false;
  }
  int partition = conf.getInt("mapred.task.partition", 0);

  // setup the local and user working directories
  FileSystem local = FileSystem.getLocal(conf);
  LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  Path workDirName;
  boolean workDirExists = lDirAlloc.ifExists(MRConstants.WORKDIR, conf);
  if (workDirExists) {
    workDirName = TaskRunner.formWorkDir(lDirAlloc, conf);
  } else {
    workDirName = lDirAlloc.getLocalPathForWrite(MRConstants.WORKDIR, conf);
  }

  local.setWorkingDirectory(new Path(workDirName.toString()));
  FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());

  // set up a classloader with the right classpath
  ClassLoader classLoader =
      makeClassLoader(conf, new File(workDirName.toString()));
  Thread.currentThread().setContextClassLoader(classLoader);
  conf.setClassLoader(classLoader);

  // split.dta file is used only by IsolationRunner. The file can now be in
  // any of the configured local disks, so use LocalDirAllocator to find out
  // where it is.
  Path localMetaSplit =
      new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
          TaskTracker.getLocalSplitFile(conf.getUser(),
              taskId.getJobID().toString(), taskId.toString()), conf);
  DataInputStream splitFile = FileSystem.getLocal(conf).open(localMetaSplit);
  TaskSplitIndex splitIndex = new TaskSplitIndex();
  splitIndex.readFields(splitFile);
  splitFile.close();

  Task task = new MapTask(jobFilename.toString(), taskId, partition, splitIndex, 1);
  task.setConf(conf);
  task.run(conf, new FakeUmbilical());
  return true;
}
@SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, InterruptedException, ClassNotFoundException { InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); updateJobWithSplit(job, inputSplit); reporter.setInputSplit(inputSplit); RecordReader<INKEY,INVALUE> rawIn = // open input job.getInputFormat().getRecordReader(inputSplit, job, reporter); RecordReader<INKEY,INVALUE> in = isSkipping() ? new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) : new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter); job.setBoolean("mapred.skip.on", isSkipping()); int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); MapOutputCollector collector = null; if (numReduceTasks > 0) { collector = createSortingCollector(job, reporter); } else { collector = new DirectMapOutputCollector<OUTKEY, OUTVALUE>(); MapOutputCollector.Context context = new MapOutputCollector.Context(this, job, reporter); collector.init(context); } MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); try { runner.run(in, new OldOutputCollector(collector, conf), reporter); collector.flush(); } finally { //close in.close(); // close input collector.close(); } }
@SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (inputFormat.createRecordReader(split, taskContext), reporter); job.setBoolean("mapred.skip.on", isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null; // get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter); } org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( mapContext); input.initialize(split, mapperContext); mapper.run(mapperContext); statusUpdate(umbilical); input.close(); output.close(mapperContext); }