Java 类org.apache.hadoop.hbase.client.metrics.ScanMetrics 实例源码

项目:ditb    文件:ClientSideRegionScanner.java   
/**
 * Opens the region identified by {@code hri} under {@code rootDir} and creates a scanner
 * over it for the given {@code scan}.
 * <p>
 * Note that the isolation level of the passed-in {@code scan} is changed to
 * {@code READ_UNCOMMITTED}.
 * @param conf configuration handed to the region open call
 * @param fs file system handed to the region open call
 * @param rootDir root directory handed to the region open call
 * @param htd descriptor of the table the region belongs to
 * @param hri the region to open
 * @param scan the scan to run against the region
 * @param scanMetrics metrics holder to use; when {@code null}, {@code initScanMetrics(scan)}
 *          is called instead
 * @throws IOException if any of the region calls made here fails
 */
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
    Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
        throws IOException {

  // The region is immutable, so the isolation level is relaxed up front.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Open the region from the snapshot directory.
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);

  // Internal region scanner plus the list used for its cells.
  this.scanner = this.region.getScanner(scan);
  this.values = new ArrayList<Cell>();

  if (scanMetrics != null) {
    this.scanMetrics = scanMetrics;
  } else {
    initScanMetrics(scan);
  }
  this.region.startRegionOperation();
}
项目:ditb    文件:TableRecordReaderImpl.java   
/**
 * Pushes the given scan metrics into job counters obtained reflectively through
 * {@code getCounter}.
 * <p>
 * Every entry of {@code scanMetrics.getMetricsMap()} increments the counter of the same name
 * in the {@code HBASE_COUNTER_GROUP_NAME} group; the restart and stale-result counts are added
 * to their own counters. Any exception is logged at debug level and swallowed.
 * @param scanMetrics source of the metric name/value pairs
 * @param numScannerRestarts amount added to {@code NUM_SCANNER_RESTARTS}
 * @param getCounter reflective counter accessor; when {@code null} nothing is done
 * @param context object the accessor is invoked on
 * @param numStale amount added to {@code NUM_SCAN_RESULTS_STALE}
 */
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context, long numStale) {
  // Counters are reachable only when hbase runs on the new mapreduce APIs.
  if (getCounter == null) {
    return;
  }

  try {
    for (Map.Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
      Object handle = getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, metric.getKey());
      ((Counter) handle).increment(metric.getValue());
    }

    Counter restarts = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS");
    restarts.increment(numScannerRestarts);

    Counter staleResults = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCAN_RESULTS_STALE");
    staleResults.increment(numStale);
  } catch (Exception e) {
    LOG.debug("can't update counter." + StringUtils.stringifyException(e));
  }
}
项目:ditb    文件:TestServerSideScanMetricsFromClientSide.java   
/**
 * Run the scan to completion and check the metric against the specified value.
 * <p>
 * The scanner is closed in a {@code finally} block so it is released even when iterating the
 * results throws.
 * @param scan the scan to run; it must have scan metrics enabled (asserted)
 * @param metricKey name of the counter to check
 * @param expectedValue value the counter must have after the scan
 * @throws Exception if the scan fails or an assertion does not hold
 */
public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
  assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
  ResultScanner scanner = TABLE.getScanner(scan);
  try {
    // Iterate through all the results; the rows themselves are not inspected.
    for (Result ignored : scanner) {
      // nothing to do per row
    }
  } finally {
    // Always release the scanner, even when iteration fails part-way.
    scanner.close();
  }

  // Read the metrics only after the scanner has been closed.
  ScanMetrics metrics = scan.getScanMetrics();
  assertTrue("Metrics are null", metrics != null);
  assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
  final long actualMetricValue = metrics.getCounter(metricKey).get();
  assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: "
      + actualMetricValue, expectedValue, actualMetricValue);
}
项目:ditb    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * Bytes that cannot be parsed are tolerated; in that case no counter is set by this method.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics decoded;
  try {
    decoded = MapReduceProtos.ScanMetrics.PARSER.parseFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
    decoded = null;
  }

  final ScanMetrics result = new ScanMetrics();
  if (decoded == null) {
    return result;
  }
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:pbase    文件:ClientSideRegionScanner.java   
/**
 * Opens the region identified by {@code hri} under {@code rootDir} and creates a scanner
 * over it for the given {@code scan}.
 * <p>
 * Note that the isolation level of the passed-in {@code scan} is changed to
 * {@code READ_UNCOMMITTED}.
 * @param conf configuration handed to the region open call
 * @param fs file system handed to the region open call
 * @param rootDir root directory handed to the region open call
 * @param htd descriptor of the table the region belongs to
 * @param hri the region to open
 * @param scan the scan to run against the region
 * @param scanMetrics metrics holder to use; when {@code null}, {@code initScanMetrics(scan)}
 *          is called instead
 * @throws IOException if any of the region calls made here fails
 */
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
    Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
        throws IOException {

  // The region is immutable, so the isolation level is relaxed up front.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Open the region from the snapshot directory.
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);

  // Internal region scanner plus the list used for its cells.
  this.scanner = this.region.getScanner(scan);
  this.values = new ArrayList<Cell>();

  if (scanMetrics != null) {
    this.scanMetrics = scanMetrics;
  } else {
    initScanMetrics(scan);
  }
  this.region.startRegionOperation();
}
项目:pbase    文件:TableRecordReaderImpl.java   
/**
 * Pushes the given scan metrics into job counters obtained reflectively through
 * {@code getCounter}.
 * <p>
 * Every entry of {@code scanMetrics.getMetricsMap()} increments the counter of the same name
 * in the {@code HBASE_COUNTER_GROUP_NAME} group; the restart and stale-result counts are added
 * to their own counters. Any exception is logged at debug level and swallowed.
 * @param scanMetrics source of the metric name/value pairs
 * @param numScannerRestarts amount added to {@code NUM_SCANNER_RESTARTS}
 * @param getCounter reflective counter accessor; when {@code null} nothing is done
 * @param context object the accessor is invoked on
 * @param numStale amount added to {@code NUM_SCAN_RESULTS_STALE}
 */
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context, long numStale) {
  // Counters are reachable only when hbase runs on the new mapreduce APIs.
  if (getCounter == null) {
    return;
  }

  try {
    for (Map.Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
      Object handle = getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, metric.getKey());
      ((Counter) handle).increment(metric.getValue());
    }

    Counter restarts = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS");
    restarts.increment(numScannerRestarts);

    Counter staleResults = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCAN_RESULTS_STALE");
    staleResults.increment(numStale);
  } catch (Exception e) {
    LOG.debug("can't update counter." + StringUtils.stringifyException(e));
  }
}
项目:pbase    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * Bytes that cannot be parsed are tolerated; in that case no counter is set by this method.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics decoded;
  try {
    decoded = MapReduceProtos.ScanMetrics.PARSER.parseFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
    decoded = null;
  }

  final ScanMetrics result = new ScanMetrics();
  if (decoded == null) {
    return result;
  }
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:HIndex    文件:ClientSideRegionScanner.java   
/**
 * Opens the region identified by {@code hri} under {@code rootDir} and creates a scanner
 * over it for the given {@code scan}, which is also kept in the {@code scan} field.
 * <p>
 * Note that the isolation level of the passed-in {@code scan} is changed to
 * {@code READ_UNCOMMITTED}.
 * @param conf configuration handed to the region open call
 * @param fs file system handed to the region open call
 * @param rootDir root directory handed to the region open call
 * @param htd descriptor of the table the region belongs to
 * @param hri the region to open
 * @param scan the scan to run against the region
 * @param scanMetrics metrics holder to use; when {@code null}, {@code initScanMetrics(scan)}
 *          is called instead
 * @throws IOException if any of the region calls made here fails
 */
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
    Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException {

  this.scan = scan;

  // The region is immutable, so the isolation level is relaxed up front.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Open the region from the snapshot directory.
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);

  // Internal region scanner plus the list used for its cells.
  this.scanner = this.region.getScanner(scan);
  this.values = new ArrayList<Cell>();

  if (scanMetrics != null) {
    this.scanMetrics = scanMetrics;
  } else {
    initScanMetrics(scan);
  }
  this.region.startRegionOperation();
}
项目:HIndex    文件:TableRecordReaderImpl.java   
/**
 * Pushes the given scan metrics into job counters obtained reflectively through
 * {@code getCounter}.
 * <p>
 * Every entry of {@code scanMetrics.getMetricsMap()} increments the counter of the same name
 * in the {@code HBASE_COUNTER_GROUP_NAME} group, and the restart count is added to its own
 * counter. Any exception is logged at debug level and swallowed.
 * @param scanMetrics source of the metric name/value pairs
 * @param numScannerRestarts amount added to {@code NUM_SCANNER_RESTARTS}
 * @param getCounter reflective counter accessor; when {@code null} nothing is done
 * @param context object the accessor is invoked on
 */
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context) {
  // Counters are reachable only when hbase runs on the new mapreduce APIs.
  if (getCounter == null) {
    return;
  }

  try {
    for (Map.Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
      Object handle = getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, metric.getKey());
      ((Counter) handle).increment(metric.getValue());
    }

    Counter restarts = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS");
    restarts.increment(numScannerRestarts);
  } catch (Exception e) {
    LOG.debug("can't update counter." + StringUtils.stringifyException(e));
  }
}
项目:HIndex    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * Bytes that cannot be parsed are tolerated; in that case no counter is set by this method.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics decoded;
  try {
    decoded = MapReduceProtos.ScanMetrics.PARSER.parseFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
    decoded = null;
  }

  final ScanMetrics result = new ScanMetrics();
  if (decoded == null) {
    return result;
  }
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:hbase    文件:ClientSideRegionScanner.java   
/**
 * Opens the region identified by {@code hri} under {@code rootDir}, using a read-only copy of
 * the table descriptor, and creates a scanner over it for the given {@code scan}.
 * <p>
 * Note that the isolation level of the passed-in {@code scan} is changed to
 * {@code READ_UNCOMMITTED}.
 * @param conf configuration handed to the region open call
 * @param fs file system handed to the region open call
 * @param rootDir root directory handed to the region open call
 * @param htd descriptor of the table the region belongs to
 * @param hri the region to open
 * @param scan the scan to run against the region
 * @param scanMetrics metrics holder to use; when {@code null}, {@code initScanMetrics(scan)}
 *          is called instead
 * @throws IOException if any of the region calls made here fails
 */
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
    Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
        throws IOException {
  // The region is immutable, so the isolation level is relaxed up front.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Work on a descriptor copy that is flagged read-only.
  htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();

  // Open the region from the snapshot directory.
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);

  // Internal region scanner plus the list used for its cells.
  this.scanner = this.region.getScanner(scan);
  this.values = new ArrayList<>();

  if (scanMetrics != null) {
    this.scanMetrics = scanMetrics;
  } else {
    initScanMetrics(scan);
  }
  this.region.startRegionOperation();
}
项目:hbase    文件:TestServerSideScanMetricsFromClientSide.java   
/**
 * Run the scan to completion and check the metric against the specified value.
 * <p>
 * The scanner is closed in a {@code finally} block so it is released even when fetching the
 * results throws.
 * @param scan the scan to run; it must have scan metrics enabled (asserted)
 * @param metricKey name of the counter to check
 * @param expectedValue value the counter must have after the scan
 * @throws Exception if the scan fails or an assertion does not hold
 */
public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
  assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
  ResultScanner scanner = TABLE.getScanner(scan);
  try {
    // Iterate through all the results; the rows themselves are not inspected.
    while (scanner.next() != null) {
      // nothing to do per row
    }
  } finally {
    // Always release the scanner, even when a next() call fails.
    scanner.close();
  }

  // Read the metrics only after the scanner has been closed.
  ScanMetrics metrics = scanner.getScanMetrics();
  assertTrue("Metrics are null", metrics != null);
  assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
  final long actualMetricValue = metrics.getCounter(metricKey).get();
  assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: "
      + actualMetricValue, expectedValue, actualMetricValue);
}
项目:hbase    文件:TableRecordReaderImpl.java   
/**
 * Pushes the given scan metrics into job counters obtained reflectively through
 * {@code getCounter}.
 * <p>
 * Every entry of {@code scanMetrics.getMetricsMap()} increments the counter of the same name
 * in the {@code HBASE_COUNTER_GROUP_NAME} group; the restart and stale-result counts are added
 * to their own counters. Any exception is logged at debug level and swallowed.
 * @param scanMetrics source of the metric name/value pairs
 * @param numScannerRestarts amount added to {@code NUM_SCANNER_RESTARTS}
 * @param getCounter reflective counter accessor; when {@code null} nothing is done
 * @param context object the accessor is invoked on
 * @param numStale amount added to {@code NUM_SCAN_RESULTS_STALE}
 */
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context, long numStale) {
  // Counters are reachable only when hbase runs on the new mapreduce APIs.
  if (getCounter == null) {
    return;
  }

  try {
    for (Map.Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
      Object handle = getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, metric.getKey());
      ((Counter) handle).increment(metric.getValue());
    }

    Counter restarts = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS");
    restarts.increment(numScannerRestarts);

    Counter staleResults = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCAN_RESULTS_STALE");
    staleResults.increment(numStale);
  } catch (Exception e) {
    LOG.debug("can't update counter." + StringUtils.stringifyException(e));
  }
}
项目:hbase    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * Bytes that cannot be parsed are tolerated; in that case no counter is set by this method.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics decoded;
  try {
    decoded = MapReduceProtos.ScanMetrics.PARSER.parseFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
    decoded = null;
  }

  final ScanMetrics result = new ScanMetrics();
  if (decoded == null) {
    return result;
  }
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:hbase    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * Bytes that cannot be parsed are tolerated; in that case no counter is set by this method.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics decoded;
  try {
    decoded = MapReduceProtos.ScanMetrics.parseFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
    decoded = null;
  }

  final ScanMetrics result = new ScanMetrics();
  if (decoded == null) {
    return result;
  }
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:hbase    文件:AsyncClientScanner.java   
/**
 * Creates an async client scanner for the given scan.
 * <p>
 * A {@code null} start or stop row on {@code scan} is replaced with {@code EMPTY_START_ROW} /
 * {@code EMPTY_END_ROW} (keeping the current inclusive flags), so the passed-in {@code scan}
 * may be modified. When the scan has metrics enabled, a new {@link ScanMetrics} is created and
 * handed to {@code consumer.onScanMetricsCreated}; otherwise the {@code scanMetrics} field is
 * set to {@code null}.
 * @param scan the scan to run
 * @param consumer consumer stored for later use and notified about the created metrics
 * @param tableName table to scan
 * @param conn connection stored for later use
 * @param pauseNs stored as-is (presumably nanoseconds, judging by the name; confirm)
 * @param maxAttempts stored as-is
 * @param scanTimeoutNs stored as-is (presumably nanoseconds, judging by the name; confirm)
 * @param rpcTimeoutNs stored as-is (presumably nanoseconds, judging by the name; confirm)
 * @param startLogErrorsCnt stored as-is
 */
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
    AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
    long rpcTimeoutNs, int startLogErrorsCnt) {
  // Normalize missing row boundaries to the empty-row constants before storing the scan.
  if (scan.getStartRow() == null) {
    scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
  }
  if (scan.getStopRow() == null) {
    scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
  }
  this.scan = scan;
  this.consumer = consumer;
  this.tableName = tableName;
  this.conn = conn;
  this.pauseNs = pauseNs;
  this.maxAttempts = maxAttempts;
  this.scanTimeoutNs = scanTimeoutNs;
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.startLogErrorsCnt = startLogErrorsCnt;
  this.resultCache = createScanResultCache(scan);
  // Metrics are collected only when the scan asks for them.
  if (scan.isScanMetricsEnabled()) {
    this.scanMetrics = new ScanMetrics();
    consumer.onScanMetricsCreated(scanMetrics);
  } else {
    this.scanMetrics = null;
  }
}
项目:hbase    文件:ConnectionUtils.java   
/**
 * Adds the estimated serialized size of all cells in {@code rrs} to the byte counters of
 * {@code scanMetrics}.
 * <p>
 * Nothing happens when {@code scanMetrics} is {@code null} or {@code rrs} is {@code null} or
 * empty.
 * @param scanMetrics metrics to update
 * @param rrs results whose cells are measured
 * @param isRegionServerRemote when {@code true}, the same size is also added to
 *          {@code countOfBytesInRemoteResults}
 */
static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
    boolean isRegionServerRemote) {
  if (scanMetrics == null) {
    return;
  }
  if (rrs == null || rrs.length == 0) {
    return;
  }

  long totalBytes = 0;
  for (Result row : rrs) {
    for (Cell c : row.rawCells()) {
      totalBytes += PrivateCellUtil.estimatedSerializedSizeOf(c);
    }
  }

  scanMetrics.countOfBytesInResults.addAndGet(totalBytes);
  if (!isRegionServerRemote) {
    return;
  }
  scanMetrics.countOfBytesInRemoteResults.addAndGet(totalBytes);
}
项目:Kylin    文件:CubeSegmentTupleIterator.java   
/**
 * Logs the scan and its metrics (debug level only) and then closes the scanner, if any.
 * <p>
 * Any failure while closing is rethrown wrapped in a {@code StorageException}.
 */
private void closeScanner() {
    if (logger.isDebugEnabled() && scan != null) {
        logger.debug("Scan " + scan.toString());
        final byte[] rawMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
        if (rawMetrics != null) {
            final ScanMetrics m = ProtobufUtil.toScanMetrics(rawMetrics);
            final Object[] logArgs = new Object[] {
                scanCount,
                m.sumOfMillisSecBetweenNexts,
                m.countOfBytesInResults,
                m.countOfBytesInRemoteResults,
                m.countOfRegions,
                m.countOfNSRE,
                m.countOfRPCcalls,
                m.countOfRPCRetries,
                m.countOfRemoteRPCcalls,
                m.countOfRemoteRPCRetries };
            logger.debug("HBase Metrics: " + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", logArgs);
        }
    }

    // Nothing to release when no scanner is held.
    if (scanner == null) {
        return;
    }
    try {
        scanner.close();
        scanner = null;
    } catch (Throwable t) {
        throw new StorageException("Error when close scanner for table " + tableName, t);
    }
}
项目:PyroDB    文件:ClientSideRegionScanner.java   
/**
 * Opens the region identified by {@code hri} under {@code rootDir} and creates a scanner
 * over it for the given {@code scan}, which is also kept in the {@code scan} field.
 * <p>
 * Note that the isolation level of the passed-in {@code scan} is changed to
 * {@code READ_UNCOMMITTED}.
 * @param conf configuration handed to the region open call
 * @param fs file system handed to the region open call
 * @param rootDir root directory handed to the region open call
 * @param htd descriptor of the table the region belongs to
 * @param hri the region to open
 * @param scan the scan to run against the region
 * @param scanMetrics metrics holder to use; when {@code null}, {@code initScanMetrics(scan)}
 *          is called instead
 * @throws IOException if any of the region calls made here fails
 */
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
    Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics) 
        throws IOException {

  this.scan = scan;

  // The region is immutable, so the isolation level is relaxed up front.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Open the region from the snapshot directory.
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);

  // Internal region scanner plus the list used for its cells.
  this.scanner = this.region.getScanner(scan);
  this.values = new ArrayList<Cell>();

  if (scanMetrics != null) {
    this.scanMetrics = scanMetrics;
  } else {
    initScanMetrics(scan);
  }
  this.region.startRegionOperation();
}
项目:PyroDB    文件:TableRecordReaderImpl.java   
/**
 * Pushes the given scan metrics into job counters obtained reflectively through
 * {@code getCounter}.
 * <p>
 * Every entry of {@code scanMetrics.getMetricsMap()} increments the counter of the same name
 * in the {@code HBASE_COUNTER_GROUP_NAME} group, and the restart count is added to its own
 * counter. Any exception is logged at debug level and swallowed.
 * @param scanMetrics source of the metric name/value pairs
 * @param numScannerRestarts amount added to {@code NUM_SCANNER_RESTARTS}
 * @param getCounter reflective counter accessor; when {@code null} nothing is done
 * @param context object the accessor is invoked on
 */
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
    Method getCounter, TaskAttemptContext context) {
  // Counters are reachable only when hbase runs on the new mapreduce APIs.
  if (getCounter == null) {
    return;
  }

  try {
    for (Map.Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
      Object handle = getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, metric.getKey());
      ((Counter) handle).increment(metric.getValue());
    }

    Counter restarts = (Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS");
    restarts.increment(numScannerRestarts);
  } catch (Exception e) {
    LOG.debug("can't update counter." + StringUtils.stringifyException(e));
  }
}
项目:PyroDB    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * Bytes that cannot be parsed are tolerated; in that case no counter is set by this method.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics decoded;
  try {
    decoded = MapReduceProtos.ScanMetrics.PARSER.parseFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
    decoded = null;
  }

  final ScanMetrics result = new ScanMetrics();
  if (decoded == null) {
    return result;
  }
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:c5    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * Bytes that cannot be parsed are tolerated; in that case no counter is set by this method.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  MapReduceProtos.ScanMetrics decoded;
  try {
    decoded = MapReduceProtos.ScanMetrics.PARSER.parseFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
    decoded = null;
  }

  final ScanMetrics result = new ScanMetrics();
  if (decoded == null) {
    return result;
  }
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:DominoHBase    文件:ProtobufUtil.java   
/**
 * Deserializes protobuf-encoded scan metrics into a client-side {@link ScanMetrics}.
 * <p>
 * A parse failure while merging the bytes is ignored; whatever the builder holds afterwards
 * is built and converted.
 * @param bytes serialized {@code MapReduceProtos.ScanMetrics} message
 * @return a new {@link ScanMetrics} populated from every pair that has both name and value
 */
public static ScanMetrics toScanMetrics(final byte[] bytes) {
  final MapReduceProtos.ScanMetrics.Builder protoBuilder =
      MapReduceProtos.ScanMetrics.newBuilder();
  try {
    protoBuilder.mergeFrom(bytes);
  } catch (InvalidProtocolBufferException ignored) {
    // Ignored on purpose: there are just no key values to add.
  }

  final MapReduceProtos.ScanMetrics decoded = protoBuilder.build();
  final ScanMetrics result = new ScanMetrics();
  for (HBaseProtos.NameInt64Pair metric : decoded.getMetricsList()) {
    if (!metric.hasName() || !metric.hasValue()) {
      continue;
    }
    result.setCounter(metric.getName(), metric.getValue());
  }
  return result;
}
项目:ditb    文件:TableRecordReaderImpl.java   
/**
 * Forwards the metrics of the current scan to the job counters.
 * <p>
 * If hbase runs on new version of mapreduce, RecordReader has access to
 * counters thus can update counters based on scanMetrics.
 * If hbase runs on old version of mapreduce, it won't be able to get
 * access to counters and TableRecorderReader can't update counter values.
 * Nothing is done when the scan carries no metrics.
 * @throws IOException declared by this method's signature
 */
private void updateCounters() throws IOException {
  final ScanMetrics collected = this.scan.getScanMetrics();
  if (collected != null) {
    updateCounters(collected, numRestarts, getCounter, context, numStale);
  }
}
项目:ditb    文件:TableSnapshotInputFormat.java   
/**
 * Advances the delegate reader and, after each successful advance, pushes the delegate
 * scanner's metrics into the job counters (when both metrics and context are available).
 * @return whatever the delegate's {@code nextKeyValue()} returned
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!delegate.nextKeyValue()) {
    return false;
  }

  final ScanMetrics collected = delegate.getScanner().getScanMetrics();
  if (collected == null || context == null) {
    return true;
  }
  TableRecordReaderImpl.updateCounters(collected, 0, getCounter, context, 0);
  return true;
}
项目:ditb    文件:TestFromClientSide.java   
/**
 * Reads the serialized metrics attribute from {@code scan} and decodes it.
 * @param scan scan expected to carry the {@code SCAN_ATTRIBUTES_METRICS_DATA} attribute
 * @return the decoded metrics
 * @throws Exception declared by this method's signature
 */
private ScanMetrics getScanMetrics(Scan scan) throws Exception {
  final byte[] rawMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
  assertTrue("Serialized metrics were not found.", rawMetrics != null);
  return ProtobufUtil.toScanMetrics(rawMetrics);
}
项目:ditb    文件:ProtobufUtil.java   
/**
 * Converts client-side scan metrics into their protobuf message form.
 * @param scanMetrics metrics whose {@code getMetricsMap()} entries are copied
 * @return a message holding one name/value pair per map entry
 */
public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) {
  final MapReduceProtos.ScanMetrics.Builder message = MapReduceProtos.ScanMetrics.newBuilder();
  for (Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
    message.addMetrics(
        HBaseProtos.NameInt64Pair.newBuilder()
            .setName(metric.getKey())
            .setValue(metric.getValue())
            .build());
  }
  return message.build();
}
项目:ditb    文件:ScannerCallable.java   
/**
 * Creates a scanner callable positioned at the start row of {@code scan}.
 * <p>
 * The scanner-activity logging flag and latency cut-off are read from the connection's
 * configuration.
 * @param connection connection to use; also the source of the configuration
 * @param tableName table the callable is on
 * @param scan the scan to execute; its start row is passed to the super constructor
 * @param scanMetrics metrics holder stored for later use
 * @param rpcControllerFactory factory stored in the {@code controllerFactory} field
 * @param id the replicaId
 */
public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
  super(connection, tableName, scan.getStartRow());
  this.id = id;
  this.cConnection = connection;
  this.scan = scan;
  this.scanMetrics = scanMetrics;
  Configuration conf = connection.getConfiguration();
  // Logging is off unless configured; the latency cut-off falls back to 1000.
  logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
  logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
  this.controllerFactory = rpcControllerFactory;
}
项目:ditb    文件:ReversedScannerCallable.java   
/**
 * Convenience constructor that instantiates the {@link RpcControllerFactory} from the
 * connection's configuration and delegates to the full constructor.
 * @param connection connection to use; also the source of the configuration
 * @param tableName table the callable is on
 * @param scan the scan to execute
 * @param scanMetrics metrics holder passed through to the full constructor
 * @param locateStartRow row passed through to the full constructor
 * @deprecated use
 *             {@link #ReversedScannerCallable(ClusterConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory)}
 */
@Deprecated
public ReversedScannerCallable(ClusterConnection connection, TableName tableName,
    Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
  this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
      .instantiate(connection.getConfiguration()));
}
项目:ditb    文件:AbstractClientScanner.java   
/**
 * Creates the {@code scanMetrics} holder when the scan has metrics collection enabled;
 * otherwise leaves the field untouched.
 * @param scan the scan whose metrics flag is consulted
 */
protected void initScanMetrics(Scan scan) {
  // Only applications that asked for scan metrics get a holder.
  if (!scan.isScanMetricsEnabled()) {
    return;
  }
  scanMetrics = new ScanMetrics();
}
项目:ditb    文件:ClientSmallScanner.java   
/**
 * Builds a replica-aware callable around a new {@code SmallScannerCallable}.
 * <p>
 * Note that the start row of the passed-in {@code scan} is set to {@code localStartKey}.
 * @return the newly created {@code ScannerCallableWithReplicas}
 */
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
    Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
    RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
    int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
  scan.setStartRow(localStartKey);

  final SmallScannerCallable smallCallable =
      new SmallScannerCallable(connection, table, scan, scanMetrics, controllerFactory,
          cacheNum, 0);

  return new ScannerCallableWithReplicas(table, connection, smallCallable, pool,
      primaryOperationTimeout, scan, retries, scannerTimeout, cacheNum, conf, caller);
}
项目:ditb    文件:TestClientSmallScanner.java   
/**
 * Creates a factory stub whose {@code getCallable} always returns the supplied callable.
 * @param callableWithReplicas the callable every {@code getCallable} invocation returns
 * @return the stub factory
 */
private SmallScannerCallableFactory getFactory(
    final ScannerCallableWithReplicas callableWithReplicas) {
  final SmallScannerCallableFactory stubFactory = new SmallScannerCallableFactory() {
    @Override
    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
        RpcControllerFactory controllerFactory, ExecutorService pool,
        int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
        RpcRetryingCaller<Result[]> caller) {
      // All arguments are ignored; the canned callable is handed back.
      return callableWithReplicas;
    }
  };
  return stubFactory;
}
项目:ditb    文件:TestClientSmallReversedScanner.java   
/**
 * Creates a factory stub whose {@code getCallable} always returns the supplied callable.
 * @param callableWithReplicas the callable every {@code getCallable} invocation returns
 * @return the stub factory
 */
private SmallScannerCallableFactory getFactory(
    final ScannerCallableWithReplicas callableWithReplicas) {
  final SmallScannerCallableFactory stubFactory = new SmallScannerCallableFactory() {
    @Override
    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
        RpcControllerFactory controllerFactory, ExecutorService pool,
        int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
        RpcRetryingCaller<Result[]> caller) {
      // All arguments are ignored; the canned callable is handed back.
      return callableWithReplicas;
    }
  };
  return stubFactory;
}
项目:LCIndex-HBase-0.94.16    文件:ClientScanner.java   
/**
 * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
 * row may be changed.
 * <p>
 * The maximum result size, scanner timeout and (when the scan does not set one) the caching
 * value are read from {@code conf}. Scan metrics are created only when the scan carries a true
 * {@code SCAN_ATTRIBUTES_METRICS_ENABLE} attribute.
 * @param conf The {@link Configuration} to use.
 * @param scan {@link Scan} to use in this scanner
 * @param tableName The table that we wish to scan
 * @param connection Connection identifying the cluster
 * @throws IOException declared by this constructor's signature
 */
public ClientScanner(final Configuration conf, final Scan scan, final byte[] tableName,
    HConnection connection) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Creating scanner over " + Bytes.toString(tableName) + " starting at key '"
        + Bytes.toStringBinary(scan.getStartRow()) + "'");
  }
  this.scan = scan;
  this.tableName = tableName;
  this.lastNext = System.currentTimeMillis();
  this.connection = connection;
  this.maxScannerResultSize =
      conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
  // The lease period is read as a long and narrowed to int here.
  this.scannerTimeout =
      (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
        HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);

  // check if application wants to collect scan metrics
  byte[] enableMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
  if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
    scanMetrics = new ScanMetrics();
  }

  // Use the caching from the Scan. If not set, use the value of hbase.client.scanner.caching
  // from the configuration (1 when that is not set either).
  if (this.scan.getCaching() > 0) {
    this.caching = this.scan.getCaching();
  } else {
    this.caching = conf.getInt("hbase.client.scanner.caching", 1);
  }

  // initialize the scanner
  initializeScannerInConstruction();
}
项目:LCIndex-HBase-0.94.16    文件:ScannerCallable.java   
/**
 * Creates a scanner callable positioned at the start row of {@code scan}.
 * <p>
 * The scanner-activity logging flag and latency cut-off are read from the connection's
 * configuration.
 * @param connection which connection
 * @param tableName table callable is on
 * @param scan the scan to execute
 * @param scanMetrics the ScanMetrics to use; if it is null, ScannerCallable won't collect
 *          metrics
 */
public ScannerCallable(HConnection connection, byte[] tableName, Scan scan,
    ScanMetrics scanMetrics) {
  super(connection, tableName, scan.getStartRow());
  this.scan = scan;
  this.scanMetrics = scanMetrics;
  Configuration conf = connection.getConfiguration();
  // Logging is off unless configured; the latency cut-off falls back to 1000.
  logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
  logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
}
项目:LCIndex-HBase-0.94.16    文件:TableRecordReaderImpl.java   
/**
 * Decodes the metrics serialized on the current scan and adds them to the job counters.
 * <p>
 * If hbase runs on new version of mapreduce, RecordReader has access to
 * counters thus can update counters based on scanMetrics.
 * If hbase runs on old version of mapreduce, it won't be able to get
 * access to counters and TableRecorderReader can't update counter values.
 * Nothing is done when the scan carries no (or empty) serialized metrics. Failures while
 * updating the counters are logged at debug level and swallowed.
 * @throws IOException if the serialized metrics cannot be read
 */
private void updateCounters() throws IOException {
  // Counters are reachable only when hbase runs on the new mapreduce APIs.
  if (this.getCounter == null) {
    return;
  }

  final byte[] rawMetrics = currentScan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
  final boolean hasMetrics = rawMetrics != null && rawMetrics.length != 0;
  if (!hasMetrics) {
    return;
  }

  final DataInputBuffer buffer = new DataInputBuffer();
  buffer.reset(rawMetrics, 0, rawMetrics.length);
  final ScanMetrics decoded = new ScanMetrics();
  decoded.readFields(buffer);
  final MetricsTimeVaryingLong[] metrics = decoded.getMetricsTimeVaryingLongArray();

  try {
    for (MetricsTimeVaryingLong metric : metrics) {
      Object handle = this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
          metric.getName());
      ((Counter) handle).increment(metric.getCurrentIntervalValue());
    }

    Counter restarts = (Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
        "NUM_SCANNER_RESTARTS");
    restarts.increment(numRestarts);
  } catch (Exception e) {
    LOG.debug("can't update counter." + StringUtils.stringifyException(e));
  }
}
项目:LCIndex-HBase-0.94.16    文件:TestFromClientSide.java   
/**
 * Reads the serialized metrics attribute from {@code scan} and decodes it via
 * {@code ScanMetrics.readFields}.
 * @param scan scan expected to carry the {@code SCAN_ATTRIBUTES_METRICS_DATA} attribute
 * @return the decoded metrics
 * @throws Exception declared by this method's signature
 */
private ScanMetrics getScanMetrics(Scan scan) throws Exception {
  final byte[] rawMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
  assertTrue("Serialized metrics were not found.", rawMetrics != null);

  final DataInputBuffer buffer = new DataInputBuffer();
  buffer.reset(rawMetrics, 0, rawMetrics.length);

  final ScanMetrics decoded = new ScanMetrics();
  decoded.readFields(buffer);
  return decoded;
}
项目:pbase    文件:TableRecordReaderImpl.java   
/**
 * Forwards the metrics of the current scan to the job counters.
 * <p>
 * If hbase runs on new version of mapreduce, RecordReader has access to
 * counters thus can update counters based on scanMetrics.
 * If hbase runs on old version of mapreduce, it won't be able to get
 * access to counters and TableRecorderReader can't update counter values.
 * Nothing is done when the scan carries no metrics.
 * @throws IOException declared by this method's signature
 */
private void updateCounters() throws IOException {
  final ScanMetrics collected = this.scan.getScanMetrics();
  if (collected != null) {
    updateCounters(collected, numRestarts, getCounter, context, numStale);
  }
}
项目:pbase    文件:TableSnapshotInputFormat.java   
/**
 * Advances the delegate reader and, after each successful advance, pushes the delegate
 * scanner's metrics into the job counters (when both metrics and context are available).
 * @return whatever the delegate's {@code nextKeyValue()} returned
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!delegate.nextKeyValue()) {
    return false;
  }

  final ScanMetrics collected = delegate.getScanner().getScanMetrics();
  if (collected == null || context == null) {
    return true;
  }
  TableRecordReaderImpl.updateCounters(collected, 0, getCounter, context, 0);
  return true;
}
项目:pbase    文件:TestFromClientSide.java   
/**
 * Reads the serialized metrics attribute from {@code scan} and decodes it.
 * @param scan scan expected to carry the {@code SCAN_ATTRIBUTES_METRICS_DATA} attribute
 * @return the decoded metrics
 * @throws Exception declared by this method's signature
 */
private ScanMetrics getScanMetrics(Scan scan) throws Exception {
  final byte[] rawMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
  assertTrue("Serialized metrics were not found.", rawMetrics != null);
  return ProtobufUtil.toScanMetrics(rawMetrics);
}