@Override public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException { try { checkNotClosed(); if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } IOUtils.closeStream(in); in = store.retrieve(key); this.pos = in.skip(pos); LOG.debug("Seek to position {}. Bytes skipped {}", pos, this.pos); } catch(IOException e) { Throwable innerException = checkForAzureStorageException(e); if (innerException instanceof StorageException && isFileNotFoundException((StorageException) innerException)) { throw new FileNotFoundException(String.format("%s is not found", key)); } throw e; } }
@Override public synchronized void seek(long targetPos) throws IOException { checkNotClosed(); // Do not allow negative seek if (targetPos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + targetPos); } if (contentLength <= 0) { return; } // Lazy seek nextReadPos = targetPos; }
/** Seek to a position. */ @Override public void seek(long pos) throws IOException { if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } checkStream(); try { /* * If data of target pos in the underlying stream has already been read * and decrypted in outBuffer, we just need to re-position outBuffer. */ if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) { int forward = (int) (pos - (streamOffset - outBuffer.remaining())); if (forward > 0) { outBuffer.position(outBuffer.position() + forward); } } else { ((Seekable) in).seek(pos); resetStreamOffset(pos); } } catch (ClassCastException e) { throw new UnsupportedOperationException("This stream does not support " + "seek."); } }
@Override public void seek(long pos) throws IOException { if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); } if (in == null) { throw new SwiftConnectionClosedException(FSExceptionMessages.STREAM_IS_CLOSED); } super.seek(pos); }
@Override public synchronized void seek(long newpos) throws IOException { if (newpos < 0) { throw new EOFException( FSExceptionMessages.NEGATIVE_SEEK); } if (pos != newpos) { // the seek is attempting to move the current position reopen(newpos); } }
private synchronized void reopen(long pos) throws IOException { if (wrappedStream != null) { if (LOG.isDebugEnabled()) { LOG.debug("Aborting old stream to open at pos " + pos); } wrappedStream.abort(); } if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK +" " + pos); } if (contentLength > 0 && pos > contentLength-1) { throw new EOFException( FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); } LOG.debug("Actually opening file " + key + " at pos " + pos); GetObjectRequest request = new GetObjectRequest(bucket, key); request.setRange(pos, contentLength-1); wrappedStream = client.getObject(request).getObjectContent(); if (wrappedStream == null) { throw new IOException("Null IO stream"); } this.pos = pos; }
private synchronized void reopen(long pos) throws IOException { if (inputStream != null) { if (LOG.isDebugEnabled()) { LOG.debug("Aborting old stream " + "to open at pos " + pos); } inputStream.close(); } if (pos < 0) { throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos); } if (contentLength > 0 && pos > contentLength - 1) { throw new EOFException( FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos); } LOG.debug("Actually opening file " + key + " at pos " + pos); GetObjectRequest request = new GetObjectRequest(bucket, key); request.setRange(pos, contentLength - 1); inputStream = ossClient.getObject(request).getObjectContent(); if (inputStream == null) { throw new IOException("Null IO stream"); } this.pos = pos; }
@Override public synchronized void seek(long newpos) throws IOException { if (newpos < 0) { throw new EOFException( FSExceptionMessages.NEGATIVE_SEEK); } if (pos != newpos) { // the seek is attempting to move the current position LOG.debug("Opening key '{}' for reading at position '{}", key, newpos); InputStream newStream = store.retrieve(key, newpos); updateInnerStream(newStream, newpos); } }
/** * Seek to an offset. If the data is already in the buffer, move to it * @param targetPos target position * @throws IOException on any problem */ @Override public synchronized void seek(long targetPos) throws IOException { if (targetPos < 0) { throw new EOFException( FSExceptionMessages.NEGATIVE_SEEK); } //there's some special handling of near-local data //as the seek can be omitted if it is in/adjacent long offset = targetPos - pos; if (LOG.isDebugEnabled()) { LOG.debug("Seek to " + targetPos + "; current pos =" + pos + "; offset="+offset); } if (offset == 0) { LOG.debug("seek is no-op"); return; } if (offset < 0) { LOG.debug("seek is backwards"); } else if ((rangeOffset + offset < bufferSize)) { //if the seek is in range of that requested, scan forwards //instead of closing and re-opening a new HTTP connection SwiftUtils.debug(LOG, "seek is within current stream" + "; pos= %d ; targetPos=%d; " + "offset= %d ; bufferOffset=%d", pos, targetPos, offset, rangeOffset); try { LOG.debug("chomping "); chompBytes(offset); } catch (IOException e) { //this is assumed to be recoverable with a seek -or more likely to fail LOG.debug("while chomping ",e); } if (targetPos - pos == 0) { LOG.trace("chomping successful"); return; } LOG.trace("chomping failed"); } else { if (LOG.isDebugEnabled()) { LOG.debug("Seek is beyond buffer size of " + bufferSize); } } innerClose("seeking to " + targetPos); fillBuffer(targetPos); }
private void checkNotClosed() throws IOException { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } }
/** * Verify that the input stream is open. Non blocking; this gives * the last state of the volatile {@link #closed} field. * @throws IOException if the connection is closed */ private void checkNotClosed() throws IOException { if (closed) { throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } }