/**
 * Closes this input stream.
 *
 * <p>Only the first call does any work: the {@code closed} flag is flipped
 * atomically, and any later call just logs at debug level and returns. If
 * {@code extendedReadBuffers} exists and is non-empty, a warning listing its
 * ByteBuffers is logged before {@code closeCurrentBlockReader()} and
 * {@code super.close()} are invoked.
 *
 * @throws IOException if any of the calls made while closing fails with one
 */
@Override
public synchronized void close() throws IOException {
  final boolean firstClose = closed.compareAndSet(false, true);
  if (!firstClose) {
    DFSClient.LOG.debug("DFSInputStream has been closed already");
    return;
  }

  // Runs after the flag flip: if this throws, the stream is still marked
  // closed and a later close() will be a no-op.
  dfsClient.checkOpen();

  final IdentityHashStore<ByteBuffer, Object> outstanding = extendedReadBuffers;
  if (outstanding != null && !outstanding.isEmpty()) {
    final StringBuilder leaked = new StringBuilder();
    outstanding.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
      // True until the first buffer has been appended; controls the separator.
      private boolean first = true;

      @Override
      public void accept(ByteBuffer buffer, Object ignored) {
        if (!first) {
          leaked.append(", ");
        }
        first = false;
        leaked.append(buffer);
      }
    });
    final String message = "closing file " + src + ", but there are still "
        + "unreleased ByteBuffers allocated by read(). "
        + "Please release " + leaked + ".";
    DFSClient.LOG.warn(message);
  }

  closeCurrentBlockReader();
  super.close();
}
/**
 * Shuts this input stream down.
 *
 * <p>The {@code closed} flag is set with a compare-and-set, so only one call
 * proceeds past the first check; every other call logs a debug message and
 * returns. Before {@code closeCurrentBlockReaders()} and
 * {@code super.close()} run, a warning is logged naming every ByteBuffer
 * still held in {@code extendedReadBuffers}, if there are any.
 *
 * @throws IOException if any of the calls made while closing fails with one
 */
@Override
public synchronized void close() throws IOException {
  if (!closed.compareAndSet(false, true)) {
    DFSClient.LOG.debug("DFSInputStream has been closed already");
    return;
  }

  // Note the ordering: the stream is already flagged closed when this runs.
  dfsClient.checkOpen();

  final boolean hasUnreleasedBuffers =
      extendedReadBuffers != null && !extendedReadBuffers.isEmpty();
  if (hasUnreleasedBuffers) {
    final StringBuilder names = new StringBuilder();
    extendedReadBuffers.visitAll(
        new IdentityHashStore.Visitor<ByteBuffer, Object>() {
          // Number of buffers appended so far; a separator precedes all but
          // the first.
          private int appended = 0;

          @Override
          public void accept(ByteBuffer buf, Object unused) {
            if (appended > 0) {
              names.append(", ");
            }
            names.append(buf);
            appended++;
          }
        });
    DFSClient.LOG.warn("closing file " + src
        + ", but there are still unreleased ByteBuffers allocated by read(). "
        + "Please release " + names + ".");
  }

  closeCurrentBlockReaders();
  super.close();
}
/**
 * Closes this input stream.
 *
 * <p>The {@code closed} flag is flipped atomically, so only the first call
 * performs the shutdown. A repeated call is a harmless no-op and is therefore
 * reported at debug level rather than as a warning.
 *
 * <p>If {@code extendedReadBuffers} exists and is non-empty, a warning that
 * lists the ByteBuffers it still holds is logged. Afterwards
 * {@code closeCurrentBlockReader()} and {@code super.close()} are invoked.
 *
 * @throws IOException if any of the calls made while closing fails with one
 */
@Override
public synchronized void close() throws IOException {
  if (!closed.compareAndSet(false, true)) {
    // Closing twice is allowed and has no effect, so do not alarm operators.
    DFSClient.LOG.debug("DFSInputStream has been closed already");
    return;
  }
  // The flag is already set here: if this throws, the stream stays marked
  // closed and a later close() returns immediately.
  dfsClient.checkOpen();

  if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
    final StringBuilder builder = new StringBuilder();
    extendedReadBuffers.visitAll(
        new IdentityHashStore.Visitor<ByteBuffer, Object>() {
          // Empty for the first buffer, ", " for every buffer after it.
          private String prefix = "";

          @Override
          public void accept(ByteBuffer k, Object v) {
            builder.append(prefix).append(k);
            prefix = ", ";
          }
        });
    DFSClient.LOG.warn("closing file " + src + ", but there are still "
        + "unreleased ByteBuffers allocated by read().  "
        + "Please release " + builder.toString() + ".");
  }
  closeCurrentBlockReader();
  super.close();
}
/**
 * Returns the store referenced by {@code extendedReadBuffers}, creating it
 * on first use.
 *
 * <p>The method is synchronized on this instance, so the null check and the
 * assignment happen under the same lock.
 *
 * @return the existing store, or a newly created one if none existed yet
 */
private synchronized IdentityHashStore<ByteBuffer, Object>
    getExtendedReadBuffers() {
  IdentityHashStore<ByteBuffer, Object> store = extendedReadBuffers;
  if (store == null) {
    // Constructor argument 0 is kept from the original code; presumably an
    // initial capacity -- confirm against IdentityHashStore.
    store = new IdentityHashStore<ByteBuffer, Object>(0);
    extendedReadBuffers = store;
  }
  return store;
}
/**
 * Lazily provides the {@code extendedReadBuffers} store.
 *
 * <p>If the field is already set it is returned as is; otherwise a new
 * {@code IdentityHashStore} is created, stored in the field and returned.
 * The method is synchronized on this instance.
 *
 * @return the store held in {@code extendedReadBuffers}, never {@code null}
 */
private synchronized IdentityHashStore<ByteBuffer, Object>
    getExtendedReadBuffers() {
  if (extendedReadBuffers != null) {
    return extendedReadBuffers;
  }
  // First request: create the store. The argument 0 is presumably an initial
  // capacity -- confirm against IdentityHashStore.
  extendedReadBuffers = new IdentityHashStore<>(0);
  return extendedReadBuffers;
}
/**
 * Closes this input stream.
 *
 * <p>Returns immediately if {@code closed} is already set. Otherwise it calls
 * {@code dfsClient.checkOpen()}, logs a warning listing any ByteBuffers still
 * held in {@code extendedReadBuffers}, closes and clears {@code blockReader}
 * if one is present, calls {@code super.close()} and finally sets
 * {@code closed}.
 *
 * <p>{@code closed} is only set as the last step, so if an earlier step
 * throws, the flag stays unset and {@code close()} can be called again.
 *
 * @throws IOException if any of the calls made while closing fails with one
 */
@Override
public synchronized void close() throws IOException {
  if (closed) {
    return;
  }
  dfsClient.checkOpen();

  // Guard against a store that was never created, so close() cannot fail
  // with a NullPointerException before the block reader is released.
  if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) {
    final StringBuilder builder = new StringBuilder();
    extendedReadBuffers.visitAll(
        new IdentityHashStore.Visitor<ByteBuffer, Object>() {
          // Empty for the first buffer, ", " for every buffer after it.
          private String prefix = "";

          @Override
          public void accept(ByteBuffer k, Object v) {
            builder.append(prefix).append(k);
            prefix = ", ";
          }
        });
    DFSClient.LOG.warn("closing file " + src + ", but there are still "
        + "unreleased ByteBuffers allocated by read().  "
        + "Please release " + builder.toString() + ".");
  }
  if (blockReader != null) {
    blockReader.close();
    // Drop the reference so the reader is not closed a second time.
    blockReader = null;
  }
  super.close();
  closed = true;
}