Example source code for the Java class org.apache.hadoop.util.StopWatch
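The snippets below are collected from several Hadoop forks and show how org.apache.hadoop.util.StopWatch is used in practice. As orientation, here is a minimal sketch (my own illustration, not taken from any of the listed projects) of the fluent API that every snippet relies on: start(), stop(), reset(), and now(TimeUnit):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.StopWatch;

public class StopWatchSketch {
  public static void main(String[] args) throws InterruptedException {
    // start() returns the StopWatch itself, so construction and start chain
    StopWatch sw = new StopWatch().start();
    Thread.sleep(50); // stand-in for the work being timed
    // now(TimeUnit) reads the elapsed time without stopping the watch;
    // now() with no argument returns nanoseconds
    long elapsedMs = sw.now(TimeUnit.MILLISECONDS);
    sw.stop(); // freeze the measurement
    System.out.println("took " + elapsedMs + " ms");
    sw.reset().start(); // reuse the same instance for the next measurement
  }
}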
Project: hadoop-oss
File: SecurityUtil.java
/**
* Resolves a host subject to the security requirements determined by
* hadoop.security.token.service.use_ip. Optionally logs slow resolutions.
*
* @param hostname host or ip to resolve
* @return a resolved host
* @throws UnknownHostException if the host doesn't exist
*/
@InterfaceAudience.Private
public static InetAddress getByName(String hostname) throws UnknownHostException {
if (logSlowLookups || LOG.isTraceEnabled()) {
StopWatch lookupTimer = new StopWatch().start();
InetAddress result = hostResolver.getByName(hostname);
long elapsedMs = lookupTimer.stop().now(TimeUnit.MILLISECONDS);
if (elapsedMs >= slowLookupThresholdMs) {
LOG.warn("Slow name lookup for " + hostname + ". Took " + elapsedMs +
" ms.");
} else if (LOG.isTraceEnabled()) {
LOG.trace("Name lookup for " + hostname + " took " + elapsedMs +
" ms.");
}
return result;
} else {
return hostResolver.getByName(hostname);
}
}
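Note how the StopWatch is created and started only when the result can actually be logged (slow-lookup logging or trace enabled); on the common path the method delegates straight to the resolver with no timing overhead.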
Project: hadoop
File: TestJournalNode.java
private void doPerfTest(int editsSize, int numEdits) throws Exception {
byte[] data = new byte[editsSize];
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
StopWatch sw = new StopWatch().start();
for (int i = 1; i < numEdits; i++) {
ch.sendEdits(1L, i, 1, data).get();
}
long time = sw.now(TimeUnit.MILLISECONDS);
System.err.println("Wrote " + numEdits + " batches of " + editsSize +
" bytes in " + time + "ms");
float avgRtt = (float)time/(float)numEdits;
long throughput = ((long)numEdits * editsSize * 1000L)/time;
System.err.println("Time per batch: " + avgRtt + "ms");
System.err.println("Throughput: " + throughput + " bytes/sec");
}
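To illustrate the printed throughput formula with hypothetical numbers: 5000 batches of 1024 bytes written in 2000 ms would report (5000 * 1024 * 1000) / 2000 = 2,560,000 bytes/sec.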
Project: aliyun-oss-hadoop-fs
File: KVJob.java
public KVJob(String jobname, Configuration conf,
Class<?> keyclass, Class<?> valueclass,
String inputpath, String outputpath) throws Exception {
job = Job.getInstance(conf, jobname);
job.setJarByClass(KVJob.class);
job.setMapperClass(KVJob.ValueMapper.class);
job.setOutputKeyClass(keyclass);
job.setMapOutputValueClass(valueclass);
// constant-first comparison avoids an NPE when the key is unset
if ("true".equals(conf.get(TestConstants.NATIVETASK_KVTEST_CREATEFILE))) {
final FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(inputpath), true);
fs.close();
final TestInputFile testfile = new TestInputFile(Integer.valueOf(conf.get(
TestConstants.FILESIZE_KEY, "1000")),
keyclass.getName(), valueclass.getName(), conf);
StopWatch sw = new StopWatch().start();
testfile.createSequenceTestFile(inputpath);
LOG.info("Created test file " + inputpath + " in "
+ sw.now(TimeUnit.MILLISECONDS) + "ms");
}
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job, new Path(inputpath));
FileOutputFormat.setOutputPath(job, new Path(outputpath));
}
Project: aliyun-oss-hadoop-fs
File: ErasureCodeBenchmarkThroughput.java
private long writeFile(Path path) throws IOException {
StopWatch sw = new StopWatch().start();
System.out.println("Writing " + path);
long dataSize = dataSizeMB * 1024 * 1024L;
long remaining = dataSize;
try (FSDataOutputStream outputStream = fs.create(path)) {
if (!isGen) {
fs.deleteOnExit(path);
}
int toWrite;
while (remaining > 0) {
toWrite = (int) Math.min(remaining, data.length);
outputStream.write(data, 0, toWrite);
remaining -= toWrite;
}
System.out.println("Finished writing " + path + ". Time taken: " +
sw.now(TimeUnit.SECONDS) + " s.");
return dataSize - remaining;
}
}
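Since the loop only exits once remaining reaches zero (any I/O failure throws instead), the value returned on the success path is always the full dataSize.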
Project: aliyun-oss-hadoop-fs
File: TestJournalNode.java
private void doPerfTest(int editsSize, int numEdits) throws Exception {
byte[] data = new byte[editsSize];
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
StopWatch sw = new StopWatch().start();
for (int i = 1; i < numEdits; i++) {
ch.sendEdits(1L, i, 1, data).get();
}
long time = sw.now(TimeUnit.MILLISECONDS);
System.err.println("Wrote " + numEdits + " batches of " + editsSize +
" bytes in " + time + "ms");
float avgRtt = (float)time/(float)numEdits;
long throughput = ((long)numEdits * editsSize * 1000L)/time;
System.err.println("Time per batch: " + avgRtt + "ms");
System.err.println("Throughput: " + throughput + " bytes/sec");
}
Project: big-c
File: TestJournalNode.java
private void doPerfTest(int editsSize, int numEdits) throws Exception {
byte[] data = new byte[editsSize];
ch.newEpoch(1).get();
ch.setEpoch(1);
ch.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
StopWatch sw = new StopWatch().start();
for (int i = 1; i < numEdits; i++) {
ch.sendEdits(1L, i, 1, data).get();
}
long time = sw.now(TimeUnit.MILLISECONDS);
System.err.println("Wrote " + numEdits + " batches of " + editsSize +
" bytes in " + time + "ms");
float avgRtt = (float)time/(float)numEdits;
long throughput = ((long)numEdits * editsSize * 1000L)/time;
System.err.println("Time per batch: " + avgRtt + "ms");
System.err.println("Throughput: " + throughput + " bytes/sec");
}
Project: hadoop
File: TestMultiThreadedHflush.java
private void doAWrite() throws IOException {
StopWatch sw = new StopWatch().start();
stm.write(toWrite);
stm.hflush();
long micros = sw.now(TimeUnit.MICROSECONDS);
quantiles.insert(micros);
}
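Each write/hflush pair is timed in microseconds and inserted into a quantile estimator, so the harness reports a latency distribution rather than just an average (see the quantile printout in run() below).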
Project: hadoop
File: TestMultiThreadedHflush.java
public int run(String args[]) throws Exception {
if (args.length != 1) {
System.err.println(
"usage: " + TestMultiThreadedHflush.class.getSimpleName() +
" <path to test file> ");
System.err.println(
"Configurations settable by -D options:\n" +
" num.threads [default 10] - how many threads to run\n" +
" write.size [default 511] - bytes per write\n" +
" num.writes [default 50000] - how many writes to perform");
System.exit(1);
}
TestMultiThreadedHflush test = new TestMultiThreadedHflush();
Configuration conf = getConf();
Path p = new Path(args[0]);
int numThreads = conf.getInt("num.threads", 10);
int writeSize = conf.getInt("write.size", 511);
int numWrites = conf.getInt("num.writes", 50000);
int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
StopWatch sw = new StopWatch().start();
test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
replication);
sw.stop();
System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
System.out.println("Latency quantiles (in microseconds):\n" +
test.quantiles);
return 0;
}
Project: aliyun-oss-hadoop-fs
File: ErasureCodeBenchmarkThroughput.java
private void benchmark(OpType type, int dataSizeMB,
int numClients, boolean isEc, boolean statefulRead) throws Exception {
List<Long> sizes = null;
StopWatch sw = new StopWatch().start();
switch (type) {
case READ:
sizes = doBenchmark(true, dataSizeMB, numClients, isEc,
statefulRead, false);
break;
case WRITE:
sizes = doBenchmark(
false, dataSizeMB, numClients, isEc, statefulRead, false);
break;
case GEN:
sizes = doBenchmark(false, dataSizeMB, numClients, isEc,
statefulRead, true);
}
long elapsedSec = sw.now(TimeUnit.SECONDS);
double totalDataSizeMB = 0;
for (Long size : sizes) {
if (size >= 0) {
totalDataSizeMB += size.doubleValue() / 1024 / 1024;
}
}
double throughput = totalDataSizeMB / elapsedSec;
DecimalFormat df = getDecimalFormat();
System.out.println(type + " " + df.format(totalDataSizeMB) +
" MB data takes: " + elapsedSec + " s.\nTotal throughput: " +
df.format(throughput) + " MB/s.");
}
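Only non-negative sizes are added to the total here, so a client that returns a negative size (presumably used as a failure marker by doBenchmark) is excluded from the throughput figure.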
Project: aliyun-oss-hadoop-fs
File: ErasureCodeBenchmarkThroughput.java
private long readFile(Path path) throws IOException {
try (FSDataInputStream inputStream = fs.open(path)) {
StopWatch sw = new StopWatch().start();
System.out.println((statefulRead ? "Stateful reading " :
"Positional reading ") + path);
long totalRead = statefulRead ? doStateful(inputStream) :
doPositional(inputStream);
System.out.println(
(statefulRead ? "Finished stateful read " :
"Finished positional read ") + path + ". Time taken: " +
sw.now(TimeUnit.SECONDS) + " s.");
return totalRead;
}
}
Project: aliyun-oss-hadoop-fs
File: TestMultiThreadedHflush.java
private void doAWrite() throws IOException {
StopWatch sw = new StopWatch().start();
stm.write(toWrite);
stm.hflush();
long micros = sw.now(TimeUnit.MICROSECONDS);
quantiles.insert(micros);
}
Project: aliyun-oss-hadoop-fs
File: TestMultiThreadedHflush.java
public int run(String args[]) throws Exception {
if (args.length != 1) {
System.err.println(
"usage: " + TestMultiThreadedHflush.class.getSimpleName() +
" <path to test file> ");
System.err.println(
"Configurations settable by -D options:\n" +
" num.threads [default 10] - how many threads to run\n" +
" write.size [default 511] - bytes per write\n" +
" num.writes [default 50000] - how many writes to perform");
System.exit(1);
}
TestMultiThreadedHflush test = new TestMultiThreadedHflush();
Configuration conf = getConf();
Path p = new Path(args[0]);
int numThreads = conf.getInt("num.threads", 10);
int writeSize = conf.getInt("write.size", 511);
int numWrites = conf.getInt("num.writes", 50000);
int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
StopWatch sw = new StopWatch().start();
test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
replication);
sw.stop();
System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
System.out.println("Latency quantiles (in microseconds):\n" +
test.quantiles);
return 0;
}
Project: aliyun-oss-hadoop-fs
File: TestBlockingThreadPoolExecutorService.java
/**
* More involved test, including detecting blocking when at capacity.
*/
@Test
public void testSubmitRunnable() throws Exception {
ensureCreated();
int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
StopWatch stopWatch = new StopWatch().start();
for (int i = 0; i < totalTasks; i++) {
tpe.submit(sleeper);
assertDidntBlock(stopWatch);
}
tpe.submit(sleeper);
assertDidBlock(stopWatch);
}
Project: aliyun-oss-hadoop-fs
File: TestBlockingThreadPoolExecutorService.java
private void assertDidntBlock(StopWatch sw) {
try {
assertFalse("Non-blocking call took too long.",
sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
} finally {
sw.reset().start();
}
}
Project: aliyun-oss-hadoop-fs
File: TestBlockingThreadPoolExecutorService.java
private void assertDidBlock(StopWatch sw) {
try {
if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
throw new RuntimeException("Blocking call returned too fast.");
}
} finally {
sw.reset().start();
}
}
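Both helpers reset and restart the shared StopWatch in a finally block, so the next submit() in testSubmitRunnable is timed from the end of the current check whether or not the assertion passed.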
Project: big-c
File: TestMultiThreadedHflush.java
private void doAWrite() throws IOException {
StopWatch sw = new StopWatch().start();
stm.write(toWrite);
stm.hflush();
long micros = sw.now(TimeUnit.MICROSECONDS);
quantiles.insert(micros);
}
Project: big-c
File: TestMultiThreadedHflush.java
public int run(String args[]) throws Exception {
if (args.length != 1) {
System.err.println(
"usage: " + TestMultiThreadedHflush.class.getSimpleName() +
" <path to test file> ");
System.err.println(
"Configurations settable by -D options:\n" +
" num.threads [default 10] - how many threads to run\n" +
" write.size [default 511] - bytes per write\n" +
" num.writes [default 50000] - how many writes to perform");
System.exit(1);
}
TestMultiThreadedHflush test = new TestMultiThreadedHflush();
Configuration conf = getConf();
Path p = new Path(args[0]);
int numThreads = conf.getInt("num.threads", 10);
int writeSize = conf.getInt("write.size", 511);
int numWrites = conf.getInt("num.writes", 50000);
int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
StopWatch sw = new StopWatch().start();
test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
replication);
sw.stop();
System.out.println("Finished in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
System.out.println("Latency quantiles (in microseconds):\n" +
test.quantiles);
return 0;
}
Project: hops
File: RawErasureCoderBenchmark.java
@Override
public Long call() throws Exception {
long rounds = BenchData.totalDataSizeKB / BenchData.bufferSizeKB;
StopWatch sw = new StopWatch().start();
for (long i = 0; i < rounds; i++) {
while (testData.remaining() > 0) {
for (ByteBuffer output : benchData.outputs) {
output.clear();
}
for (int j = 0; j < benchData.inputs.length; j++) {
benchData.inputs[j] = testData.duplicate();
benchData.inputs[j].limit(
testData.position() + BenchData.chunkSize);
benchData.inputs[j] = benchData.inputs[j].slice();
testData.position(testData.position() + BenchData.chunkSize);
}
if (isEncode) {
benchData.encode(encoder);
} else {
benchData.prepareDecInput();
benchData.decode(decoder);
}
}
testData.clear();
}
return sw.now(TimeUnit.MILLISECONDS);
}
Project: hadoop
File: FileInputFormat.java
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursively look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: "
+ sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
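The same StopWatch brackets the listing whether it runs single-threaded or via the multi-threaded LocatedFileStatusFetcher; the elapsed time is only reported when debug logging is enabled, though the watch itself always runs.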
Project: hadoop
File: Journal.java
/**
* Write a batch of edits to the journal.
* {@link QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
*/
synchronized void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkFormatted();
checkWriteRequest(reqInfo);
checkSync(curSegment != null,
"Can't write, no segment open");
if (curSegmentTxId != segmentTxId) {
// Sanity check: it is possible that the writer will fail IPCs
// on both the finalize() and then the start() of the next segment.
// This could cause us to continue writing to an old segment
// instead of rolling to a new one, which breaks one of the
// invariants in the design. If it happens, abort the segment
// and throw an exception.
JournalOutOfSyncException e = new JournalOutOfSyncException(
"Writer out of sync: it thinks it is writing segment " + segmentTxId
+ " but current segment is " + curSegmentTxId);
abortCurSegment();
throw e;
}
checkSync(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
long lastTxnId = firstTxnId + numTxns - 1;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
}
// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
// "catching up" with the rest. Hence we do not need to fsync.
boolean isLagging = lastTxnId <= committedTxnId.get();
boolean shouldFsync = !isLagging;
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
StopWatch sw = new StopWatch();
sw.start();
curSegment.flush(shouldFsync);
sw.stop();
long nanoSeconds = sw.now();
metrics.addSync(
TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
long milliSeconds = TimeUnit.MILLISECONDS.convert(
nanoSeconds, TimeUnit.NANOSECONDS);
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
" took " + milliSeconds + "ms");
}
if (isLagging) {
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);
}
metrics.batchesWritten.incr(1);
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
highestWrittenTxId = lastTxnId;
nextTxId = lastTxnId + 1;
}
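Here sw.now() is called without a TimeUnit and therefore returns nanoseconds, which are then converted explicitly: microseconds for the sync metric, milliseconds for the slow-sync warning.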
Project: aliyun-oss-hadoop-fs
File: FileInputFormat.java
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursively look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: "
+ sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input files to process : " + result.size());
return result;
}
Project: aliyun-oss-hadoop-fs
File: Journal.java
/**
* Write a batch of edits to the journal.
* {@link QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
*/
synchronized void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkFormatted();
checkWriteRequest(reqInfo);
checkSync(curSegment != null,
"Can't write, no segment open");
if (curSegmentTxId != segmentTxId) {
// Sanity check: it is possible that the writer will fail IPCs
// on both the finalize() and then the start() of the next segment.
// This could cause us to continue writing to an old segment
// instead of rolling to a new one, which breaks one of the
// invariants in the design. If it happens, abort the segment
// and throw an exception.
JournalOutOfSyncException e = new JournalOutOfSyncException(
"Writer out of sync: it thinks it is writing segment " + segmentTxId
+ " but current segment is " + curSegmentTxId);
abortCurSegment();
throw e;
}
checkSync(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
long lastTxnId = firstTxnId + numTxns - 1;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
}
// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
// "catching up" with the rest. Hence we do not need to fsync.
boolean isLagging = lastTxnId <= committedTxnId.get();
boolean shouldFsync = !isLagging;
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
StopWatch sw = new StopWatch();
sw.start();
curSegment.flush(shouldFsync);
sw.stop();
long nanoSeconds = sw.now();
metrics.addSync(
TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
long milliSeconds = TimeUnit.MILLISECONDS.convert(
nanoSeconds, TimeUnit.NANOSECONDS);
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
" took " + milliSeconds + "ms");
}
if (isLagging) {
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);
}
metrics.batchesWritten.incr(1);
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
updateHighestWrittenTxId(lastTxnId);
nextTxId = lastTxnId + 1;
lastJournalTimestamp = Time.now();
}
Project: big-c
File: FileInputFormat.java
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursively look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: "
+ sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
Project: big-c
File: Journal.java
/**
* Write a batch of edits to the journal.
* {@link QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
*/
synchronized void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkFormatted();
checkWriteRequest(reqInfo);
checkSync(curSegment != null,
"Can't write, no segment open");
if (curSegmentTxId != segmentTxId) {
// Sanity check: it is possible that the writer will fail IPCs
// on both the finalize() and then the start() of the next segment.
// This could cause us to continue writing to an old segment
// instead of rolling to a new one, which breaks one of the
// invariants in the design. If it happens, abort the segment
// and throw an exception.
JournalOutOfSyncException e = new JournalOutOfSyncException(
"Writer out of sync: it thinks it is writing segment " + segmentTxId
+ " but current segment is " + curSegmentTxId);
abortCurSegment();
throw e;
}
checkSync(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
long lastTxnId = firstTxnId + numTxns - 1;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
}
// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
// "catching up" with the rest. Hence we do not need to fsync.
boolean isLagging = lastTxnId <= committedTxnId.get();
boolean shouldFsync = !isLagging;
curSegment.writeRaw(records, 0, records.length);
curSegment.setReadyToFlush();
StopWatch sw = new StopWatch();
sw.start();
curSegment.flush(shouldFsync);
sw.stop();
long nanoSeconds = sw.now();
metrics.addSync(
TimeUnit.MICROSECONDS.convert(nanoSeconds, TimeUnit.NANOSECONDS));
long milliSeconds = TimeUnit.MILLISECONDS.convert(
nanoSeconds, TimeUnit.NANOSECONDS);
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
" took " + milliSeconds + "ms");
}
if (isLagging) {
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);
}
metrics.batchesWritten.incr(1);
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
highestWrittenTxId = lastTxnId;
nextTxId = lastTxnId + 1;
}
Project: hops
File: FileInputFormat.java
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
protected List<FileStatus> listStatus(JobContext job
) throws IOException {
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursively look into the directory structure
boolean recursive = getInputDirRecursive(job);
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(hiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
List<FileStatus> result = null;
int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,
DEFAULT_LIST_STATUS_NUM_THREADS);
StopWatch sw = new StopWatch().start();
if (numThreads == 1) {
result = singleThreadedListStatus(job, dirs, inputFilter, recursive);
} else {
Iterable<FileStatus> locatedFiles = null;
try {
LocatedFileStatusFetcher locatedFileStatusFetcher = new LocatedFileStatusFetcher(
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
throw new IOException("Interrupted while getting file statuses");
}
result = Lists.newArrayList(locatedFiles);
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Time taken to get FileStatuses: "
+ sw.now(TimeUnit.MILLISECONDS));
}
LOG.info("Total input files to process : " + result.size());
return result;
}
Project: hops
File: RawErasureCoderBenchmark.java
/**
* Performs benchmark.
*
* @param opType The operation to perform. Can be encode or decode
* @param coder The coder to use
* @param numThreads Number of threads to launch concurrently
* @param dataSizeMB Total test data size in MB
* @param chunkSizeKB Chunk size in KB
*/
public static void performBench(String opType, CODER coder,
int numThreads, int dataSizeMB, int chunkSizeKB) throws Exception {
BenchData.configure(dataSizeMB, chunkSizeKB);
RawErasureEncoder encoder = null;
RawErasureDecoder decoder = null;
ByteBuffer testData;
boolean isEncode = opType.equals("encode");
if (isEncode) {
encoder = getRawEncoder(coder.ordinal());
testData = genTestData(encoder.preferDirectBuffer(),
BenchData.bufferSizeKB);
} else {
decoder = getRawDecoder(coder.ordinal());
testData = genTestData(decoder.preferDirectBuffer(),
BenchData.bufferSizeKB);
}
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<Future<Long>> futures = new ArrayList<>(numThreads);
StopWatch sw = new StopWatch().start();
for (int i = 0; i < numThreads; i++) {
futures.add(executor.submit(new BenchmarkCallable(isEncode,
encoder, decoder, testData.duplicate())));
}
List<Long> durations = new ArrayList<>(numThreads);
try {
for (Future<Long> future : futures) {
durations.add(future.get());
}
long duration = sw.now(TimeUnit.MILLISECONDS);
double totalDataSize = BenchData.totalDataSizeKB * numThreads / 1024.0;
DecimalFormat df = new DecimalFormat("#.##");
System.out.println(coder + " " + opType + " " +
df.format(totalDataSize) + "MB data, with chunk size " +
BenchData.chunkSize / 1024 + "KB");
System.out.println("Total time: " + df.format(duration / 1000.0) + " s.");
System.out.println("Total throughput: " + df.format(
totalDataSize / duration * 1000.0) + " MB/s");
printThreadStatistics(durations, df);
} catch (Exception e) {
System.out.println("Error waiting for thread to finish.");
e.printStackTrace();
throw e;
} finally {
executor.shutdown();
}
}
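The shared StopWatch measures wall-clock time for the whole run, since sw.now(TimeUnit.MILLISECONDS) is read only after every future has completed; the per-thread durations returned by the callables feed printThreadStatistics separately.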