Example source code for the Java class org.apache.hadoop.io.nativeio.NativeIO
Project: hadoop-oss
File: RawLocalFileSystem.java
protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
throws IOException {
if (permission == null) {
permission = FsPermission.getDirDefault();
}
permission = permission.applyUMask(umask);
if (Shell.WINDOWS && NativeIO.isAvailable()) {
try {
NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
return true;
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
p2f, permission.toShort()), e);
}
return false;
}
} else {
boolean b = p2f.mkdir();
if (b) {
setPermission(p, permission);
}
return b;
}
}
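The snippet above prefers the native Windows call and falls back to plain java.io.File when the native library is absent. Below is a minimal, self-contained sketch of that same availability-check pattern; the class name MkdirWithModeSketch and the hard-coded 0755 mode are illustrative and not part of Hadoop.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Shell;

public class MkdirWithModeSketch {
  /** Create a directory, applying the mode natively on Windows when possible. */
  public static boolean mkdirWithMode(File dir, short mode) throws IOException {
    if (Shell.WINDOWS && NativeIO.isAvailable()) {
      // Directory creation and permission assignment in a single native call.
      NativeIO.Windows.createDirectoryWithMode(dir, mode);
      return true;
    }
    // Portable fallback: mkdir first; permissions would be applied separately.
    return dir.mkdir();
  }

  public static void main(String[] args) throws IOException {
    mkdirWithMode(new File("example-dir"), (short) 0755); // 0755 is illustrative
  }
}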
Project: hadoop-oss
File: ReadaheadPool.java
@Override
public void run() {
if (canceled) return;
// There's a very narrow race here that the file will close right at
// this instant. But if that happens, we'll likely receive an EBADF
// error below, and see that it's canceled, ignoring the error.
// It's also possible that we'll end up requesting readahead on some
// other FD, which may be wasted work, but won't cause a problem.
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd, off, len, POSIX_FADV_WILLNEED);
} catch (IOException ioe) {
if (canceled) {
// no big deal - the reader canceled the request and closed
// the file.
return;
}
LOG.warn("Failed readahead on " + identifier,
ioe);
}
}
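A minimal sketch of the readahead hint in isolation, assuming hadoop-common is on the classpath; ReadaheadSketch and its parameter names are illustrative. The "IfPossible" variant is expected to degrade to a no-op when native fadvise support is missing, which is why the pool above only logs failures.

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;

public class ReadaheadSketch {
  /** Ask the kernel to prefetch [offset, offset + length) of an open file. */
  public static void willNeed(String identifier, FileInputStream in,
      long offset, long length) throws IOException {
    NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
        identifier, in.getFD(), offset, length,
        NativeIO.POSIX.POSIX_FADV_WILLNEED);
  }
}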
Project: hadoop-oss
File: SecureIOUtils.java
/**
* Same as openForRandomRead except that it will run even if security is off.
* This is used by unit tests.
*/
@VisibleForTesting
protected static RandomAccessFile forceSecureOpenForRandomRead(File f,
String mode, String expectedOwner, String expectedGroup)
throws IOException {
RandomAccessFile raf = new RandomAccessFile(f, mode);
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(raf.getFD());
checkStat(f, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return raf;
} finally {
if (!success) {
raf.close();
}
}
}
Project: hadoop-oss
File: SecureIOUtils.java
/**
* Same as openFSDataInputStream except that it will run even if security is
* off. This is used by unit tests.
*/
@VisibleForTesting
protected static FSDataInputStream forceSecureOpenFSDataInputStream(
File file,
String expectedOwner, String expectedGroup) throws IOException {
final FSDataInputStream in =
rawFilesystem.open(new Path(file.getAbsolutePath()));
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(in.getFileDescriptor());
checkStat(file, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return in;
} finally {
if (!success) {
in.close();
}
}
}
Project: hadoop-oss
File: SecureIOUtils.java
/**
* Same as openForRead() except that it will run even if security is off.
* This is used by unit tests.
*/
@VisibleForTesting
protected static FileInputStream forceSecureOpenForRead(File f, String expectedOwner,
String expectedGroup) throws IOException {
FileInputStream fis = new FileInputStream(f);
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(fis.getFD());
checkStat(f, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return fis;
} finally {
if (!success) {
fis.close();
}
}
}
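The three helpers above share one idea: fstat the already-open descriptor and compare owner/group before handing the stream to the caller, so the check applies to the very file that was opened rather than to a path that could change underneath. A condensed sketch of that idea follows; OwnerCheckSketch and openOwnedBy are illustrative names, and real code should use SecureIOUtils itself.

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;

public class OwnerCheckSketch {
  /** Open a file and verify the on-disk owner of the open descriptor. */
  public static FileInputStream openOwnedBy(String path, String expectedOwner)
      throws IOException {
    FileInputStream fis = new FileInputStream(path);
    boolean success = false;
    try {
      NativeIO.POSIX.Stat stat = NativeIO.POSIX.getFstat(fis.getFD());
      if (!expectedOwner.equals(stat.getOwner())) {
        throw new IOException("Owner " + stat.getOwner()
            + " did not match expected owner " + expectedOwner);
      }
      success = true;
      return fis;
    } finally {
      if (!success) {
        fis.close();
      }
    }
  }
}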
Project: hadoop
File: FadvisedChunkedFile.java
@Override
public void close() throws Exception {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd,
getStartOffset(), getEndOffset() - getStartOffset(),
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
}
}
super.close();
}
Project: hadoop
File: FileJournalManager.java
@Override
synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
File dstFile = NNStorage.getFinalizedEditsFile(
sd, firstTxId, lastTxId);
LOG.info("Finalizing edits file " + inprogressFile + " -> " + dstFile);
Preconditions.checkState(!dstFile.exists(),
"Can't finalize edits file " + inprogressFile + " since finalized file " +
"already exists");
try {
NativeIO.renameTo(inprogressFile, dstFile);
} catch (IOException e) {
errorReporter.reportErrorOnFile(dstFile);
throw new IllegalStateException("Unable to finalize edits file " + inprogressFile, e);
}
if (inprogressFile.equals(currentInProgress)) {
currentInProgress = null;
}
}
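Unlike java.io.File#renameTo, NativeIO.renameTo throws an IOException describing the failure, which is what lets finalizeLogSegment surface a precise error. A minimal sketch (RenameSketch is an illustrative name) that also handles the Windows caveat noted in the renameSelf snippet that follows:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;

public class RenameSketch {
  /** Rename src to dst, failing loudly instead of returning false. */
  public static void renameOrThrow(File src, File dst) throws IOException {
    // On Windows the rename fails if dst already exists, so remove it first.
    if (dst.exists() && !dst.delete()) {
      throw new IOException("Couldn't delete " + dst);
    }
    NativeIO.renameTo(src, dst);
  }
}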
Project: hadoop
File: FileJournalManager.java
private void renameSelf(String newSuffix) throws IOException {
File src = file;
File dst = new File(src.getParent(), src.getName() + newSuffix);
// renameTo fails on Windows if the destination file already exists.
try {
if (dst.exists()) {
if (!dst.delete()) {
throw new IOException("Couldn't delete " + dst);
}
}
NativeIO.renameTo(src, dst);
} catch (IOException e) {
throw new IOException(
"Couldn't rename log " + src + " to " + dst, e);
}
file = dst;
}
Project: hadoop
File: FsDatasetImpl.java
/**
* Bump a replica's generation stamp to a new one.
* Its on-disk meta file name is renamed to be the new one too.
*
* @param replicaInfo a replica
* @param newGS new generation stamp
* @throws IOException if rename fails
*/
private void bumpReplicaGS(ReplicaInfo replicaInfo,
long newGS) throws IOException {
long oldGS = replicaInfo.getGenerationStamp();
File oldmeta = replicaInfo.getMetaFile();
replicaInfo.setGenerationStamp(newGS);
File newmeta = replicaInfo.getMetaFile();
// rename meta file to new GS
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + oldmeta + " to " + newmeta);
}
try {
NativeIO.renameTo(oldmeta, newmeta);
} catch (IOException e) {
replicaInfo.setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + replicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to " + newmeta, e);
}
}
Project: hadoop
File: MappableBlock.java
/**
* Load the block.
*
* mmap and mlock the block, and then verify its checksum.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
* start. The caller must close this.
* @param metaIn The meta file input stream. Should be positioned at
* the start. The caller must close this.
* @param blockFileName The block file name, for logging purposes.
*
* @return The Mappable block.
*/
public static MappableBlock load(long length,
FileInputStream blockIn, FileInputStream metaIn,
String blockFileName) throws IOException {
MappableBlock mappableBlock = null;
MappedByteBuffer mmap = null;
FileChannel blockChannel = null;
try {
blockChannel = blockIn.getChannel();
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
verifyChecksum(length, metaIn, blockChannel, blockFileName);
mappableBlock = new MappableBlock(mmap, length);
} finally {
IOUtils.closeQuietly(blockChannel);
if (mappableBlock == null) {
if (mmap != null) {
NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
}
}
}
return mappableBlock;
}
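The essential mmap/mlock/munmap lifecycle from load(), reduced to a sketch; MlockSketch is an illustrative name and checksum verification is omitted. As the comment in the snippet points out, unmapping on failure also releases the mlock.

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

import org.apache.hadoop.io.nativeio.NativeIO;

public class MlockSketch {
  /** Map the first {@code length} bytes read-only and pin them in memory. */
  public static MappedByteBuffer mapAndLock(FileInputStream blockIn,
      String name, long length) throws IOException {
    FileChannel channel = blockIn.getChannel();
    MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, length);
    boolean success = false;
    try {
      NativeIO.POSIX.getCacheManipulator().mlock(name, mmap, length);
      success = true;
      return mmap;
    } finally {
      if (!success) {
        NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
      }
    }
  }
}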
Project: hadoop
File: TestEnhancedByteBufferAccess.java
public static HdfsConfiguration initZeroCopyTest() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestRequestMmapAccess._PORT.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
return conf;
}
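Tests that depend on the native library guard themselves with JUnit assumptions so they are skipped rather than failed. A stripped-down sketch of that guard; NativeOnlyTestSketch is an illustrative name, and the Shell.WINDOWS check stands in for the SystemUtils.IS_OS_UNIX assumption used above.

import org.junit.Assume;
import org.junit.Test;

import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Shell;

public class NativeOnlyTestSketch {
  @Test
  public void testThatNeedsNativeIO() {
    // Skip (not fail) when native support or a POSIX platform is missing.
    Assume.assumeTrue(NativeIO.isAvailable());
    Assume.assumeFalse(Shell.WINDOWS);
    // ... test body that relies on native mmap/mlock/fadvise support ...
  }
}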
Project: hadoop
File: TestFsDatasetCache.java
/**
* Run testCacheAndUncacheBlock with some failures injected into the mlock
* call. This tests the ability of the NameNode to resend commands.
*/
@Test(timeout=600000)
public void testCacheAndUncacheBlockWithRetries() throws Exception {
// We don't have to save the previous cacheManipulator
// because it will be reinstalled by the @After function.
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
private final Set<String> seenIdentifiers = new HashSet<String>();
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
if (seenIdentifiers.contains(identifier)) {
// mlock succeeds the second time.
LOG.info("mlocking " + identifier);
return;
}
seenIdentifiers.add(identifier);
throw new IOException("injecting IOException during mlock of " +
identifier);
}
});
testCacheAndUncacheBlock();
}
Project: hadoop
File: RawLocalFileSystem.java
private LocalFSFileOutputStream(Path f, boolean append,
FsPermission permission) throws IOException {
File file = pathToFile(f);
if (permission == null) {
this.fos = new FileOutputStream(file, append);
} else {
if (Shell.WINDOWS && NativeIO.isAvailable()) {
this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
append, permission.toShort());
} else {
this.fos = new FileOutputStream(file, append);
boolean success = false;
try {
setPermission(f, permission);
success = true;
} finally {
if (!success) {
IOUtils.cleanup(LOG, this.fos);
}
}
}
}
}
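The Windows branch above is the output-stream counterpart of createDirectoryWithMode: the file is created with its mode in one native call, so the permission does not have to be applied in a separate step afterwards (the non-Windows branch does exactly that with setPermission). A minimal sketch; CreateWithModeSketch is an illustrative name.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Shell;

public class CreateWithModeSketch {
  /** Create (or append to) a file, applying the mode natively on Windows. */
  public static OutputStream create(File file, boolean append, short mode)
      throws IOException {
    if (Shell.WINDOWS && NativeIO.isAvailable()) {
      return NativeIO.Windows.createFileOutputStreamWithMode(file, append, mode);
    }
    // Elsewhere the permission must be applied separately, e.g. via
    // FileSystem#setPermission as the snippet above does.
    return new FileOutputStream(file, append);
  }
}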
Project: hadoop
File: RawLocalFileSystem.java
protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
throws IOException {
if (permission == null) {
return p2f.mkdir();
} else {
if (Shell.WINDOWS && NativeIO.isAvailable()) {
try {
NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
return true;
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
p2f, permission.toShort()), e);
}
return false;
}
} else {
boolean b = p2f.mkdir();
if (b) {
setPermission(p, permission);
}
return b;
}
}
}
Project: hadoop
File: ReadaheadPool.java
@Override
public void run() {
if (canceled) return;
// There's a very narrow race here that the file will close right at
// this instant. But if that happens, we'll likely receive an EBADF
// error below, and see that it's canceled, ignoring the error.
// It's also possible that we'll end up requesting readahead on some
// other FD, which may be wasted work, but won't cause a problem.
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd, off, len, NativeIO.POSIX.POSIX_FADV_WILLNEED);
} catch (IOException ioe) {
if (canceled) {
// no big deal - the reader canceled the request and closed
// the file.
return;
}
LOG.warn("Failed readahead on " + identifier,
ioe);
}
}
Project: hadoop
File: SecureIOUtils.java
/**
* Same as openForRandomRead except that it will run even if security is off.
* This is used by unit tests.
*/
@VisibleForTesting
protected static RandomAccessFile forceSecureOpenForRandomRead(File f,
String mode, String expectedOwner, String expectedGroup)
throws IOException {
RandomAccessFile raf = new RandomAccessFile(f, mode);
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(raf.getFD());
checkStat(f, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return raf;
} finally {
if (!success) {
raf.close();
}
}
}
Project: hadoop
File: SecureIOUtils.java
/**
* Same as openFSDataInputStream except that it will run even if security is
* off. This is used by unit tests.
*/
@VisibleForTesting
protected static FSDataInputStream forceSecureOpenFSDataInputStream(
File file,
String expectedOwner, String expectedGroup) throws IOException {
final FSDataInputStream in =
rawFilesystem.open(new Path(file.getAbsolutePath()));
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(in.getFileDescriptor());
checkStat(file, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return in;
} finally {
if (!success) {
in.close();
}
}
}
Project: hadoop
File: SecureIOUtils.java
/**
* Same as openForRead() except that it will run even if security is off.
* This is used by unit tests.
*/
@VisibleForTesting
protected static FileInputStream forceSecureOpenForRead(File f, String expectedOwner,
String expectedGroup) throws IOException {
FileInputStream fis = new FileInputStream(f);
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(fis.getFD());
checkStat(f, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return fis;
} finally {
if (!success) {
fis.close();
}
}
}
Project: aliyun-oss-hadoop-fs
File: FadvisedChunkedFile.java
@Override
public void close() throws Exception {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd,
getStartOffset(), getEndOffset() - getStartOffset(),
POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
}
}
super.close();
}
Project: aliyun-oss-hadoop-fs
File: ShortCircuitShm.java
/**
* Create the ShortCircuitShm.
*
* @param shmId The ID to use.
* @param stream The stream that we're going to use to create this
* shared memory segment.
*
* Although this is a FileInputStream, we are going to
* assume that the underlying file descriptor is writable
* as well as readable. It would be more appropriate to use
* a RandomAccessFile here, but that class does not have
* any public accessor which returns a FileDescriptor,
* unlike FileInputStream.
*/
public ShortCircuitShm(ShmId shmId, FileInputStream stream)
throws IOException {
if (!NativeIO.isAvailable()) {
throw new UnsupportedOperationException("NativeIO is not available.");
}
if (Shell.WINDOWS) {
throw new UnsupportedOperationException(
"DfsClientShm is not yet implemented for Windows.");
}
if (unsafe == null) {
throw new UnsupportedOperationException(
"can't use DfsClientShm because we failed to " +
"load misc.Unsafe.");
}
this.shmId = shmId;
this.mmappedLength = getUsableLength(stream);
this.baseAddress = POSIX.mmap(stream.getFD(),
POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength);
this.slots = new Slot[mmappedLength / BYTES_PER_SLOT];
this.allocatedSlots = new BitSet(slots.length);
LOG.trace("creating {}(shmId={}, mmappedLength={}, baseAddress={}, "
+ "slots.length={})", this.getClass().getSimpleName(), shmId,
mmappedLength, String.format("%x", baseAddress), slots.length);
}
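ShortCircuitShm maps the segment with the lower-level, address-returning NativeIO.POSIX.mmap rather than FileChannel.map, because it later carves the region into fixed-size slots accessed through sun.misc.Unsafe. Below is a reduced sketch of that map/unmap pairing; MmapSketch is an illustrative name, and the address-based munmap(long, long) overload used for cleanup is an assumption, since it does not appear in the snippet above.

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;

public class MmapSketch {
  /** Map a file read/write into this process and unmap it again. */
  public static void mapAndRelease(FileInputStream stream, long length)
      throws IOException {
    if (!NativeIO.isAvailable()) {
      throw new UnsupportedOperationException("NativeIO is not available.");
    }
    long baseAddress = POSIX.mmap(stream.getFD(),
        POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true /* shared */, length);
    try {
      // ... access the mapping through sun.misc.Unsafe, as ShortCircuitShm does ...
    } finally {
      POSIX.munmap(baseAddress, length); // assumed overload, see lead-in
    }
  }
}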
Project: aliyun-oss-hadoop-fs
File: FileJournalManager.java
@Override
synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
File dstFile = NNStorage.getFinalizedEditsFile(
sd, firstTxId, lastTxId);
LOG.info("Finalizing edits file " + inprogressFile + " -> " + dstFile);
Preconditions.checkState(!dstFile.exists(),
"Can't finalize edits file " + inprogressFile + " since finalized file " +
"already exists");
try {
NativeIO.renameTo(inprogressFile, dstFile);
} catch (IOException e) {
errorReporter.reportErrorOnFile(dstFile);
throw new IllegalStateException("Unable to finalize edits file " + inprogressFile, e);
}
if (inprogressFile.equals(currentInProgress)) {
currentInProgress = null;
}
}
Project: aliyun-oss-hadoop-fs
File: FileJournalManager.java
private void renameSelf(String newSuffix) throws IOException {
File src = file;
File dst = new File(src.getParent(), src.getName() + newSuffix);
// renameTo fails on Windows if the destination file already exists.
try {
if (dst.exists()) {
if (!dst.delete()) {
throw new IOException("Couldn't delete " + dst);
}
}
NativeIO.renameTo(src, dst);
} catch (IOException e) {
throw new IOException(
"Couldn't rename log " + src + " to " + dst, e);
}
file = dst;
}
Project: aliyun-oss-hadoop-fs
File: FsDatasetImpl.java
/**
* Bump a replica's generation stamp to a new one.
* Its on-disk meta file name is renamed to be the new one too.
*
* @param replicaInfo a replica
* @param newGS new generation stamp
* @throws IOException if rename fails
*/
private void bumpReplicaGS(ReplicaInfo replicaInfo,
long newGS) throws IOException {
long oldGS = replicaInfo.getGenerationStamp();
File oldmeta = replicaInfo.getMetaFile();
replicaInfo.setGenerationStamp(newGS);
File newmeta = replicaInfo.getMetaFile();
// rename meta file to new GS
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + oldmeta + " to " + newmeta);
}
try {
NativeIO.renameTo(oldmeta, newmeta);
} catch (IOException e) {
replicaInfo.setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + replicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to " + newmeta, e);
}
}
Project: aliyun-oss-hadoop-fs
File: MappableBlock.java
/**
* Load the block.
*
* mmap and mlock the block, and then verify its checksum.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
* start. The caller must close this.
* @param metaIn The meta file input stream. Should be positioned at
* the start. The caller must close this.
* @param blockFileName The block file name, for logging purposes.
*
* @return The Mappable block.
*/
public static MappableBlock load(long length,
FileInputStream blockIn, FileInputStream metaIn,
String blockFileName) throws IOException {
MappableBlock mappableBlock = null;
MappedByteBuffer mmap = null;
FileChannel blockChannel = null;
try {
blockChannel = blockIn.getChannel();
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
verifyChecksum(length, metaIn, blockChannel, blockFileName);
mappableBlock = new MappableBlock(mmap, length);
} finally {
IOUtils.closeQuietly(blockChannel);
if (mappableBlock == null) {
if (mmap != null) {
NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
}
}
}
return mappableBlock;
}
Project: aliyun-oss-hadoop-fs
File: TestEnhancedByteBufferAccess.java
public static HdfsConfiguration initZeroCopyTest() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(HdfsClientConfigKeys.Mmap.CACHE_SIZE_KEY, 3);
conf.setLong(HdfsClientConfigKeys.Mmap.CACHE_TIMEOUT_MS_KEY, 100);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestRequestMmapAccess._PORT.sock").getAbsolutePath());
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
true);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
return conf;
}
Project: aliyun-oss-hadoop-fs
File: LazyPersistTestCase.java
/**
* Use a dummy cache manipulator for testing.
*/
public static void initCacheManipulator() {
NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.CacheManipulator() {
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
}
@Override
public long getMemlockLimit() {
LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
return Long.MAX_VALUE;
}
@Override
public boolean verifyCanMlock() {
LOG.info("LazyPersistTestCase: fake return " + true);
return true;
}
});
}
Project: aliyun-oss-hadoop-fs
File: TestFsDatasetCache.java
@After
public void tearDown() throws Exception {
// Verify that each test uncached whatever it cached. This cleanup is
// required so that file descriptors are not leaked across tests.
DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
if (fs != null) {
fs.close();
fs = null;
}
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
// Restore the original CacheManipulator
NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
}
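Taken together, the test hooks above follow a save/replace/restore pattern around NativeIO.POSIX's process-wide cache manipulator. A compact sketch of that pattern, assuming the NoMlockCacheManipulator used in the retry test below is the nested class of NativeIO.POSIX; CacheManipulatorSwapSketch and runWithoutMlock are illustrative names.

import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;

public class CacheManipulatorSwapSketch {
  /** Run a test body with mlock stubbed out, then restore the old manipulator. */
  public static void runWithoutMlock(Runnable testBody) {
    CacheManipulator previous = NativeIO.POSIX.getCacheManipulator();
    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
    try {
      testBody.run();
    } finally {
      NativeIO.POSIX.setCacheManipulator(previous);
    }
  }
}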
Project: aliyun-oss-hadoop-fs
File: TestFsDatasetCache.java
/**
* Run testCacheAndUncacheBlock with some failures injected into the mlock
* call. This tests the ability of the NameNode to resend commands.
*/
@Test(timeout=600000)
public void testCacheAndUncacheBlockWithRetries() throws Exception {
// We don't have to save the previous cacheManipulator
// because it will be reinstalled by the @After function.
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
private final Set<String> seenIdentifiers = new HashSet<String>();
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
if (seenIdentifiers.contains(identifier)) {
// mlock succeeds the second time.
LOG.info("mlocking " + identifier);
return;
}
seenIdentifiers.add(identifier);
throw new IOException("injecting IOException during mlock of " +
identifier);
}
});
testCacheAndUncacheBlock();
}
Project: aliyun-oss-hadoop-fs
File: RawLocalFileSystem.java
protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
throws IOException {
if (permission == null) {
permission = FsPermission.getDirDefault();
}
permission = permission.applyUMask(umask);
if (Shell.WINDOWS && NativeIO.isAvailable()) {
try {
NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
return true;
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
p2f, permission.toShort()), e);
}
return false;
}
} else {
boolean b = p2f.mkdir();
if (b) {
setPermission(p, permission);
}
return b;
}
}
Project: aliyun-oss-hadoop-fs
File: ReadaheadPool.java
@Override
public void run() {
if (canceled) return;
// There's a very narrow race here that the file will close right at
// this instant. But if that happens, we'll likely receive an EBADF
// error below, and see that it's canceled, ignoring the error.
// It's also possible that we'll end up requesting readahead on some
// other FD, which may be wasted work, but won't cause a problem.
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd, off, len, POSIX_FADV_WILLNEED);
} catch (IOException ioe) {
if (canceled) {
// no big deal - the reader canceled the request and closed
// the file.
return;
}
LOG.warn("Failed readahead on " + identifier,
ioe);
}
}
Project: aliyun-oss-hadoop-fs
File: SecureIOUtils.java
/**
* Same as openForRandomRead except that it will run even if security is off.
* This is used by unit tests.
*/
@VisibleForTesting
protected static RandomAccessFile forceSecureOpenForRandomRead(File f,
String mode, String expectedOwner, String expectedGroup)
throws IOException {
RandomAccessFile raf = new RandomAccessFile(f, mode);
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(raf.getFD());
checkStat(f, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return raf;
} finally {
if (!success) {
raf.close();
}
}
}
Project: aliyun-oss-hadoop-fs
File: SecureIOUtils.java
/**
* Same as openFSDataInputStream except that it will run even if security is
* off. This is used by unit tests.
*/
@VisibleForTesting
protected static FSDataInputStream forceSecureOpenFSDataInputStream(
File file,
String expectedOwner, String expectedGroup) throws IOException {
final FSDataInputStream in =
rawFilesystem.open(new Path(file.getAbsolutePath()));
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(in.getFileDescriptor());
checkStat(file, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return in;
} finally {
if (!success) {
in.close();
}
}
}
Project: aliyun-oss-hadoop-fs
File: SecureIOUtils.java
/**
* Same as openForRead() except that it will run even if security is off.
* This is used by unit tests.
*/
@VisibleForTesting
protected static FileInputStream forceSecureOpenForRead(File f, String expectedOwner,
String expectedGroup) throws IOException {
FileInputStream fis = new FileInputStream(f);
boolean success = false;
try {
Stat stat = NativeIO.POSIX.getFstat(fis.getFD());
checkStat(f, stat.getOwner(), stat.getGroup(), expectedOwner,
expectedGroup);
success = true;
return fis;
} finally {
if (!success) {
fis.close();
}
}
}
Project: big-c
File: FadvisedChunkedFile.java
@Override
public void close() throws Exception {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd,
getStartOffset(), getEndOffset() - getStartOffset(),
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
}
}
super.close();
}
Project: big-c
File: FileJournalManager.java
@Override
synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
File dstFile = NNStorage.getFinalizedEditsFile(
sd, firstTxId, lastTxId);
LOG.info("Finalizing edits file " + inprogressFile + " -> " + dstFile);
Preconditions.checkState(!dstFile.exists(),
"Can't finalize edits file " + inprogressFile + " since finalized file " +
"already exists");
try {
NativeIO.renameTo(inprogressFile, dstFile);
} catch (IOException e) {
errorReporter.reportErrorOnFile(dstFile);
throw new IllegalStateException("Unable to finalize edits file " + inprogressFile, e);
}
if (inprogressFile.equals(currentInProgress)) {
currentInProgress = null;
}
}
Project: big-c
File: FileJournalManager.java
private void renameSelf(String newSuffix) throws IOException {
File src = file;
File dst = new File(src.getParent(), src.getName() + newSuffix);
// renameTo fails on Windows if the destination file already exists.
try {
if (dst.exists()) {
if (!dst.delete()) {
throw new IOException("Couldn't delete " + dst);
}
}
NativeIO.renameTo(src, dst);
} catch (IOException e) {
throw new IOException(
"Couldn't rename log " + src + " to " + dst, e);
}
file = dst;
}
Project: big-c
File: FsDatasetImpl.java
/**
* Bump a replica's generation stamp to a new one.
* Its on-disk meta file name is renamed to be the new one too.
*
* @param replicaInfo a replica
* @param newGS new generation stamp
* @throws IOException if rename fails
*/
private void bumpReplicaGS(ReplicaInfo replicaInfo,
long newGS) throws IOException {
long oldGS = replicaInfo.getGenerationStamp();
File oldmeta = replicaInfo.getMetaFile();
replicaInfo.setGenerationStamp(newGS);
File newmeta = replicaInfo.getMetaFile();
// rename meta file to new GS
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + oldmeta + " to " + newmeta);
}
try {
NativeIO.renameTo(oldmeta, newmeta);
} catch (IOException e) {
replicaInfo.setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + replicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta +
" to " + newmeta, e);
}
}
Project: big-c
File: MappableBlock.java
/**
* Load the block.
*
* mmap and mlock the block, and then verify its checksum.
*
* @param length The current length of the block.
* @param blockIn The block input stream. Should be positioned at the
* start. The caller must close this.
* @param metaIn The meta file input stream. Should be positioned at
* the start. The caller must close this.
* @param blockFileName The block file name, for logging purposes.
*
* @return The Mappable block.
*/
public static MappableBlock load(long length,
FileInputStream blockIn, FileInputStream metaIn,
String blockFileName) throws IOException {
MappableBlock mappableBlock = null;
MappedByteBuffer mmap = null;
FileChannel blockChannel = null;
try {
blockChannel = blockIn.getChannel();
if (blockChannel == null) {
throw new IOException("Block InputStream has no FileChannel.");
}
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
verifyChecksum(length, metaIn, blockChannel, blockFileName);
mappableBlock = new MappableBlock(mmap, length);
} finally {
IOUtils.closeQuietly(blockChannel);
if (mappableBlock == null) {
if (mmap != null) {
NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
}
}
}
return mappableBlock;
}
Project: big-c
File: TestEnhancedByteBufferAccess.java
public static HdfsConfiguration initZeroCopyTest() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestRequestMmapAccess._PORT.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
return conf;
}
Project: big-c
File: TestFsDatasetCache.java
/**
* Run testCacheAndUncacheBlock with some failures injected into the mlock
* call. This tests the ability of the NameNode to resend commands.
*/
@Test(timeout=600000)
public void testCacheAndUncacheBlockWithRetries() throws Exception {
// We don't have to save the previous cacheManipulator
// because it will be reinstalled by the @After function.
NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
private final Set<String> seenIdentifiers = new HashSet<String>();
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
if (seenIdentifiers.contains(identifier)) {
// mlock succeeds the second time.
LOG.info("mlocking " + identifier);
return;
}
seenIdentifiers.add(identifier);
throw new IOException("injecting IOException during mlock of " +
identifier);
}
});
testCacheAndUncacheBlock();
}