/** * Sleep for a random amount of time between a given positive range * * @param sleepTime * positive long range for times to choose * * @return output data on operation */ List<OperationOutput> run(Range<Long> sleepTime) { List<OperationOutput> out = super.run(null); try { if (sleepTime != null) { long sleepMs = getSleepTime(sleepTime); long startTime = Timer.now(); sleep(sleepMs); long elapsedTime = Timer.elapsed(startTime); out.add(new OperationOutput(OutputType.LONG, getType(), ReportWriter.OK_TIME_TAKEN, elapsedTime)); out.add(new OperationOutput(OutputType.LONG, getType(), ReportWriter.SUCCESSES, 1L)); } } catch (InterruptedException e) { out.add(new OperationOutput(OutputType.LONG, getType(), ReportWriter.FAILURES, 1L)); LOG.warn("Error with sleeping", e); } return out; }
@Override // Mapper public void map(Object key, Object value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { logAndSetStatus(reporter, "Running slive mapper for dummy key " + key + " and dummy value " + value); //Add taskID to randomSeed to deterministically seed rnd. Random rnd = config.getRandomSeed() != null ? new Random(this.taskId + config.getRandomSeed()) : new Random(); WeightSelector selector = new WeightSelector(config, rnd); long startTime = Timer.now(); long opAm = 0; long sleepOps = 0; int duration = getConfig().getDurationMilliseconds(); Range<Long> sleepRange = getConfig().getSleepRange(); Operation sleeper = null; if (sleepRange != null) { sleeper = new SleepOp(getConfig(), rnd); } while (Timer.elapsed(startTime) < duration) { try { logAndSetStatus(reporter, "Attempting to select operation #" + (opAm + 1)); int currElapsed = (int) (Timer.elapsed(startTime)); Operation op = selector.select(currElapsed, duration); if (op == null) { // no ops left break; } else { // got a good op ++opAm; runOperation(op, reporter, output, opAm); } // do a sleep?? if (sleeper != null) { // these don't count against the number of operations ++sleepOps; runOperation(sleeper, reporter, output, sleepOps); } } catch (Exception e) { logAndSetStatus(reporter, "Failed at running due to " + StringUtils.stringifyException(e)); if (getConfig().shouldExitOnFirstError()) { break; } } } // write out any accumulated mapper stats { long timeTaken = Timer.elapsed(startTime); OperationOutput opCount = new OperationOutput(OutputType.LONG, OP_TYPE, ReportWriter.OP_COUNT, opAm); output.collect(opCount.getKey(), opCount.getOutputValue()); OperationOutput overallTime = new OperationOutput(OutputType.LONG, OP_TYPE, ReportWriter.OK_TIME_TAKEN, timeTaken); output.collect(overallTime.getKey(), overallTime.getOutputValue()); logAndSetStatus(reporter, "Finished " + opAm + " operations in " + timeTaken + " milliseconds"); } }
@Override // Mapper public void map(Object key, Object value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { logAndSetStatus(reporter, "Running slive mapper for dummy key " + key + " and dummy value " + value); long startTime = Timer.now(); long opAm = 0; long sleepOps = 0; int duration = getConfig().getDurationMilliseconds(); Range<Long> sleepRange = getConfig().getSleepRange(); Operation sleeper = null; if (sleepRange != null) { sleeper = new SleepOp(getConfig(), rnd); } WeightSelector selector = getSelector(); while (Timer.elapsed(startTime) < duration) { try { logAndSetStatus(reporter, "Attempting to select operation #" + (opAm + 1)); int currElapsed = (int) (Timer.elapsed(startTime)); Operation op = selector.select(currElapsed, duration); if (op == null) { // no ops left break; } else { // got a good op ++opAm; runOperation(op, reporter, output, opAm); } // do a sleep?? if (sleeper != null) { // these don't count against the number of operations ++sleepOps; runOperation(sleeper, reporter, output, sleepOps); } } catch (Exception e) { logAndSetStatus(reporter, "Failed at running due to " + StringUtils.stringifyException(e)); if (getConfig().shouldExitOnFirstError()) { break; } } } // write out any accumulated mapper stats { long timeTaken = Timer.elapsed(startTime); OperationOutput opCount = new OperationOutput(OutputType.LONG, OP_TYPE, ReportWriter.OP_COUNT, opAm); output.collect(opCount.getKey(), opCount.getOutputValue()); OperationOutput overallTime = new OperationOutput(OutputType.LONG, OP_TYPE, ReportWriter.OK_TIME_TAKEN, timeTaken); output.collect(overallTime.getKey(), overallTime.getOutputValue()); logAndSetStatus(reporter, "Finished " + opAm + " operations in " + timeTaken + " milliseconds"); } }
/** * This run() method simply sets up the default output container and adds in a * data member to keep track of the number of operations that occurred * * @param fs * FileSystem object to perform operations with * * @return List of operation outputs to be collected and output in the overall * map reduce operation (or empty or null if none) */ List<OperationOutput> run(FileSystem fs) { List<OperationOutput> out = new LinkedList<OperationOutput>(); out.add(new OperationOutput(OutputType.LONG, getType(), ReportWriter.OP_COUNT, 1L)); return out; }