void updateCounters() {
  String counterPrefix = schema.toUpperCase() + "_";
  long readBytes = 0;
  long writeBytes = 0;
  long readOps = 0;
  long largeReadOps = 0;
  long writeOps = 0;
  // Aggregate the per-FileSystem statistics for this scheme.
  for (FileSystem.Statistics stat : stats) {
    readBytes += stat.getBytesRead();
    writeBytes += stat.getBytesWritten();
    readOps += stat.getReadOps();
    largeReadOps += stat.getLargeReadOps();
    writeOps += stat.getWriteOps();
  }
  // Publish the aggregated values as PSAgent metrics.
  PSAgentContext.get().getMetrics()
      .put(counterPrefix + AngelCounter.BYTES_READ, Long.toString(readBytes));
  PSAgentContext.get().getMetrics()
      .put(counterPrefix + AngelCounter.BYTES_WRITTEN, Long.toString(writeBytes));
  PSAgentContext.get().getMetrics()
      .put(counterPrefix + AngelCounter.READ_OPS, Long.toString(readOps));
  PSAgentContext.get().getMetrics()
      .put(counterPrefix + AngelCounter.LARGE_READ_OPS, Long.toString(largeReadOps));
  PSAgentContext.get().getMetrics()
      .put(counterPrefix + AngelCounter.WRITE_OPS, Long.toString(writeOps));
}
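// The stats field iterated above is not defined in this snippet. A minimal sketch of
// how it could be populated, assuming entries are matched against the schema field:
List<FileSystem.Statistics> stats = new ArrayList<>();
for (FileSystem.Statistics stat : FileSystem.getAllStatistics()) {
  if (stat.getScheme().equals(schema)) {
    stats.add(stat);
  }
}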
@Test
public void testStatistics() throws IOException, URISyntaxException {
  URI fsUri = getFsUri();
  Statistics stats = FileContext.getStatistics(fsUri);
  Assert.assertEquals(0, stats.getBytesRead());
  Path filePath = fileContextTestHelper.getTestRootPath(fc, "file1");

  createFile(fc, filePath, numBlocks, blockSize);
  // Writing the file must not change the read statistics.
  Assert.assertEquals(0, stats.getBytesRead());
  verifyWrittenBytes(stats);

  FSDataInputStream fstr = fc.open(filePath);
  byte[] buf = new byte[blockSize];
  int bytesRead = fstr.read(buf, 0, blockSize);
  fstr.read(0, buf, 0, blockSize); // positioned read of the same block
  Assert.assertEquals(blockSize, bytesRead);
  verifyReadBytes(stats);
  verifyWrittenBytes(stats);
  verifyReadBytes(FileContext.getStatistics(getFsUri()));

  // The statistics map is keyed by the exact scheme/authority URI.
  Map<URI, Statistics> statsMap = FileContext.getAllStatistics();
  URI exactUri = getSchemeAuthorityUri();
  verifyWrittenBytes(statsMap.get(exactUri));
  fc.delete(filePath, true);
}
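// verifyReadBytes and verifyWrittenBytes are not shown. A hedged sketch of what they
// could assert for a file system that tracks byte counts exactly; the expected values
// (two blockSize reads, numBlocks * blockSize written) are assumptions derived from
// the test body above, not from the actual implementation:
protected void verifyReadBytes(Statistics stats) {
  // One sequential read plus one positioned read, blockSize bytes each.
  Assert.assertEquals(2 * blockSize, stats.getBytesRead());
}

protected void verifyWrittenBytes(Statistics stats) {
  Assert.assertEquals((long) numBlocks * blockSize, stats.getBytesWritten());
}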
TrackedRecordReader(TaskReporter reporter, JobConf job) throws IOException {
  inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
  this.reporter = reporter;

  List<Statistics> matchedStats = null;
  if (this.reporter.getInputSplit() instanceof FileSplit) {
    matchedStats = getFsStatistics(
        ((FileSplit) this.reporter.getInputSplit()).getPath(), job);
  }
  fsStats = matchedStats;

  // Snapshot the byte count before and after creating the reader so that any
  // bytes read during construction are charged to the input byte counter.
  bytesInPrev = getInputBytes(fsStats);
  rawIn = job.getInputFormat().getRecordReader(reporter.getInputSplit(), job, reporter);
  bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
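// getInputBytes is referenced above but not shown. A minimal sketch, assuming it
// simply sums the bytes read across the matched statistics and treats a null list
// (a non-file split) as zero; the exact body is an assumption:
private long getInputBytes(List<Statistics> stats) {
  if (stats == null) {
    return 0;
  }
  long bytesRead = 0;
  for (Statistics stat : stats) {
    bytesRead += stat.getBytesRead();
  }
  return bytesRead;
}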
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);

  List<Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(
        ((org.apache.hadoop.mapreduce.lib.input.FileSplit) split).getPath(),
        taskContext.getConfiguration());
  }
  fsStats = matchedStats;

  long bytesInPrev = getInputBytes(fsStats);
  this.real = inputFormat.createRecordReader(split, taskContext);
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
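// getFsStatistics is likewise external to these snippets. A sketch, assuming it
// matches FileSystem statistics by the scheme of the qualified path:
protected static List<Statistics> getFsStatistics(Path path, Configuration conf)
    throws IOException {
  List<Statistics> matchedStats = new ArrayList<>();
  path = path.getFileSystem(conf).makeQualified(path);
  String scheme = path.toUri().getScheme();
  for (Statistics stats : FileSystem.getAllStatistics()) {
    if (stats.getScheme().equals(scheme)) {
      matchedStats.add(stats);
    }
  }
  return matchedStats;
}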
@SuppressWarnings("unchecked") NewDirectOutputCollector(MRJobConfig jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException { this.reporter = reporter; mapOutputRecordCounter = reporter .getCounter(TaskCounter.MAP_OUTPUT_RECORDS); fileOutputByteCounter = reporter .getCounter(FileOutputFormatCounter.BYTES_WRITTEN); List<Statistics> matchedStats = null; if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) { matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat .getOutputPath(taskContext), taskContext.getConfiguration()); } fsStats = matchedStats; long bytesOutPrev = getOutputBytes(fsStats); out = outputFormat.getRecordWriter(taskContext); long bytesOutCurr = getOutputBytes(fsStats); fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); }
@SuppressWarnings("unchecked") public void init(MapOutputCollector.Context context ) throws IOException, ClassNotFoundException { this.reporter = context.getReporter(); JobConf job = context.getJobConf(); String finalName = getOutputName(getPartition()); FileSystem fs = FileSystem.get(job); OutputFormat<K, V> outputFormat = job.getOutputFormat(); mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); fileOutputByteCounter = reporter .getCounter(FileOutputFormatCounter.BYTES_WRITTEN); List<Statistics> matchedStats = null; if (outputFormat instanceof FileOutputFormat) { matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job); } fsStats = matchedStats; long bytesOutPrev = getOutputBytes(fsStats); out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); long bytesOutCurr = getOutputBytes(fsStats); fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); }
@SuppressWarnings({ "deprecation", "unchecked" }) public OldTrackingRecordWriter(ReduceTask reduce, JobConf job, TaskReporter reporter, String finalName) throws IOException { this.reduceOutputCounter = reduce.reduceOutputCounter; this.fileOutputByteCounter = reduce.fileOutputByteCounter; List<Statistics> matchedStats = null; if (job.getOutputFormat() instanceof FileOutputFormat) { matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job); } fsStats = matchedStats; FileSystem fs = FileSystem.get(job); long bytesOutPrev = getOutputBytes(fsStats); this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); long bytesOutCurr = getOutputBytes(fsStats); fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); }
@SuppressWarnings("unchecked") NewTrackingRecordWriter(ReduceTask reduce, org.apache.hadoop.mapreduce.TaskAttemptContext taskContext) throws InterruptedException, IOException { this.outputRecordCounter = reduce.reduceOutputCounter; this.fileOutputByteCounter = reduce.fileOutputByteCounter; List<Statistics> matchedStats = null; if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) { matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat .getOutputPath(taskContext), taskContext.getConfiguration()); } fsStats = matchedStats; long bytesOutPrev = getOutputBytes(fsStats); this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) reduce.outputFormat .getRecordWriter(taskContext); long bytesOutCurr = getOutputBytes(fsStats); fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); }
/**
 * Creates an object via a direct HTTP PUT request, without going through the
 * JOSS package.
 *
 * @param objName name of the object
 * @param contentType content type
 * @param metadata user metadata to attach to the object
 * @param statistics file system statistics to update as bytes are written
 * @return FSDataOutputStream that streams the object's data
 */
@Override
public FSDataOutputStream createObject(String objName, String contentType,
    Map<String, String> metadata, Statistics statistics) throws IOException {
  URL url = new URL(mJossAccount.getAccessURL() + "/" + getURLEncodedObjName(objName));
  LOG.debug("PUT {}. Content-Type : {}", url, contentType);

  // When overwriting an object, cached metadata will be outdated
  String cachedName = getObjName(container + "/", objName);
  objectCache.remove(cachedName);

  try {
    OutputStream sos;
    if (nonStreamingUpload) {
      sos = new SwiftNoStreamingOutputStream(mJossAccount, url, contentType,
          metadata, swiftConnectionManager, this);
    } else {
      sos = new SwiftOutputStream(mJossAccount, url, contentType, metadata,
          swiftConnectionManager);
    }
    return new FSDataOutputStream(sos, statistics);
  } catch (IOException e) {
    LOG.error(e.getMessage());
    throw e;
  }
}
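// A usage sketch: write through the returned FSDataOutputStream so the supplied
// Statistics object accumulates the bytes written. The object name, content type,
// and payload below are illustrative assumptions, not values from the source:
FSDataOutputStream out = createObject("data/part-00000", "application/octet-stream",
    Collections.<String, String>emptyMap(), statistics);
try {
  out.write(payload); // byte[] payload prepared by the caller
} finally {
  out.close(); // closing the stream completes the upload
}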