/** * Create the ShortCircuitShm. * * @param shmId The ID to use. * @param stream The stream that we're going to use to create this * shared memory segment. * * Although this is a FileInputStream, we are going to * assume that the underlying file descriptor is writable * as well as readable. It would be more appropriate to use * a RandomAccessFile here, but that class does not have * any public accessor which returns a FileDescriptor, * unlike FileInputStream. */ public ShortCircuitShm(ShmId shmId, FileInputStream stream) throws IOException { if (!NativeIO.isAvailable()) { throw new UnsupportedOperationException("NativeIO is not available."); } if (Shell.WINDOWS) { throw new UnsupportedOperationException( "DfsClientShm is not yet implemented for Windows."); } if (unsafe == null) { throw new UnsupportedOperationException( "can't use DfsClientShm because we failed to " + "load misc.Unsafe."); } this.shmId = shmId; this.mmappedLength = getUsableLength(stream); this.baseAddress = POSIX.mmap(stream.getFD(), POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength); this.slots = new Slot[mmappedLength / BYTES_PER_SLOT]; this.allocatedSlots = new BitSet(slots.length); LOG.trace("creating {}(shmId={}, mmappedLength={}, baseAddress={}, " + "slots.length={})", this.getClass().getSimpleName(), shmId, mmappedLength, String.format("%x", baseAddress), slots.length); }
/** * Create the ShortCircuitShm. * * @param shmId The ID to use. * @param stream The stream that we're going to use to create this * shared memory segment. * * Although this is a FileInputStream, we are going to * assume that the underlying file descriptor is writable * as well as readable. It would be more appropriate to use * a RandomAccessFile here, but that class does not have * any public accessor which returns a FileDescriptor, * unlike FileInputStream. */ public ShortCircuitShm(ShmId shmId, FileInputStream stream) throws IOException { if (!NativeIO.isAvailable()) { throw new UnsupportedOperationException("NativeIO is not available."); } if (Shell.WINDOWS) { throw new UnsupportedOperationException( "DfsClientShm is not yet implemented for Windows."); } if (unsafe == null) { throw new UnsupportedOperationException( "can't use DfsClientShm because we failed to " + "load misc.Unsafe."); } this.shmId = shmId; this.mmappedLength = getUsableLength(stream); this.baseAddress = POSIX.mmap(stream.getFD(), POSIX.MMAP_PROT_READ | POSIX.MMAP_PROT_WRITE, true, mmappedLength); this.slots = new Slot[mmappedLength / BYTES_PER_SLOT]; this.allocatedSlots = new BitSet(slots.length); if (LOG.isTraceEnabled()) { LOG.trace("creating " + this.getClass().getSimpleName() + "(shmId=" + shmId + ", mmappedLength=" + mmappedLength + ", baseAddress=" + String.format("%x", baseAddress) + ", slots.length=" + slots.length + ")"); } }
public void free() { try { POSIX.munmap(baseAddress, mmappedLength); } catch (IOException e) { LOG.warn(this + ": failed to munmap", e); } LOG.trace(this + ": freed"); }
@Override public void releaseExternalResources() { if (readaheadRequest != null) { readaheadRequest.cancel(); } if (manageOsCache && getCount() > 0) { try { POSIX.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(), POSIX.POSIX_FADV_DONTNEED); } catch (Throwable t) { LOG.warn("Failed to manage OS cache for " + identifier, t); } } super.releaseExternalResources(); }
@Override public void close() throws Exception { if (readaheadRequest != null) { readaheadRequest.cancel(); } if (manageOsCache && getEndOffset() - getStartOffset() > 0) { try { POSIX.posixFadviseIfPossible(identifier, fd, getStartOffset(), getEndOffset() - getStartOffset(), POSIX.POSIX_FADV_DONTNEED); } catch (Throwable t) { LOG.warn("Failed to manage OS cache for " + identifier, t); } } super.close(); }