Example usages of the Java class org.apache.hadoop.hbase.util.MultiThreadedReader, collected from several HBase forks.

Project: ditb — File: StripeCompactionsPerformanceEvaluation.java
/**
 * Runs one stripe-compaction performance scenario: optionally preloads the table,
 * then drives concurrent writers plus a verifying reader over a fresh key range,
 * and fails the test if any write or read problem was recorded.
 *
 * @param description label embedded in status/log output for this scenario
 * @param conf cluster configuration handed to the writer/reader workloads
 * @throws Exception on preload write failure, interruption, or cluster access errors
 */
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  // Key ranges scale with cluster size; preload occupies [0, startKey).
  long startKey = (long)preloadKeys * numServers;
  long endKey = startKey + (long)writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    long preloadStart = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    // Arbitrary settle period (ms), capped at 30s, so stores stabilize before the timed run.
    int settleMs = (int)Math.min(preloadKeys / 100, 30000);
    long preloadSecs = (System.currentTimeMillis() - preloadStart) / 1000;
    status(description + " preload took " + preloadSecs
        + "sec; sleeping for " + settleMs/1000 + "sec for store to stabilize");
    Thread.sleep(settleMs);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
  // reader.getMetrics().enable();
  // The reader verifies keys as the writer publishes them.
  reader.linkToWriter(writer);

  long runStart = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  // A write failure masks read results; reads are only judged when writes were clean.
  boolean passed;
  if (writer.getNumWriteFailures() != 0) {
    LOG.error("Write failed");
    passed = false;
  } else if (reader.getNumReadErrors() != 0 || reader.getNumReadFailures() != 0) {
    LOG.error("Read failed");
    passed = false;
  } else {
    passed = true;
  }

  // NOTE(review): the metrics CDF/time-series dump was already disabled upstream;
  // re-enable via reader.getMetrics() if perf data is needed again.
  status(description + " test took " + (System.currentTimeMillis() - runStart)/1000 + "sec");
  Assert.assertTrue(passed);
}
Project: HIndex — File: StripeCompactionsPerformanceEvaluation.java
/**
 * Runs one stripe-compaction performance scenario: optionally preloads the table,
 * then drives concurrent writers plus a verifying reader over a fresh key range,
 * and fails the test if any write or read problem was recorded.
 *
 * @param description label embedded in status/log output for this scenario
 * @param conf cluster configuration handed to the writer/reader workloads
 * @throws Exception on preload write failure, interruption, or cluster access errors
 */
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  // Key ranges scale with cluster size; preload occupies [0, startKey).
  long startKey = (long)preloadKeys * numServers;
  long endKey = startKey + (long)writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  TableName tn = TableName.valueOf(TABLE_NAME);
  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn);
    long preloadStart = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    // Arbitrary settle period (ms), capped at 30s, so stores stabilize before the timed run.
    int settleMs = (int)Math.min(preloadKeys / 100, 30000);
    long preloadSecs = (System.currentTimeMillis() - preloadStart) / 1000;
    status(description + " preload took " + preloadSecs
        + "sec; sleeping for " + settleMs/1000 + "sec for store to stabilize");
    Thread.sleep(settleMs);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100);
  // reader.getMetrics().enable();
  // The reader verifies keys as the writer publishes them.
  reader.linkToWriter(writer);

  long runStart = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  // A write failure masks read results; reads are only judged when writes were clean.
  boolean passed;
  if (writer.getNumWriteFailures() != 0) {
    LOG.error("Write failed");
    passed = false;
  } else if (reader.getNumReadErrors() != 0 || reader.getNumReadFailures() != 0) {
    LOG.error("Read failed");
    passed = false;
  } else {
    passed = true;
  }

  // NOTE(review): the metrics CDF/time-series dump was already disabled upstream;
  // re-enable via reader.getMetrics() if perf data is needed again.
  status(description + " test took " + (System.currentTimeMillis() - runStart)/1000 + "sec");
  Assert.assertTrue(passed);
}
Project: hbase — File: StripeCompactionsPerformanceEvaluation.java
/**
 * Runs one stripe-compaction performance scenario: optionally preloads the table,
 * then drives concurrent writers plus a verifying reader over a fresh key range,
 * and fails the test if any write or read problem was recorded.
 *
 * @param description label embedded in status/log output for this scenario
 * @param conf cluster configuration handed to the writer/reader workloads
 * @throws Exception on preload write failure, interruption, or cluster access errors
 */
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface()
    .getClusterMetrics().getLiveServerMetrics().size();
  // Key ranges scale with cluster size; preload occupies [0, startKey).
  long startKey = preloadKeys * numServers;
  long endKey = startKey + writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    long preloadStart = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    // Arbitrary settle period (ms), capped at 30s, so stores stabilize before the timed run.
    int settleMs = (int)Math.min(preloadKeys / 100, 30000);
    long preloadSecs = (System.currentTimeMillis() - preloadStart) / 1000;
    status(description + " preload took " + preloadSecs
        + "sec; sleeping for " + settleMs/1000 + "sec for store to stabilize");
    Thread.sleep(settleMs);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
  // reader.getMetrics().enable();
  // The reader verifies keys as the writer publishes them.
  reader.linkToWriter(writer);

  long runStart = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  // A write failure masks read results; reads are only judged when writes were clean.
  boolean passed;
  if (writer.getNumWriteFailures() != 0) {
    LOG.error("Write failed");
    passed = false;
  } else if (reader.getNumReadErrors() != 0 || reader.getNumReadFailures() != 0) {
    LOG.error("Read failed");
    passed = false;
  } else {
    passed = true;
  }

  // NOTE(review): the metrics CDF/time-series dump was already disabled upstream;
  // re-enable via reader.getMetrics() if perf data is needed again.
  status(description + " test took " + (System.currentTimeMillis() - runStart)/1000 + "sec");
  Assert.assertTrue(passed);
}
Project: PyroDB — File: StripeCompactionsPerformanceEvaluation.java
/**
 * Runs one stripe-compaction performance scenario: optionally preloads the table,
 * then drives concurrent writers plus a verifying reader over a fresh key range,
 * and fails the test if any write or read problem was recorded.
 *
 * @param description label embedded in status/log output for this scenario
 * @param conf cluster configuration handed to the writer/reader workloads
 * @throws Exception on preload write failure, interruption, or cluster access errors
 */
private void runOneTest(String description, Configuration conf) throws Exception {
  int numServers = util.getHBaseClusterInterface().getClusterStatus().getServersSize();
  // Key ranges scale with cluster size; preload occupies [0, startKey).
  long startKey = (long)preloadKeys * numServers;
  long endKey = startKey + (long)writeKeys * numServers;
  status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

  TableName tn = TableName.valueOf(TABLE_NAME);
  if (preloadKeys > 0) {
    MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, tn);
    long preloadStart = System.currentTimeMillis();
    preloader.start(0, startKey, writeThreads);
    preloader.waitForFinish();
    if (preloader.getNumWriteFailures() > 0) {
      throw new IOException("Preload failed");
    }
    // Arbitrary settle period (ms), capped at 30s, so stores stabilize before the timed run.
    int settleMs = (int)Math.min(preloadKeys / 100, 30000);
    long preloadSecs = (System.currentTimeMillis() - preloadStart) / 1000;
    status(description + " preload took " + preloadSecs
        + "sec; sleeping for " + settleMs/1000 + "sec for store to stabilize");
    Thread.sleep(settleMs);
  }

  MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tn);
  MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tn, 100);
  // reader.getMetrics().enable();
  // The reader verifies keys as the writer publishes them.
  reader.linkToWriter(writer);

  long runStart = System.currentTimeMillis();
  writer.start(startKey, endKey, writeThreads);
  reader.start(startKey, endKey, readThreads);
  writer.waitForFinish();
  reader.waitForFinish();
  // reader.waitForVerification(300000);
  // reader.abortAndWaitForFinish();
  status("Readers and writers stopped for test " + description);

  // A write failure masks read results; reads are only judged when writes were clean.
  boolean passed;
  if (writer.getNumWriteFailures() != 0) {
    LOG.error("Write failed");
    passed = false;
  } else if (reader.getNumReadErrors() != 0 || reader.getNumReadFailures() != 0) {
    LOG.error("Read failed");
    passed = false;
  } else {
    passed = true;
  }

  // NOTE(review): the metrics CDF/time-series dump was already disabled upstream;
  // re-enable via reader.getMetrics() if perf data is needed again.
  status(description + " test took " + (System.currentTimeMillis() - runStart)/1000 + "sec");
  Assert.assertTrue(passed);
}