@Test
@SuppressWarnings("unchecked")
public void testSplitRandom() throws Exception {
  Tool tool = new InputSampler<Object, Object>(new Configuration());
  // Use 0.999 probability to reduce the flakiness of the test, because the
  // test will fail if the number of samples is less than (number of reduces + 1).
  int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
      "-splitRandom", "0.999f", "20", "100",
      input1, input2, output });
  assertEquals(0, result);
  Object[] partitions = readPartitions(output);
  // must be 3 split points since NUM_REDUCES = 4:
  assertEquals(3, partitions.length);
  // check that the partition array is sorted:
  Object[] sortedPartitions = Arrays.copyOf(partitions, partitions.length);
  Arrays.sort(sortedPartitions, new LongWritable.Comparator());
  assertArrayEquals(sortedPartitions, partitions);
}
@Override
public int run(String[] args) throws Exception {
  // get the tool class, run with the conf
  if (args.length < 1) {
    return printUsage();
  }
  Tool tool = null;
  if (args[0].equals("Generator")) {
    tool = new Generator();
  } else if (args[0].equals("Verify")) {
    tool = new Verify();
  } else if (args[0].equals("Loop")) {
    tool = new Loop();
  } else if (args[0].equals("Walker")) {
    tool = new Walker();
  } else if (args[0].equals("Print")) {
    tool = new Print();
  } else if (args[0].equals("Delete")) {
    tool = new Delete();
  } else {
    return printUsage();
  }
  args = Arrays.copyOfRange(args, 1, args.length);
  return ToolRunner.run(getConf(), tool, args);
}
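A minimal sketch of how a dispatcher like the one above is conventionally wired up as a program entry point; the enclosing class name IntegrationTestDriver is a hypothetical placeholder, not taken from the original sources.

// Hypothetical main() for the dispatcher above. "IntegrationTestDriver" is an
// assumed stand-in for the enclosing Tool subclass; ToolRunner parses generic
// Hadoop options before delegating to run().
public static void main(String[] args) throws Exception {
  int ret = ToolRunner.run(new Configuration(), new IntegrationTestDriver(), args);
  System.exit(ret);
}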
@Override
public int runTestFromCommandLine() throws Exception {
  Tool tool = null;
  if (toRun.equals("Generator")) {
    tool = new Generator();
  } else if (toRun.equals("Verify")) {
    tool = new Verify();
  } else if (toRun.equals("Loop")) {
    Loop loop = new Loop();
    loop.it = this;
    tool = loop;
  } else if (toRun.equals("Walker")) {
    tool = new Walker();
  } else if (toRun.equals("Print")) {
    tool = new Print();
  } else if (toRun.equals("Delete")) {
    tool = new Delete();
  } else {
    usage();
    throw new RuntimeException("Unknown arg");
  }
  return ToolRunner.run(getConf(), tool, otherArgs);
}
public static Job parseInputAndOutputParentDirectory(Tool tool, Configuration conf,
    String[] args) throws IOException {
  if (args.length != 2) {
    printUsage(tool, "<input> <output>");
    return null;
  }
  Job job = new Job(conf);
  job.setJarByClass(tool.getClass());
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] status = fs.listStatus(new Path(args[0]));
  for (FileStatus stat : status) {
    if (stat.isDir()) {
      FileInputFormat.addInputPath(job, stat.getPath());
    }
  }
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  return job;
}
private static Tool resolveClient(Configuration conf, ClassDescription client) {
  try {
    Class<?> aClass = client.resolve(conf.getClassLoader());
    if (!Tool.class.isAssignableFrom(aClass)) {
      throw new IllegalArgumentException(MessageFormat.format(
          "MapReduce client class must implement Tool interface: {0}",
          client.getClassName()));
    }
    return ReflectionUtils.newInstance(aClass.asSubclass(Tool.class), conf);
  } catch (ReflectiveOperationException e) {
    // Chain the original exception so the resolution failure stays diagnosable.
    throw new IllegalArgumentException(MessageFormat.format(
        "failed to resolve MapReduce client class: {0}",
        client.getClassName()), e);
  }
}
@Test
public void testRun_l2t10() throws Exception {
  // gen rowkeys file for later test
  Configuration conf = TEST_UTIL.getConfiguration();
  String outputPath = "/run_l2t10";
  Tool tool = new GetRandomRowsByRegions(conf);
  int status = tool.run(new String[] { "-b", "2", "-t", "3", VERTEX_TABLE, outputPath });
  Assert.assertEquals(0, status);
  // merge content
  File tf = mergeResults(conf, outputPath, "rowkeys-1");
  // run test
  File tPath = tf.getParentFile();
  tPath = new File(tPath, "performanceTestResults_" + System.currentTimeMillis());
  FileUtils.forceMkdir(tPath);
  tool = new HGraphClientPerformanceTest(conf);
  status = tool.run(new String[] { "-l", "2", "-t", "10", VERTEX_TABLE, EDGE_TABLE,
      tf.getAbsolutePath(), tPath.getAbsolutePath() });
  Assert.assertEquals(0, status);
  // verify test results
  outputTestResults(tPath);
}
@Test
public void testRun_ml2t10() throws Exception {
  // gen rowkeys file for later test
  Configuration conf = TEST_UTIL.getConfiguration();
  String outputPath = "/run_ml2t10";
  Tool tool = new GetRandomRowsByRegions(conf);
  int status = tool.run(new String[] { "-b", "2", "-t", "3", VERTEX_TABLE, outputPath });
  Assert.assertEquals(0, status);
  // merge content
  File tf = mergeResults(conf, outputPath, "rowkeys-2");
  // run test
  File tPath = tf.getParentFile();
  tPath = new File(tPath, "performanceTestResults_" + System.currentTimeMillis());
  FileUtils.forceMkdir(tPath);
  tool = new HGraphClientPerformanceTest(conf);
  status = tool.run(new String[] { "-m", "-l", "2", "-t", "10", VERTEX_TABLE, EDGE_TABLE,
      tf.getAbsolutePath(), tPath.getAbsolutePath() });
  Assert.assertEquals(0, status);
  // verify test results
  outputTestResults(tPath);
}
@Test
public void testRun_i2000l2t10() throws Exception {
  // gen rowkeys file for later test
  Configuration conf = TEST_UTIL.getConfiguration();
  String outputPath = "/run_i2000l2t10";
  Tool tool = new GetRandomRowsByRegions(conf);
  int status = tool.run(new String[] { "-b", "2", "-t", "3", VERTEX_TABLE, outputPath });
  Assert.assertEquals(0, status);
  // merge content
  File tf = mergeResults(conf, outputPath, "rowkeys-2");
  // run test
  File tPath = tf.getParentFile();
  tPath = new File(tPath, "performanceTestResults_" + System.currentTimeMillis());
  FileUtils.forceMkdir(tPath);
  tool = new HGraphClientPerformanceTest(conf);
  status = tool.run(new String[] { "-i", "2000", "-l", "2", "-t", "10", VERTEX_TABLE,
      EDGE_TABLE, tf.getAbsolutePath(), tPath.getAbsolutePath() });
  Assert.assertEquals(0, status);
  // verify test results
  outputTestResults(tPath);
}
private void runTool(Tool tool, Batch batch, ResultCollector resultCollector)
    throws IOException, InterruptedException, TransformerException {
  // create the input as a file on the cluster
  Configuration conf = new Configuration();
  getProperties().setProperty(ConfigConstants.ITERATOR_USE_FILESYSTEM, "False");
  propertiesToHadoopConfiguration(conf, getProperties());
  conf.set(ConfigConstants.BATCH_ID, batch.getFullID());
  String user = conf.get(ConfigConstants.HADOOP_USER, "newspapr");
  conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
  FileSystem fs = FileSystem.get(FileSystem.getDefaultUri(conf), conf, user);
  long time = System.currentTimeMillis();
  String jobFolder = getProperties().getProperty(ConfigConstants.JOB_FOLDER);
  Path inputFile = createInputFile(batch, fs, time, jobFolder);
  Path outDir = new Path(jobFolder, "output_" + batch.getFullID() + "_" + time);
  runJob(tool, batch, resultCollector, conf, inputFile, outDir, user);
}
public static int runTool(Configuration conf, Tool tool, String[] args, OutputStream out)
    throws Exception {
  PrintStream oldOut = System.out;
  PrintStream newOut = new PrintStream(out, true);
  try {
    System.setOut(newOut);
    return ToolRunner.run(conf, tool, args);
  } finally {
    System.setOut(oldOut);
  }
}
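A minimal usage sketch for the stdout-capturing helper above; the EchoTool class and its arguments are hypothetical placeholders introduced for illustration, not part of the original sources.

// Hypothetical placeholder Tool used only to exercise runTool: it echoes its
// arguments to stdout and exits 0.
public static final class EchoTool extends Configured implements Tool {
  @Override
  public int run(String[] args) {
    for (String arg : args) {
      System.out.println(arg);
    }
    return 0;
  }
}

@Test
public void testRunToolCapturesStdout() throws Exception {
  // Capture everything the tool prints to System.out into an in-memory buffer.
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  int result = runTool(new Configuration(), new EchoTool(), new String[] { "hello" }, out);
  assertEquals(0, result);
  assertTrue(out.toString().contains("hello"));
}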
private void executeTool(String toolMessage, Tool tool, String[] args, int expectedResult)
    throws Exception {
  LOG.info("Starting " + toolMessage);
  int res = ToolRunner.run(getConf(), tool, args);
  if (res != expectedResult) {
    LOG.error(toolMessage + " returned " + res + ", expected " + expectedResult);
    throw new Exception("Unexpected return code from " + toolMessage);
  }
  LOG.info("Successfully completed " + toolMessage);
}
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
    String[] args, int valueMultiplier) throws Exception {
  TableName table = TableName.valueOf(args[args.length - 1]);
  Configuration conf = new Configuration(util.getConfiguration());

  // populate input file
  FileSystem fs = FileSystem.get(conf);
  Path inputPath = fs.makeQualified(
      new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
  FSDataOutputStream op = fs.create(inputPath, true);
  op.write(Bytes.toBytes(data));
  op.close();
  LOG.debug(String.format("Wrote test data to file: %s", inputPath));

  if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
    LOG.debug("Forcing combiner.");
    conf.setInt("mapreduce.map.combine.minspills", 1);
  }

  // run the import
  List<String> argv = new ArrayList<String>(Arrays.asList(args));
  argv.add(inputPath.toString());
  Tool tool = new ImportTsv();
  LOG.debug("Running ImportTsv with arguments: " + argv);
  try {
    // Job will fail if observer rejects entries without TTL
    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
  } finally {
    // Clean up
    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
  }
  return tool;
}
/**
 * Run an ImportTsv job and perform basic validation on the results. Returns
 * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
 * for further validation as necessary. All dependencies are taken as
 * parameters to ensure non-reliance on this instance's util/conf facilities.
 *
 * @param args
 *          Any arguments to pass BEFORE the inputFile path is appended.
 * @param dataAvailable
 *          Passed through to table validation.
 * @return The Tool instance used to run the test.
 */
private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
    String[] args, int valueMultiplier, boolean dataAvailable) throws Exception {
  String table = args[args.length - 1];
  Configuration conf = new Configuration(util.getConfiguration());

  // populate input file
  FileSystem fs = FileSystem.get(conf);
  Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
  FSDataOutputStream op = fs.create(inputPath, true);
  op.write(Bytes.toBytes(data));
  op.close();
  LOG.debug(String.format("Wrote test data to file: %s", inputPath));

  if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
    LOG.debug("Forcing combiner.");
    conf.setInt("mapreduce.map.combine.minspills", 1);
  }

  // run the import
  List<String> argv = new ArrayList<String>(Arrays.asList(args));
  argv.add(inputPath.toString());
  Tool tool = new ImportTsv();
  LOG.debug("Running ImportTsv with arguments: " + argv);
  assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));

  validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable);

  if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
    LOG.debug("Deleting test subdirectory");
    util.cleanupDataTestDirOnTestFS(table);
  }
  return tool;
}
@Test
public void testGenerateAndLoad() throws Exception {
  LOG.info("Running test testGenerateAndLoad.");
  TableName table = TableName.valueOf(NAME + "-" + UUID.randomUUID());
  String cf = "d";
  Path hfiles = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
  String[] args = {
      format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
      format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", ImportTsv.COLUMNS_CONF_KEY, cf, cf),
      // configure the test harness to NOT delete the HFiles after they're
      // generated. We need those for doLoadIncrementalHFiles
      format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
      table.getNameAsString()
  };

  // run the job, complete the load.
  util.createTable(table, new String[] { cf });
  Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
  doLoadIncrementalHFiles(hfiles, table);

  // validate post-conditions
  validateDeletedPartitionsFile(t.getConf());

  // clean up after ourselves.
  util.deleteTable(table);
  util.cleanupDataTestDirOnTestFS(table.getNameAsString());
  LOG.info("testGenerateAndLoad completed successfully.");
}
@Override
public int runTestFromCommandLine() throws Exception {
  Loop loop = new VisibilityLoop();
  loop.it = this;
  return ToolRunner.run(getConf(), loop, otherArgs);
}
@Override
public int runTestFromCommandLine() throws Exception {
  Tool tool = null;
  if (toRun.equalsIgnoreCase("Generator")) {
    tool = new Generator();
  } else if (toRun.equalsIgnoreCase("Verify")) {
    tool = new Verify();
  } else if (toRun.equalsIgnoreCase("Loop")) {
    Loop loop = new Loop();
    loop.it = this;
    tool = loop;
  } else if (toRun.equalsIgnoreCase("Walker")) {
    tool = new Walker();
  } else if (toRun.equalsIgnoreCase("Print")) {
    tool = new Print();
  } else if (toRun.equalsIgnoreCase("Delete")) {
    tool = new Delete();
  } else if (toRun.equalsIgnoreCase("Clean")) {
    tool = new Clean();
  } else if (toRun.equalsIgnoreCase("Search")) {
    tool = new Search();
  } else {
    usage();
    throw new RuntimeException("Unknown arg");
  }
  return ToolRunner.run(getConf(), tool, otherArgs);
}
private void goodExec(Class<?> theClass, String... args)
    throws InterruptedException, IOException {
  Entry<Integer, String> pair;
  if (Tool.class.isAssignableFrom(theClass) && ClusterType.STANDALONE == getClusterType()) {
    StandaloneClusterControl control = (StandaloneClusterControl) getClusterControl();
    pair = control.execMapreduceWithStdout(theClass, args);
  } else {
    // We're already slurping stdout into memory (not redirecting to file).
    // Might as well add it to the error message.
    pair = getClusterControl().execWithStdout(theClass, args);
  }
  Assert.assertEquals("stdout=" + pair.getValue(), 0, pair.getKey().intValue());
}
@Test
public void testSplitSample() throws Exception {
  Tool tool = new InputSampler<Object, Object>(new Configuration());
  int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
      "-splitSample", "10", "100",
      input1, input2, output });
  assertEquals(0, result);
  Object[] partitions = readPartitions(output);
  assertArrayEquals(
      new LongWritable[] { new LongWritable(2L), new LongWritable(7L), new LongWritable(20L) },
      partitions);
}
@Test
public void testSplitInterval() throws Exception {
  Tool tool = new InputSampler<Object, Object>(new Configuration());
  int result = tool.run(new String[] { "-r", Integer.toString(NUM_REDUCES),
      "-splitInterval", "0.5f", "0",
      input1, input2, output });
  assertEquals(0, result);
  Object[] partitions = readPartitions(output);
  assertArrayEquals(
      new LongWritable[] { new LongWritable(7L), new LongWritable(9L), new LongWritable(35L) },
      partitions);
}
private int runTool(Configuration conf, Tool tool, String[] args, OutputStream out)
    throws Exception {
  PrintStream oldOut = System.out;
  PrintStream newOut = new PrintStream(out, true);
  try {
    System.setOut(newOut);
    return ToolRunner.run(conf, tool, args);
  } finally {
    System.setOut(oldOut);
  }
}
private void runBalancerCli(Configuration conf, long totalUsedSpace, long totalCapacity)
    throws Exception {
  waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

  final String[] args = { "-policy", "datanode" };
  final Tool tool = new Cli();
  tool.setConf(conf);
  final int r = tool.run(args); // start rebalancing
  assertEquals("Tools should exit 0 on success", 0, r);

  waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
  LOG.info("Rebalancing with default ctor.");
  waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
}
/**
 * Run an ImportTsv job and perform basic validation on the results. Returns
 * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
 * for further validation as necessary. All dependencies are taken as
 * parameters to ensure non-reliance on this instance's util/conf facilities.
 *
 * @param args
 *          Any arguments to pass BEFORE the inputFile path is appended.
 * @param dataAvailable
 *          Passed through to table validation.
 * @return The Tool instance used to run the test.
 */
private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
    String[] args, int valueMultiplier, boolean dataAvailable) throws Exception {
  String table = args[args.length - 1];
  Configuration conf = new Configuration(util.getConfiguration());

  // populate input file
  FileSystem fs = FileSystem.get(conf);
  Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
  FSDataOutputStream op = fs.create(inputPath, true);
  op.write(Bytes.toBytes(data));
  op.close();
  LOG.debug(String.format("Wrote test data to file: %s", inputPath));

  if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
    LOG.debug("Forcing combiner.");
    conf.setInt("min.num.spills.for.combine", 1);
  }

  // run the import
  List<String> argv = new ArrayList<String>(Arrays.asList(args));
  argv.add(inputPath.toString());
  Tool tool = new ImportTsv();
  LOG.debug("Running ImportTsv with arguments: " + argv);
  assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));

  validateTable(conf, table, family, valueMultiplier, dataAvailable);

  if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
    LOG.debug("Deleting test subdirectory");
    util.cleanupDataTestDirOnTestFS(table);
  }
  return tool;
}
@Test
public void testGenerateAndLoad() throws Exception {
  LOG.info("Running test testGenerateAndLoad.");
  String table = NAME + "-" + UUID.randomUUID();
  String cf = "d";
  Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
  String[] args = {
      format("-D%s=%s", ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles),
      format("-D%s=HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", ImportTsv.COLUMNS_CONF_KEY, cf, cf),
      // configure the test harness to NOT delete the HFiles after they're
      // generated. We need those for doLoadIncrementalHFiles
      format("-D%s=false", TestImportTsv.DELETE_AFTER_LOAD_CONF),
      table
  };

  // run the job, complete the load.
  util.createTable(table, cf);
  Tool t = TestImportTsv.doMROnTableTest(util, cf, simple_tsv, args);
  doLoadIncrementalHFiles(hfiles, table);

  // validate post-conditions
  validateDeletedPartitionsFile(t.getConf());

  // clean up after ourselves.
  util.deleteTable(table);
  util.cleanupDataTestDirOnTestFS(table);
  LOG.info("testGenerateAndLoad completed successfully.");
}