/**
 * Resolves a host subject to the security requirements determined by
 * hadoop.security.token.service.use_ip. Optionally logs slow resolutions.
 *
 * @param hostname host or ip to resolve
 * @return a resolved host
 * @throws UnknownHostException if the host doesn't exist
 */
@InterfaceAudience.Private
public static InetAddress getByName(String hostname)
    throws UnknownHostException {
  if (logSlowLookups || LOG.isTraceEnabled()) {
    StopWatch lookupTimer = new StopWatch().start();
    InetAddress result = hostResolver.getByName(hostname);
    long elapsedMs = lookupTimer.stop().now(TimeUnit.MILLISECONDS);
    if (elapsedMs >= slowLookupThresholdMs) {
      LOG.warn("Slow name lookup for " + hostname + ". Took " + elapsedMs +
          " ms.");
    } else if (LOG.isTraceEnabled()) {
      LOG.trace("Name lookup for " + hostname + " took " + elapsedMs +
          " ms.");
    }
    return result;
  } else {
    return hostResolver.getByName(hostname);
  }
}

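The snippet above shows the measure-and-warn pattern that recurs throughout these examples. The sketch below isolates it in a self-contained class; it assumes the org.apache.hadoop.util.StopWatch API already visible here (start(), stop(), now(TimeUnit)), and the threshold value and use of InetAddress.getByName are hypothetical stand-ins for the configured resolver and threshold.

// A minimal sketch, not the SecurityUtil implementation: time a lookup and
// warn if it exceeds a hypothetical threshold.
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.util.StopWatch;

public class SlowLookupSketch {
  // Hypothetical threshold; the real value comes from configuration.
  private static final long SLOW_LOOKUP_THRESHOLD_MS = 1000;

  public static InetAddress timedLookup(String hostname)
      throws UnknownHostException {
    StopWatch lookupTimer = new StopWatch().start();
    InetAddress result = InetAddress.getByName(hostname);
    long elapsedMs = lookupTimer.stop().now(TimeUnit.MILLISECONDS);
    if (elapsedMs >= SLOW_LOOKUP_THRESHOLD_MS) {
      System.err.println("Slow name lookup for " + hostname + ". Took "
          + elapsedMs + " ms.");
    }
    return result;
  }

  public static void main(String[] args) throws UnknownHostException {
    System.out.println(timedLookup("localhost"));
  }
}
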
private void doPerfTest(int editsSize, int numEdits) throws Exception {
  byte[] data = new byte[editsSize];
  ch.newEpoch(1).get();
  ch.setEpoch(1);
  ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();

  StopWatch sw = new StopWatch().start();
  for (int i = 1; i < numEdits; i++) {
    ch.sendEdits(1L, i, 1, data).get();
  }
  long time = sw.now(TimeUnit.MILLISECONDS);

  System.err.println("Wrote " + numEdits + " batches of " + editsSize +
      " bytes in " + time + "ms");
  float avgRtt = (float) time / (float) numEdits;
  long throughput = ((long) numEdits * editsSize * 1000L) / time;
  System.err.println("Time per batch: " + avgRtt + "ms");
  System.err.println("Throughput: " + throughput + " bytes/sec");
}

public KVJob(String jobname, Configuration conf, Class<?> keyclass,
    Class<?> valueclass, String inputpath, String outputpath)
    throws Exception {
  job = Job.getInstance(conf, jobname);
  job.setJarByClass(KVJob.class);
  job.setMapperClass(KVJob.ValueMapper.class);
  job.setOutputKeyClass(keyclass);
  job.setMapOutputValueClass(valueclass);
  if (conf.get(TestConstants.NATIVETASK_KVTEST_CREATEFILE).equals("true")) {
    final FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(inputpath), true);
    fs.close();
    final TestInputFile testfile = new TestInputFile(Integer.valueOf(
        conf.get(TestConstants.FILESIZE_KEY, "1000")),
        keyclass.getName(), valueclass.getName(), conf);
    StopWatch sw = new StopWatch().start();
    testfile.createSequenceTestFile(inputpath);
    LOG.info("Created test file " + inputpath + " in " +
        sw.now(TimeUnit.MILLISECONDS) + "ms");
  }
  job.setInputFormatClass(SequenceFileInputFormat.class);
  FileInputFormat.addInputPath(job, new Path(inputpath));
  FileOutputFormat.setOutputPath(job, new Path(outputpath));
}

private long writeFile(Path path) throws IOException {
  StopWatch sw = new StopWatch().start();
  System.out.println("Writing " + path);
  long dataSize = dataSizeMB * 1024 * 1024L;
  long remaining = dataSize;
  try (FSDataOutputStream outputStream = fs.create(path)) {
    if (!isGen) {
      fs.deleteOnExit(path);
    }
    int toWrite;
    while (remaining > 0) {
      toWrite = (int) Math.min(remaining, data.length);
      outputStream.write(data, 0, toWrite);
      remaining -= toWrite;
    }
    System.out.println("Finished writing " + path + ". Time taken: " +
        sw.now(TimeUnit.SECONDS) + " s.");
    return dataSize - remaining;
  }
}

private void doAWrite() throws IOException {
  StopWatch sw = new StopWatch().start();
  stm.write(toWrite);
  stm.hflush();
  long micros = sw.now(TimeUnit.MICROSECONDS);
  quantiles.insert(micros);
}

public int run(String args[]) throws Exception {
  if (args.length != 1) {
    System.err.println(
        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
        " <path to test file> ");
    System.err.println(
        "Configurations settable by -D options:\n" +
        "  num.threads [default 10] - how many threads to run\n" +
        "  write.size [default 511] - bytes per write\n" +
        "  num.writes [default 50000] - how many writes to perform");
    System.exit(1);
  }
  TestMultiThreadedHflush test = new TestMultiThreadedHflush();
  Configuration conf = getConf();
  Path p = new Path(args[0]);

  int numThreads = conf.getInt("num.threads", 10);
  int writeSize = conf.getInt("write.size", 511);
  int numWrites = conf.getInt("num.writes", 50000);
  int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
      DFSConfigKeys.DFS_REPLICATION_DEFAULT);

  StopWatch sw = new StopWatch().start();
  test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
      replication);
  sw.stop();

  System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
  System.out.println("Latency quantiles (in microseconds):\n" +
      test.quantiles);
  return 0;
}

private void benchmark(OpType type, int dataSizeMB,
    int numClients, boolean isEc, boolean statefulRead) throws Exception {
  List<Long> sizes = null;
  StopWatch sw = new StopWatch().start();
  switch (type) {
    case READ:
      sizes = doBenchmark(true, dataSizeMB, numClients, isEc, statefulRead,
          false);
      break;
    case WRITE:
      sizes = doBenchmark(false, dataSizeMB, numClients, isEc, statefulRead,
          false);
      break;
    case GEN:
      sizes = doBenchmark(false, dataSizeMB, numClients, isEc, statefulRead,
          true);
  }
  long elapsedSec = sw.now(TimeUnit.SECONDS);
  double totalDataSizeMB = 0;
  for (Long size : sizes) {
    if (size >= 0) {
      totalDataSizeMB += size.doubleValue() / 1024 / 1024;
    }
  }
  double throughput = totalDataSizeMB / elapsedSec;
  DecimalFormat df = getDecimalFormat();
  System.out.println(type + " " + df.format(totalDataSizeMB) +
      " MB data takes: " + elapsedSec + " s.\nTotal throughput: " +
      df.format(throughput) + " MB/s.");
}

private long readFile(Path path) throws IOException {
  try (FSDataInputStream inputStream = fs.open(path)) {
    StopWatch sw = new StopWatch().start();
    System.out.println((statefulRead ? "Stateful reading " :
        "Positional reading ") + path);
    long totalRead = statefulRead ? doStateful(inputStream) :
        doPositional(inputStream);
    System.out.println(
        (statefulRead ? "Finished stateful read " :
            "Finished positional read ") + path + ". Time taken: " +
            sw.now(TimeUnit.SECONDS) + " s.");
    return totalRead;
  }
}

/**
 * More involved test, including detecting blocking when at capacity.
 */
@Test
public void testSubmitRunnable() throws Exception {
  ensureCreated();

  int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
  StopWatch stopWatch = new StopWatch().start();
  for (int i = 0; i < totalTasks; i++) {
    tpe.submit(sleeper);
    assertDidntBlock(stopWatch);
  }
  tpe.submit(sleeper);
  assertDidBlock(stopWatch);
}

private void assertDidntBlock(StopWatch sw) {
  try {
    assertFalse("Non-blocking call took too long.",
        sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
  } finally {
    sw.reset().start();
  }
}

private void assertDidBlock(StopWatch sw) {
  try {
    if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
      throw new RuntimeException("Blocking call returned too fast.");
    }
  } finally {
    sw.reset().start();
  }
}

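The two helpers above reuse a single StopWatch across successive checks by restarting it in a finally block. The self-contained sketch below shows that reset-and-reuse pattern on its own, assuming the same StopWatch API (now(TimeUnit), reset(), start()); the 100 ms threshold and the sleeping main method are hypothetical stand-ins for BLOCKING_THRESHOLD_MSEC and the executor submissions in the test.

// Minimal sketch of reusing one StopWatch between timing checks.
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.util.StopWatch;

public class ReusedStopWatchSketch {
  private static final long THRESHOLD_MS = 100; // hypothetical threshold

  // Report the elapsed time since the last check, then restart the watch so
  // the same instance can time the next call.
  static void checkElapsedAndRestart(StopWatch sw) {
    try {
      long elapsed = sw.now(TimeUnit.MILLISECONDS);
      System.out.println("elapsed: " + elapsed + " ms (exceeded threshold: "
          + (elapsed >= THRESHOLD_MS) + ")");
    } finally {
      sw.reset().start();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    StopWatch sw = new StopWatch().start();
    checkElapsedAndRestart(sw);      // fast path: well under the threshold
    Thread.sleep(2 * THRESHOLD_MS);
    checkElapsedAndRestart(sw);      // slow path: exceeds the threshold
  }
}
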
@Override
public Long call() throws Exception {
  long rounds = BenchData.totalDataSizeKB / BenchData.bufferSizeKB;
  StopWatch sw = new StopWatch().start();
  for (long i = 0; i < rounds; i++) {
    while (testData.remaining() > 0) {
      for (ByteBuffer output : benchData.outputs) {
        output.clear();
      }

      for (int j = 0; j < benchData.inputs.length; j++) {
        benchData.inputs[j] = testData.duplicate();
        benchData.inputs[j].limit(
            testData.position() + BenchData.chunkSize);
        benchData.inputs[j] = benchData.inputs[j].slice();
        testData.position(testData.position() + BenchData.chunkSize);
      }

      if (isEncode) {
        benchData.encode(encoder);
      } else {
        benchData.prepareDecInput();
        benchData.decode(decoder);
      }
    }
    testData.clear();
  }
  return sw.now(TimeUnit.MILLISECONDS);
}

/**
 * List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return list of FileStatus objects
 * @throws IOException if zero input paths are specified.
 */
protected List<FileStatus> listStatus(JobContext job
    ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
      job.getConfiguration());

  // Whether we need to recursively look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  List<FileStatus> result = null;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher =
          new LocatedFileStatusFetcher(
              job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Lists.newArrayList(locatedFiles);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: " +
        sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input paths to process : " + result.size());
  return result;
}

/**
 * Write a batch of edits to the journal.
 * {@link QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
 */
synchronized void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  checkFormatted();
  checkWriteRequest(reqInfo);

  checkSync(curSegment != null,
      "Can't write, no segment open");

  if (curSegmentTxId != segmentTxId) {
    // Sanity check: it is possible that the writer will fail IPCs
    // on both the finalize() and then the start() of the next segment.
    // This could cause us to continue writing to an old segment
    // instead of rolling to a new one, which breaks one of the
    // invariants in the design. If it happens, abort the segment
    // and throw an exception.
    JournalOutOfSyncException e = new JournalOutOfSyncException(
        "Writer out of sync: it thinks it is writing segment " + segmentTxId
        + " but current segment is " + curSegmentTxId);
    abortCurSegment();
    throw e;
  }

  checkSync(nextTxId == firstTxnId,
      "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

  long lastTxnId = firstTxnId + numTxns - 1;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
  }

  // If the edit has already been marked as committed, we know
  // it has been fsynced on a quorum of other nodes, and we are
  // "catching up" with the rest. Hence we do not need to fsync.
  boolean isLagging = lastTxnId <= committedTxnId.get();
  boolean shouldFsync = !isLagging;

  curSegment.writeRaw(records, 0, records.length);
  curSegment.setReadyToFlush();
  StopWatch sw = new StopWatch();
  sw.start();
  curSegment.flush(shouldFsync);
  sw.stop();

  long nanoSeconds = sw.now();
  metrics.addSync(
      TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
  long milliSeconds = TimeUnit.MILLISECONDS.convert(
      nanoSeconds, TimeUnit.NANOSECONDS);

  if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
    LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
        " took " + milliSeconds + "ms");
  }

  if (isLagging) {
    // This batch of edits has already been committed on a quorum of other
    // nodes. So, we are in "catch up" mode. This gets its own metric.
    metrics.batchesWrittenWhileLagging.incr(1);
  }

  metrics.batchesWritten.incr(1);
  metrics.bytesWritten.incr(records.length);
  metrics.txnsWritten.incr(numTxns);

  highestWrittenTxId = lastTxnId;
  nextTxId = lastTxnId + 1;
}

/**
 * List input directories.
 * Subclasses may override to, e.g., select only files matching a regular
 * expression.
 *
 * @param job the job to list input paths for
 * @return list of FileStatus objects
 * @throws IOException if zero input paths are specified.
 */
protected List<FileStatus> listStatus(JobContext job
    ) throws IOException {
  Path[] dirs = getInputPaths(job);
  if (dirs.length == 0) {
    throw new IOException("No input paths specified in job");
  }

  // get tokens for all the required FileSystems..
  TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
      job.getConfiguration());

  // Whether we need to recursively look into the directory structure
  boolean recursive = getInputDirRecursive(job);

  // creates a MultiPathFilter with the hiddenFileFilter and the
  // user provided one (if any).
  List<PathFilter> filters = new ArrayList<PathFilter>();
  filters.add(hiddenFileFilter);
  PathFilter jobFilter = getInputPathFilter(job);
  if (jobFilter != null) {
    filters.add(jobFilter);
  }
  PathFilter inputFilter = new MultiPathFilter(filters);

  List<FileStatus> result = null;

  int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
      DEFAULT_LIST_STATUS_NUM_THREADS);
  StopWatch sw = new StopWatch().start();
  if (numThreads == 1) {
    result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
  } else {
    Iterable<FileStatus> locatedFiles = null;
    try {
      LocatedFileStatusFetcher locatedFileStatusFetcher =
          new LocatedFileStatusFetcher(
              job.getConfiguration(), dirs, recursive, inputFilter, true);
      locatedFiles = locatedFileStatusFetcher.getFileStatuses();
    } catch (InterruptedException e) {
      throw new IOException("Interrupted while getting file statuses");
    }
    result = Lists.newArrayList(locatedFiles);
  }

  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Time taken to get FileStatuses: " +
        sw.now(TimeUnit.MILLISECONDS));
  }
  LOG.info("Total input files to process : " + result.size());
  return result;
}

/**
 * Write a batch of edits to the journal.
 * {@link QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
 */
synchronized void journal(RequestInfo reqInfo,
    long segmentTxId, long firstTxnId,
    int numTxns, byte[] records) throws IOException {
  checkFormatted();
  checkWriteRequest(reqInfo);

  checkSync(curSegment != null,
      "Can't write, no segment open");

  if (curSegmentTxId != segmentTxId) {
    // Sanity check: it is possible that the writer will fail IPCs
    // on both the finalize() and then the start() of the next segment.
    // This could cause us to continue writing to an old segment
    // instead of rolling to a new one, which breaks one of the
    // invariants in the design. If it happens, abort the segment
    // and throw an exception.
    JournalOutOfSyncException e = new JournalOutOfSyncException(
        "Writer out of sync: it thinks it is writing segment " + segmentTxId
        + " but current segment is " + curSegmentTxId);
    abortCurSegment();
    throw e;
  }

  checkSync(nextTxId == firstTxnId,
      "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

  long lastTxnId = firstTxnId + numTxns - 1;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
  }

  // If the edit has already been marked as committed, we know
  // it has been fsynced on a quorum of other nodes, and we are
  // "catching up" with the rest. Hence we do not need to fsync.
  boolean isLagging = lastTxnId <= committedTxnId.get();
  boolean shouldFsync = !isLagging;

  curSegment.writeRaw(records, 0, records.length);
  curSegment.setReadyToFlush();
  StopWatch sw = new StopWatch();
  sw.start();
  curSegment.flush(shouldFsync);
  sw.stop();

  long nanoSeconds = sw.now();
  metrics.addSync(
      TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
  long milliSeconds = TimeUnit.MILLISECONDS.convert(
      nanoSeconds, TimeUnit.NANOSECONDS);

  if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
    LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
        " took " + milliSeconds + "ms");
  }

  if (isLagging) {
    // This batch of edits has already been committed on a quorum of other
    // nodes. So, we are in "catch up" mode. This gets its own metric.
    metrics.batchesWrittenWhileLagging.incr(1);
  }

  metrics.batchesWritten.incr(1);
  metrics.bytesWritten.incr(records.length);
  metrics.txnsWritten.incr(numTxns);

  updateHighestWrittenTxId(lastTxnId);
  nextTxId = lastTxnId + 1;
  lastJournalTimestamp = Time.now();
}

/**
 * Performs benchmark.
 *
 * @param opType      The operation to perform. Can be encode or decode
 * @param coder       The coder to use
 * @param numThreads  Number of threads to launch concurrently
 * @param dataSizeMB  Total test data size in MB
 * @param chunkSizeKB Chunk size in KB
 */
public static void performBench(String opType, CODER coder,
    int numThreads, int dataSizeMB, int chunkSizeKB) throws Exception {
  BenchData.configure(dataSizeMB, chunkSizeKB);

  RawErasureEncoder encoder = null;
  RawErasureDecoder decoder = null;
  ByteBuffer testData;
  boolean isEncode = opType.equals("encode");
  if (isEncode) {
    encoder = getRawEncoder(coder.ordinal());
    testData = genTestData(encoder.preferDirectBuffer(),
        BenchData.bufferSizeKB);
  } else {
    decoder = getRawDecoder(coder.ordinal());
    testData = genTestData(decoder.preferDirectBuffer(),
        BenchData.bufferSizeKB);
  }

  ExecutorService executor = Executors.newFixedThreadPool(numThreads);
  List<Future<Long>> futures = new ArrayList<>(numThreads);
  StopWatch sw = new StopWatch().start();
  for (int i = 0; i < numThreads; i++) {
    futures.add(executor.submit(new BenchmarkCallable(isEncode,
        encoder, decoder, testData.duplicate())));
  }
  List<Long> durations = new ArrayList<>(numThreads);
  try {
    for (Future<Long> future : futures) {
      durations.add(future.get());
    }
    long duration = sw.now(TimeUnit.MILLISECONDS);
    double totalDataSize = BenchData.totalDataSizeKB * numThreads / 1024.0;
    DecimalFormat df = new DecimalFormat("#.##");
    System.out.println(coder + " " + opType + " " +
        df.format(totalDataSize) + "MB data, with chunk size " +
        BenchData.chunkSize / 1024 + "KB");
    System.out.println("Total time: " + df.format(duration / 1000.0) +
        " s.");
    System.out.println("Total throughput: " + df.format(
        totalDataSize / duration * 1000.0) + " MB/s");
    printThreadStatistics(durations, df);
  } catch (Exception e) {
    System.out.println("Error waiting for thread to finish.");
    e.printStackTrace();
    throw e;
  } finally {
    executor.shutdown();
  }
}

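performBench illustrates a fan-out-and-time structure: submit one callable per thread, wait on every Future, and read the overall wall-clock time from a single StopWatch started before submission. The sketch below isolates that structure in a self-contained class; it relies only on the JDK executor API and the StopWatch calls already shown here, and the sleeping workload and thread count are hypothetical stand-ins for BenchmarkCallable's encode/decode work.

// Minimal sketch of timing a pool of workers with one shared StopWatch.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.util.StopWatch;

public class FanOutTimingSketch {
  public static void main(String[] args) throws Exception {
    int numThreads = 4; // hypothetical thread count
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    List<Future<Long>> futures = new ArrayList<>(numThreads);

    StopWatch sw = new StopWatch().start();
    for (int i = 0; i < numThreads; i++) {
      futures.add(executor.submit(() -> {
        // Each worker times itself, mirroring BenchmarkCallable returning
        // its own duration in milliseconds.
        StopWatch perThread = new StopWatch().start();
        Thread.sleep(200); // stand-in for the real per-thread workload
        return perThread.now(TimeUnit.MILLISECONDS);
      }));
    }
    try {
      List<Long> durations = new ArrayList<>(numThreads);
      for (Future<Long> future : futures) {
        durations.add(future.get());
      }
      long totalMs = sw.now(TimeUnit.MILLISECONDS);
      System.out.println("Per-thread durations (ms): " + durations);
      System.out.println("Total wall-clock time: " + totalMs + " ms");
    } finally {
      executor.shutdown();
    }
  }
}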