/**
 * Opens the given region straight from the file system and prepares a scanner over it.
 *
 * @param conf configuration handed to {@code HRegion.openHRegion}
 * @param fs file system holding the region files
 * @param rootDir directory the region is opened from
 * @param htd descriptor of the table the region belongs to
 * @param hri the region to open
 * @param scan the scan to run; its isolation level is overwritten with READ_UNCOMMITTED
 * @param scanMetrics metrics object to record into; when {@code null},
 *          {@link #initScanMetrics(Scan)} decides whether one is created
 * @throws IOException if opening the region or creating the scanner fails
 */
public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
    HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
    throws IOException {
  // The region is immutable, so reading uncommitted data is fine.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Open the region from the snapshot directory and put an internal scanner on top of it.
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
  this.scanner = region.getScanner(scan);
  values = new ArrayList<Cell>();

  // Prefer the caller-supplied metrics; otherwise fall back to the scan's own setting.
  if (scanMetrics != null) {
    this.scanMetrics = scanMetrics;
  } else {
    initScanMetrics(scan);
  }

  region.startRegionOperation();
}
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts, Method getCounter, TaskAttemptContext context, long numStale) { // we can get access to counters only if hbase uses new mapreduce APIs if (getCounter == null) { return; } try { for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) { Counter ct = (Counter)getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, entry.getKey()); ct.increment(entry.getValue()); } ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts); ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, "NUM_SCAN_RESULTS_STALE")).increment(numStale); } catch (Exception e) { LOG.debug("can't update counter." + StringUtils.stringifyException(e)); } }
/** * Run the scan to completetion and check the metric against the specified value * @param scan * @param metricKey * @param expectedValue * @throws Exception */ public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); ResultScanner scanner = TABLE.getScanner(scan); // Iterate through all the results for (Result r : scanner) { } scanner.close(); ScanMetrics metrics = scan.getScanMetrics(); assertTrue("Metrics are null", metrics != null); assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); final long actualMetricValue = metrics.getCounter(metricKey).get(); assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, expectedValue, actualMetricValue); }
public static ScanMetrics toScanMetrics(final byte[] bytes) { Parser<MapReduceProtos.ScanMetrics> parser = MapReduceProtos.ScanMetrics.PARSER; MapReduceProtos.ScanMetrics pScanMetrics = null; try { pScanMetrics = parser.parseFrom(bytes); } catch (InvalidProtocolBufferException e) { //Ignored there are just no key values to add. } ScanMetrics scanMetrics = new ScanMetrics(); if (pScanMetrics != null) { for (HBaseProtos.NameInt64Pair pair : pScanMetrics.getMetricsList()) { if (pair.hasName() && pair.hasValue()) { scanMetrics.setCounter(pair.getName(), pair.getValue()); } } } return scanMetrics; }
/**
 * Opens the given region straight from the file system and prepares a scanner over it.
 *
 * @param conf configuration handed to {@code HRegion.openHRegion}
 * @param fs file system holding the region files
 * @param rootDir directory the region is opened from
 * @param htd descriptor of the table the region belongs to
 * @param hri the region to open
 * @param scan the scan to run; it is retained by this scanner and its isolation level is
 *          overwritten with READ_UNCOMMITTED
 * @param scanMetrics metrics object to record into; when {@code null},
 *          {@link #initScanMetrics(Scan)} decides whether one is created
 * @throws IOException if opening the region or creating the scanner fails
 */
public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
    HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
    throws IOException {
  this.scan = scan;

  // The region is immutable, so reading uncommitted data is fine.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Open the region from the snapshot directory and put an internal scanner on top of it.
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
  this.scanner = region.getScanner(scan);
  values = new ArrayList<Cell>();

  // Prefer the caller-supplied metrics; otherwise fall back to the scan's own setting.
  if (scanMetrics != null) {
    this.scanMetrics = scanMetrics;
  } else {
    initScanMetrics(scan);
  }

  region.startRegionOperation();
}
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts, Method getCounter, TaskAttemptContext context) { // we can get access to counters only if hbase uses new mapreduce APIs if (getCounter == null) { return; } try { for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) { Counter ct = (Counter)getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, entry.getKey()); ct.increment(entry.getValue()); } ((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS")).increment(numScannerRestarts); } catch (Exception e) { LOG.debug("can't update counter." + StringUtils.stringifyException(e)); } }
public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir, TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics) throws IOException { // region is immutable, set isolation level scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build(); // open region from the snapshot directory this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); // create an internal region scanner this.scanner = region.getScanner(scan); values = new ArrayList<>(); if (scanMetrics == null) { initScanMetrics(scan); } else { this.scanMetrics = scanMetrics; } region.startRegionOperation(); }
/** * Run the scan to completetion and check the metric against the specified value * @param scan * @param metricKey * @param expectedValue * @throws Exception */ public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); ResultScanner scanner = TABLE.getScanner(scan); // Iterate through all the results while (scanner.next() != null) { } scanner.close(); ScanMetrics metrics = scanner.getScanMetrics(); assertTrue("Metrics are null", metrics != null); assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); final long actualMetricValue = metrics.getCounter(metricKey).get(); assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, expectedValue, actualMetricValue); }
public static ScanMetrics toScanMetrics(final byte[] bytes) { MapReduceProtos.ScanMetrics pScanMetrics = null; try { pScanMetrics = MapReduceProtos.ScanMetrics.parseFrom(bytes); } catch (InvalidProtocolBufferException e) { // Ignored there are just no key values to add. } ScanMetrics scanMetrics = new ScanMetrics(); if (pScanMetrics != null) { for (HBaseProtos.NameInt64Pair pair : pScanMetrics.getMetricsList()) { if (pair.hasName() && pair.hasValue()) { scanMetrics.setCounter(pair.getName(), pair.getValue()); } } } return scanMetrics; }
/**
 * Stores the scan parameters, normalizes missing start/stop rows on the scan, and creates the
 * scan metrics when the scan has them enabled.
 *
 * @param scan the scan to run; a {@code null} start or stop row is replaced in place with the
 *          empty start/end row
 * @param consumer receiver of the scan; notified through {@code onScanMetricsCreated} when
 *          metrics are created
 * @param tableName table to scan
 * @param conn connection used for the scan
 * @param pauseNs pause value, stored as given
 * @param maxAttempts maximum attempts value, stored as given
 * @param scanTimeoutNs scan timeout value, stored as given
 * @param rpcTimeoutNs rpc timeout value, stored as given
 * @param startLogErrorsCnt start-log-errors count, stored as given
 */
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
    AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
    long rpcTimeoutNs, int startLogErrorsCnt) {
  // Make sure the scan always carries explicit boundaries, keeping its inclusive flags.
  if (scan.getStartRow() == null) {
    scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
  }
  if (scan.getStopRow() == null) {
    scan.withStopRow(EMPTY_END_ROW, scan.includeStopRow());
  }

  this.scan = scan;
  this.consumer = consumer;
  this.tableName = tableName;
  this.conn = conn;
  this.pauseNs = pauseNs;
  this.maxAttempts = maxAttempts;
  this.scanTimeoutNs = scanTimeoutNs;
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.startLogErrorsCnt = startLogErrorsCnt;
  this.resultCache = createScanResultCache(scan);

  // Metrics exist only when the scan asks for them; the consumer is told about the new object.
  if (!scan.isScanMetricsEnabled()) {
    this.scanMetrics = null;
  } else {
    this.scanMetrics = new ScanMetrics();
    consumer.onScanMetricsCreated(scanMetrics);
  }
}
/**
 * Adds the estimated serialized size of all cells in the given results to the byte counters.
 *
 * @param scanMetrics metrics to update; nothing happens when {@code null}
 * @param rrs results just received; nothing happens when {@code null} or empty
 * @param isRegionServerRemote when {@code true} the size is also added to the remote-results
 *          byte counter
 */
static void updateResultsMetrics(ScanMetrics scanMetrics, Result[] rrs,
    boolean isRegionServerRemote) {
  if (scanMetrics == null) {
    return;
  }
  if (rrs == null || rrs.length == 0) {
    return;
  }

  long totalBytes = 0;
  for (Result result : rrs) {
    for (Cell cell : result.rawCells()) {
      totalBytes += PrivateCellUtil.estimatedSerializedSizeOf(cell);
    }
  }

  scanMetrics.countOfBytesInResults.addAndGet(totalBytes);
  if (isRegionServerRemote) {
    scanMetrics.countOfBytesInRemoteResults.addAndGet(totalBytes);
  }
}
/**
 * Logs the scan and its HBase metrics at debug level (when available), then closes the
 * scanner and clears the field.
 *
 * @throws StorageException wrapping anything thrown while closing the scanner
 */
private void closeScanner() {
  boolean shouldLogScan = logger.isDebugEnabled() && scan != null;
  if (shouldLogScan) {
    logger.debug("Scan " + scan.toString());

    // Metrics are only present when the scan carries the serialized metrics attribute.
    byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
    if (metricsBytes != null) {
      ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
      Object[] logArgs = new Object[] {
          scanCount,
          scanMetrics.sumOfMillisSecBetweenNexts,
          scanMetrics.countOfBytesInResults,
          scanMetrics.countOfBytesInRemoteResults,
          scanMetrics.countOfRegions,
          scanMetrics.countOfNSRE,
          scanMetrics.countOfRPCcalls,
          scanMetrics.countOfRPCRetries,
          scanMetrics.countOfRemoteRPCcalls,
          scanMetrics.countOfRemoteRPCRetries };
      logger.debug("HBase Metrics: "
          + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}",
          logArgs);
    }
  }

  try {
    if (scanner != null) {
      scanner.close();
      scanner = null;
    }
  } catch (Throwable t) {
    throw new StorageException("Error when close scanner for table " + tableName, t);
  }
}
public static ScanMetrics toScanMetrics(final byte[] bytes) { MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder(); try { builder.mergeFrom(bytes); } catch (InvalidProtocolBufferException e) { //Ignored there are just no key values to add. } MapReduceProtos.ScanMetrics pScanMetrics = builder.build(); ScanMetrics scanMetrics = new ScanMetrics(); for (HBaseProtos.NameInt64Pair pair : pScanMetrics.getMetricsList()) { if (pair.hasName() && pair.hasValue()) { scanMetrics.setCounter(pair.getName(), pair.getValue()); } } return scanMetrics; }
/**
 * Pushes the current scan's metrics into the MapReduce counters.
 * If hbase runs on new version of mapreduce, RecordReader has access to
 * counters thus can update counters based on scanMetrics.
 * If hbase runs on old version of mapreduce, it won't be able to get
 * access to counters and TableRecorderReader can't update counter values.
 * Nothing is done when the scan has no metrics.
 * @throws IOException declared by the signature; not thrown by the visible body itself
 */
private void updateCounters() throws IOException {
  ScanMetrics metrics = this.scan.getScanMetrics();
  if (metrics != null) {
    updateCounters(metrics, numRestarts, getCounter, context, numStale);
  }
}
/**
 * Advances the delegate reader and, when it produced a record, forwards the delegate scanner's
 * metrics to the MapReduce counters.
 *
 * @return whatever the delegate's {@code nextKeyValue()} returned
 */
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!delegate.nextKeyValue()) {
    return false;
  }

  ScanMetrics metrics = delegate.getScanner().getScanMetrics();
  if (metrics != null && context != null) {
    // No restarts or stale results are reported from here, hence the zeros.
    TableRecordReaderImpl.updateCounters(metrics, 0, getCounter, context, 0);
  }
  return true;
}
/**
 * Reads the serialized metrics attribute from the scan and decodes it.
 *
 * @param scan scan expected to carry the {@code SCAN_ATTRIBUTES_METRICS_DATA} attribute
 * @return the decoded metrics
 * @throws Exception declared by the signature; the assertion fails when the attribute is absent
 */
private ScanMetrics getScanMetrics(Scan scan) throws Exception {
  byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
  assertTrue("Serialized metrics were not found.", metricsBytes != null);
  return ProtobufUtil.toScanMetrics(metricsBytes);
}
/**
 * Converts client-side scan metrics into their protobuf representation.
 *
 * @param scanMetrics metrics whose map entries are copied
 * @return a protobuf message with one name/value pair per entry of the metrics map
 */
public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) {
  MapReduceProtos.ScanMetrics.Builder protoBuilder = MapReduceProtos.ScanMetrics.newBuilder();
  for (Map.Entry<String, Long> metric : scanMetrics.getMetricsMap().entrySet()) {
    HBaseProtos.NameInt64Pair.Builder pairBuilder = HBaseProtos.NameInt64Pair.newBuilder();
    pairBuilder.setName(metric.getKey());
    pairBuilder.setValue(metric.getValue());
    protoBuilder.addMetrics(pairBuilder.build());
  }
  return protoBuilder.build();
}
/**
 * Creates a callable for the given scan, located by the scan's start row.
 * @param connection connection to use; also the source of the logging configuration
 * @param tableName table the scan runs against
 * @param scan the scan to execute
 * @param scanMetrics metrics object stored on this callable
 * @param rpcControllerFactory factory stored for creating rpc controllers
 * @param id the replicaId
 */
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
  super(connection, tableName, scan.getStartRow());
  this.id = id;
  this.cConnection = connection;
  this.scan = scan;
  this.scanMetrics = scanMetrics;
  this.controllerFactory = rpcControllerFactory;

  // Scanner activity logging is off by default; the latency cut-off defaults to 1000.
  Configuration configuration = connection.getConfiguration();
  logScannerActivity = configuration.getBoolean(LOG_SCANNER_ACTIVITY, false);
  logCutOffLatency = configuration.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
}
/**
 * Convenience constructor that instantiates the {@link RpcControllerFactory} from the
 * connection's configuration and delegates to the full constructor.
 * @param connection connection to use; its configuration supplies the controller factory
 * @param tableName table the scan runs against
 * @param scan the scan to execute
 * @param scanMetrics metrics object passed through to the full constructor
 * @param locateStartRow row passed through to the full constructor
 * @deprecated use
 *             {@link #ReversedScannerCallable(ClusterConnection, TableName, Scan, ScanMetrics, byte[], RpcControllerFactory )}
 */
@Deprecated
public ReversedScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, byte[] locateStartRow) {
  this(connection, tableName, scan, scanMetrics, locateStartRow,
      RpcControllerFactory.instantiate(connection.getConfiguration()));
}
/** * Check and initialize if application wants to collect scan metrics */ protected void initScanMetrics(Scan scan) { // check if application wants to collect scan metrics if (scan.isScanMetricsEnabled()) { scanMetrics = new ScanMetrics(); } }
/**
 * Builds a replica-aware callable around a new small-scan callable that starts at
 * {@code localStartKey}.
 *
 * @param scan the scan to run; its start row is overwritten with {@code localStartKey}
 * @param localStartKey row the scan is repositioned to before the callable is created
 * @return the replica-aware callable wrapping the new small-scan callable
 */
public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
    Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
    RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
    int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) {
  scan.setStartRow(localStartKey);

  // The trailing 0 is the last constructor argument, passed exactly as before.
  SmallScannerCallable smallCallable = new SmallScannerCallable(connection, table, scan,
      scanMetrics, controllerFactory, cacheNum, 0);

  return new ScannerCallableWithReplicas(table, connection, smallCallable, pool,
      primaryOperationTimeout, scan, retries, scannerTimeout, cacheNum, conf, caller);
}
/**
 * Creates a factory that ignores all of its arguments and always hands back the given callable.
 *
 * @param callableWithReplicas the callable every {@code getCallable} call returns
 * @return the fixed-result factory
 */
private SmallScannerCallableFactory getFactory(
    final ScannerCallableWithReplicas callableWithReplicas) {
  SmallScannerCallableFactory fixedFactory = new SmallScannerCallableFactory() {
    @Override
    public ScannerCallableWithReplicas getCallable(ClusterConnection connection,
        TableName table, Scan scan, ScanMetrics scanMetrics, byte[] localStartKey,
        int cacheNum, RpcControllerFactory controllerFactory, ExecutorService pool,
        int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
        RpcRetryingCaller<Result[]> caller) {
      return callableWithReplicas;
    }
  };
  return fixedFactory;
}
/** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start * row maybe changed changed. * @param conf The {@link Configuration} to use. * @param scan {@link Scan} to use in this scanner * @param tableName The table that we wish to scan * @param connection Connection identifying the cluster * @throws IOException */ public ClientScanner(final Configuration conf, final Scan scan, final byte[] tableName, HConnection connection) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Creating scanner over " + Bytes.toString(tableName) + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'"); } this.scan = scan; this.tableName = tableName; this.lastNext = System.currentTimeMillis(); this.connection = connection; this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); this.scannerTimeout = (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD); // check if application wants to collect scan metrics byte[] enableMetrics = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE); if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) { scanMetrics = new ScanMetrics(); } // Use the caching from the Scan. If not set, use the default cache setting for this table. if (this.scan.getCaching() > 0) { this.caching = this.scan.getCaching(); } else { this.caching = conf.getInt("hbase.client.scanner.caching", 1); } // initialize the scanner initializeScannerInConstruction(); }
/**
 * Creates a callable for the given scan, located by the scan's start row.
 * @param connection which connection; also the source of the logging configuration
 * @param tableName table callable is on
 * @param scan the scan to execute
 * @param scanMetrics the ScanMetrics to use; if it is null, ScannerCallable won't collect
 *          metrics
 */
public ScannerCallable(HConnection connection, byte[] tableName, Scan scan,
    ScanMetrics scanMetrics) {
  super(connection, tableName, scan.getStartRow());
  this.scan = scan;
  this.scanMetrics = scanMetrics;

  // Scanner activity logging is off by default; the latency cut-off defaults to 1000.
  Configuration configuration = connection.getConfiguration();
  logScannerActivity = configuration.getBoolean(LOG_SCANNER_ACTIVITY, false);
  logCutOffLatency = configuration.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
}
/** * If hbase runs on new version of mapreduce, RecordReader has access to * counters thus can update counters based on scanMetrics. * If hbase runs on old version of mapreduce, it won't be able to get * access to counters and TableRecorderReader can't update counter values. * @throws IOException */ private void updateCounters() throws IOException { // we can get access to counters only if hbase uses new mapreduce APIs if (this.getCounter == null) { return; } byte[] serializedMetrics = currentScan.getAttribute( Scan.SCAN_ATTRIBUTES_METRICS_DATA); if (serializedMetrics == null || serializedMetrics.length == 0 ) { return; } DataInputBuffer in = new DataInputBuffer(); in.reset(serializedMetrics, 0, serializedMetrics.length); ScanMetrics scanMetrics = new ScanMetrics(); scanMetrics.readFields(in); MetricsTimeVaryingLong[] mlvs = scanMetrics.getMetricsTimeVaryingLongArray(); try { for (MetricsTimeVaryingLong mlv : mlvs) { Counter ct = (Counter)this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, mlv.getName()); ct.increment(mlv.getCurrentIntervalValue()); } ((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME, "NUM_SCANNER_RESTARTS")).increment(numRestarts); } catch (Exception e) { LOG.debug("can't update counter." + StringUtils.stringifyException(e)); } }
/**
 * Reads the serialized metrics attribute from the scan and deserializes it.
 *
 * @param scan scan expected to carry the {@code SCAN_ATTRIBUTES_METRICS_DATA} attribute
 * @return the deserialized metrics
 * @throws Exception if reading the metrics fails; the assertion fails when the attribute is
 *           absent
 */
private ScanMetrics getScanMetrics(Scan scan) throws Exception {
  byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
  assertTrue("Serialized metrics were not found.", metricsBytes != null);

  DataInputBuffer input = new DataInputBuffer();
  input.reset(metricsBytes, 0, metricsBytes.length);

  ScanMetrics decoded = new ScanMetrics();
  decoded.readFields(input);
  return decoded;
}