private void runOneTest(String description, Configuration conf) throws Exception { int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); long startKey = (long)preloadKeys * numServers; long endKey = startKey + (long)writeKeys * numServers; status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", description, numServers, startKey, endKey)); if (preloadKeys > 0) { MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); long time = System.currentTimeMillis(); preloader.start(0, startKey, writeThreads); preloader.waitForFinish(); if (preloader.getNumWriteFailures() > 0) { throw new IOException("Preload failed"); } int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary status(description + " preload took " + (System.currentTimeMillis()-time)/1000 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize"); Thread.sleep(waitTime); } MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100); // reader.getMetrics().enable(); reader.linkToWriter(writer); long testStartTime = System.currentTimeMillis(); writer.start(startKey, endKey, writeThreads); reader.start(startKey, endKey, readThreads); writer.waitForFinish(); reader.waitForFinish(); // reader.waitForVerification(300000); // reader.abortAndWaitForFinish(); status("Readers and writers stopped for test " + description); boolean success = writer.getNumWriteFailures() == 0; if (!success) { LOG.error("Write failed"); } else { success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; if (!success) { LOG.error("Read failed"); } } // Dump perf regardless of the result. /*StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/ status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec"); Assert.assertTrue(success); }
private void runOneTest(String description, Configuration conf) throws Exception { int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize(); long startKey = (long)preloadKeys * numServers; long endKey = startKey + (long)writeKeys * numServers; status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", description, numServers, startKey, endKey)); TableName tn = TableName.valueOf(TABLE_NAME); if (preloadKeys > 0) { MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn); long time = System.currentTimeMillis(); preloader.start(0, startKey, writeThreads); preloader.waitForFinish(); if (preloader.getNumWriteFailures() > 0) { throw new IOException("Preload failed"); } int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary status(description + " preload took " + (System.currentTimeMillis()-time)/1000 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize"); Thread.sleep(waitTime); } MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn); MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100); // reader.getMetrics().enable(); reader.linkToWriter(writer); long testStartTime = System.currentTimeMillis(); writer.start(startKey, endKey, writeThreads); reader.start(startKey, endKey, readThreads); writer.waitForFinish(); reader.waitForFinish(); // reader.waitForVerification(300000); // reader.abortAndWaitForFinish(); status("Readers and writers stopped for test " + description); boolean success = writer.getNumWriteFailures() == 0; if (!success) { LOG.error("Write failed"); } else { success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; if (!success) { LOG.error("Read failed"); } } // Dump perf regardless of the result. /*StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/ status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec"); Assert.assertTrue(success); }
private void runOneTest(String description, Configuration conf) throws Exception { int numServers = util.getHBaseClusterInterface() .getClusterMetrics().getLiveServerMetrics().size(); long startKey = preloadKeys * numServers; long endKey = startKey + writeKeys * numServers; status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", description, numServers, startKey, endKey)); if (preloadKeys > 0) { MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); long time = System.currentTimeMillis(); preloader.start(0, startKey, writeThreads); preloader.waitForFinish(); if (preloader.getNumWriteFailures() > 0) { throw new IOException("Preload failed"); } int waitTime = (int)Math.min(preloadKeys / 100, 30000); // arbitrary status(description + " preload took " + (System.currentTimeMillis()-time)/1000 + "sec; sleeping for " + waitTime/1000 + "sec for store to stabilize"); Thread.sleep(waitTime); } MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100); // reader.getMetrics().enable(); reader.linkToWriter(writer); long testStartTime = System.currentTimeMillis(); writer.start(startKey, endKey, writeThreads); reader.start(startKey, endKey, readThreads); writer.waitForFinish(); reader.waitForFinish(); // reader.waitForVerification(300000); // reader.abortAndWaitForFinish(); status("Readers and writers stopped for test " + description); boolean success = writer.getNumWriteFailures() == 0; if (!success) { LOG.error("Write failed"); } else { success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; if (!success) { LOG.error("Read failed"); } } // Dump perf regardless of the result. /*StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/ status(description + " test took " + (System.currentTimeMillis()-testStartTime)/1000 + "sec"); Assert.assertTrue(success); }