@Test public void testSelection() throws Exception { ConfigExtractor extractor = getTestConfig(false); WeightSelector selector = new WeightSelector(extractor, rnd); // should be 1 of each type - uniform int expected = OperationType.values().length; Operation op = null; Set<String> types = new HashSet<String>(); FileSystem fs = FileSystem.get(extractor.getConfig()); while (true) { op = selector.select(1, 1); if (op == null) { break; } // doesn't matter if they work or not op.run(fs); types.add(op.getType()); } assertEquals(types.size(), expected); }
@Test public void testArguments() throws Exception { ConfigExtractor extractor = getTestConfig(true); assertEquals(extractor.getOpCount().intValue(), Constants.OperationType .values().length); assertEquals(extractor.getMapAmount().intValue(), 2); assertEquals(extractor.getReducerAmount().intValue(), 2); Range<Long> apRange = extractor.getAppendSize(); assertEquals(apRange.getLower().intValue(), Constants.MEGABYTES * 1); assertEquals(apRange.getUpper().intValue(), Constants.MEGABYTES * 2); Range<Long> wRange = extractor.getWriteSize(); assertEquals(wRange.getLower().intValue(), Constants.MEGABYTES * 1); assertEquals(wRange.getUpper().intValue(), Constants.MEGABYTES * 2); Range<Long> trRange = extractor.getTruncateSize(); assertEquals(trRange.getLower().intValue(), 0); assertEquals(trRange.getUpper().intValue(), Constants.MEGABYTES * 1); Range<Long> bRange = extractor.getBlockSize(); assertEquals(bRange.getLower().intValue(), Constants.MEGABYTES * 1); assertEquals(bRange.getUpper().intValue(), Constants.MEGABYTES * 2); String resfile = extractor.getResultFile(); assertEquals(resfile, getResultFile().toString()); int durationMs = extractor.getDurationMilliseconds(); assertEquals(durationMs, 10 * 1000); }
/** * Selects an operation from the known operation set or returns null if none * are available by applying the weighting algorithms and then handing off the * weight operations to the selection object. * * @param elapsed * the currently elapsed time (milliseconds) of the running program * @param duration * the maximum amount of milliseconds of the running program * * @return operation or null if none left */ Operation select(int elapsed, int duration) { List<OperationWeight> validOps = new ArrayList<OperationWeight>(operations .size()); for (OperationType type : operations.keySet()) { OperationInfo opinfo = operations.get(type); if (opinfo == null || opinfo.amountLeft <= 0) { continue; } Weightable weighter = weights.get(opinfo.distribution); if (weighter != null) { OperationWeight weightOp = new OperationWeight(opinfo.operation, weighter.weight(elapsed, duration)); validOps.add(weightOp); } else { throw new RuntimeException("Unable to get weight for distribution " + opinfo.distribution); } } if (validOps.isEmpty()) { return null; } return getSelector().select(validOps); }
@Test public void testArguments() throws Exception { ConfigExtractor extractor = getTestConfig(true); assertEquals(extractor.getOpCount().intValue(), Constants.OperationType .values().length); assertEquals(extractor.getMapAmount().intValue(), 2); assertEquals(extractor.getReducerAmount().intValue(), 2); Range<Long> apRange = extractor.getAppendSize(); assertEquals(apRange.getLower().intValue(), Constants.MEGABYTES * 1); assertEquals(apRange.getUpper().intValue(), Constants.MEGABYTES * 2); Range<Long> wRange = extractor.getWriteSize(); assertEquals(wRange.getLower().intValue(), Constants.MEGABYTES * 1); assertEquals(wRange.getUpper().intValue(), Constants.MEGABYTES * 2); Range<Long> bRange = extractor.getBlockSize(); assertEquals(bRange.getLower().intValue(), Constants.MEGABYTES * 1); assertEquals(bRange.getUpper().intValue(), Constants.MEGABYTES * 2); String resfile = extractor.getResultFile(); assertEquals(resfile, getResultFile().toString()); int durationMs = extractor.getDurationMilliseconds(); assertEquals(durationMs, 10 * 1000); }
/** gets the test program arguments used for merging and main MR running */ private String[] getTestArgs(boolean sleep) { List<String> args = new LinkedList<String>(); // setup the options { args.add("-" + ConfigOption.WRITE_SIZE.getOpt()); args.add("1M,2M"); args.add("-" + ConfigOption.OPS.getOpt()); args.add(Constants.OperationType.values().length + ""); args.add("-" + ConfigOption.MAPS.getOpt()); args.add("2"); args.add("-" + ConfigOption.REDUCES.getOpt()); args.add("2"); args.add("-" + ConfigOption.APPEND_SIZE.getOpt()); args.add("1M,2M"); args.add("-" + ConfigOption.BLOCK_SIZE.getOpt()); args.add("1M,2M"); args.add("-" + ConfigOption.REPLICATION_AM.getOpt()); args.add("1,1"); if (sleep) { args.add("-" + ConfigOption.SLEEP_TIME.getOpt()); args.add("10,10"); } args.add("-" + ConfigOption.RESULT_FILE.getOpt()); args.add(getResultFile().toString()); args.add("-" + ConfigOption.BASE_DIR.getOpt()); args.add(getFlowLocation().toString()); args.add("-" + ConfigOption.DURATION.getOpt()); args.add("10"); args.add("-" + ConfigOption.DIR_SIZE.getOpt()); args.add("10"); args.add("-" + ConfigOption.FILES.getOpt()); args.add("10"); args.add("-" + ConfigOption.TRUNCATE_SIZE.getOpt()); args.add("0,1M"); } return args.toArray(new String[args.size()]); }
/** * Gets the base set of operations to use * * @return Map */ private Map<OperationType, OperationData> getBaseOperations() { Map<OperationType, OperationData> base = new HashMap<OperationType, OperationData>(); // add in all the operations // since they will all be applied unless changed OperationType[] types = OperationType.values(); for (OperationType type : types) { base.put(type, new OperationData(Distribution.UNIFORM, null)); } return base; }
/** * @return the option set to be used in command line parsing */ private Options getOptions() { Options cliopt = new Options(); cliopt.addOption(ConfigOption.MAPS); cliopt.addOption(ConfigOption.REDUCES); cliopt.addOption(ConfigOption.PACKET_SIZE); cliopt.addOption(ConfigOption.OPS); cliopt.addOption(ConfigOption.DURATION); cliopt.addOption(ConfigOption.EXIT_ON_ERROR); cliopt.addOption(ConfigOption.SLEEP_TIME); cliopt.addOption(ConfigOption.TRUNCATE_WAIT); cliopt.addOption(ConfigOption.FILES); cliopt.addOption(ConfigOption.DIR_SIZE); cliopt.addOption(ConfigOption.BASE_DIR); cliopt.addOption(ConfigOption.RESULT_FILE); cliopt.addOption(ConfigOption.CLEANUP); { String distStrs[] = new String[Distribution.values().length]; Distribution distValues[] = Distribution.values(); for (int i = 0; i < distValues.length; ++i) { distStrs[i] = distValues[i].lowerName(); } String opdesc = String.format(Constants.OP_DESCR, StringUtils .arrayToString(distStrs)); for (OperationType type : OperationType.values()) { String opname = type.lowerName(); cliopt.addOption(new Option(opname, true, opdesc)); } } cliopt.addOption(ConfigOption.REPLICATION_AM); cliopt.addOption(ConfigOption.BLOCK_SIZE); cliopt.addOption(ConfigOption.READ_SIZE); cliopt.addOption(ConfigOption.WRITE_SIZE); cliopt.addOption(ConfigOption.APPEND_SIZE); cliopt.addOption(ConfigOption.TRUNCATE_SIZE); cliopt.addOption(ConfigOption.RANDOM_SEED); cliopt.addOption(ConfigOption.QUEUE_NAME); cliopt.addOption(ConfigOption.HELP); return cliopt; }
/** * @return the map of operations to perform using config (percent may be null * if unspecified) */ Map<OperationType, OperationData> getOperations() { Map<OperationType, OperationData> operations = new HashMap<OperationType, OperationData>(); for (OperationType type : OperationType.values()) { String opname = type.lowerName(); String keyname = String.format(Constants.OP, opname); String kval = config.get(keyname); if (kval == null) { continue; } operations.put(type, new OperationData(kval)); } return operations; }
/** * Gets an operation instance (cached) for a given operation type * * @param type * the operation type to fetch for * * @return Operation operation instance or null if it can not be fetched. */ Operation getOperation(OperationType type) { Operation op = typedOperations.get(type); if (op != null) { return op; } switch (type) { case READ: op = new ReadOp(this.config, rnd); break; case LS: op = new ListOp(this.config, rnd); break; case MKDIR: op = new MkdirOp(this.config, rnd); break; case APPEND: op = new AppendOp(this.config, rnd); break; case RENAME: op = new RenameOp(this.config, rnd); break; case DELETE: op = new DeleteOp(this.config, rnd); break; case CREATE: op = new CreateOp(this.config, rnd); break; case TRUNCATE: op = new TruncateOp(this.config, rnd); break; } typedOperations.put(type, op); return op; }