/** * Opens a new output stream to the given blob (page or block blob) * to populate it from scratch with data. */ private OutputStream openOutputStream(final CloudBlobWrapper blob) throws StorageException { if (blob instanceof CloudPageBlobWrapperImpl){ return new PageBlobOutputStream( (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration); } else { // Handle both ClouldBlockBlobWrapperImpl and (only for the test code path) // MockCloudBlockBlobWrapper. return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(), getInstrumentedContext()); } }
/**
 * Opens an input stream that reads the data stored in {@code blob}.
 * Block blobs are read through the wrapper's own stream. Any other blob is
 * treated as a page blob and read through a {@link PageBlobInputStream}.
 *
 * @param blob the blob to read
 * @return a new input stream over the blob's data
 * @throws StorageException if the storage service call fails
 * @throws IOException if the page blob stream cannot be constructed
 */
private InputStream openInputStream(CloudBlobWrapper blob)
    throws StorageException, IOException {
  if (!(blob instanceof CloudBlockBlobWrapper)) {
    CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return new PageBlobInputStream(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  }
  return blob.openInputStream(getDownloadOptions(),
      getInstrumentedContext(isConcurrentOOBAppendAllowed()));
}
/**
 * Returns the real data length of {@code blob}.
 * For a page blob, the length reported in {@code properties} cannot be
 * trusted, so the size is read from the blob itself. For every other blob,
 * the length from {@code properties} is returned as-is.
 *
 * @param blob the blob whose data length is wanted
 * @param properties the blob's properties; only consulted for non-page blobs
 * @return the number of data bytes in the blob
 * @throws AzureException wrapping any failure while reading a page blob's size
 */
private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
    throws AzureException {
  if (!(blob instanceof CloudPageBlobWrapper)) {
    return properties.getLength();
  }
  try {
    CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return PageBlobInputStream.getPageBlobSize(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  } catch (Exception e) {
    throw new AzureException(
        "Unexpected exception getting page blob actual data size.", e);
  }
}
/** * Helper method to extract the actual data size of a page blob. * This typically involves 2 service requests (one for page ranges, another * for the last page's data). * * @param blob The blob to get the size from. * @param opContext The operation context to use for the requests. * @return The total data size of the blob in bytes. * @throws IOException If the format is corrupt. * @throws StorageException If anything goes wrong in the requests. */ public static long getPageBlobSize(CloudPageBlobWrapper blob, OperationContext opContext) throws IOException, StorageException { // Get the page ranges for the blob. There should be one range starting // at byte 0, but we tolerate (and ignore) ranges after the first one. ArrayList<PageRange> pageRanges = blob.downloadPageRanges(new BlobRequestOptions(), opContext); if (pageRanges.size() == 0) { return 0; } if (pageRanges.get(0).getStartOffset() != 0) { // Not expected: we always upload our page blobs as a contiguous range // starting at byte 0. throw badStartRangeException(blob, pageRanges.get(0)); } long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1; // Get the last page. long lastPageStart = totalRawBlobSize - PAGE_SIZE; ByteArrayOutputStream baos = new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE); blob.downloadRange(lastPageStart, PAGE_SIZE, baos, new BlobRequestOptions(), opContext); byte[] lastPage = baos.toByteArray(); short lastPageSize = getPageSize(blob, lastPage, 0); long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE; return (totalNumberOfPages - 1) * PAGE_DATA_SIZE + lastPageSize; }
/**
 * Constructs a stream that reads the data pages of the given page blob.
 *
 * <p>The blob's page ranges are listed once here. Only the first range is
 * read, and it must start at byte 0. Any further ranges are logged and
 * ignored.
 *
 * @param blob the page blob to read
 * @param opContext the operation context used for service requests
 * @throws IOException if listing the page ranges fails, or if the first range
 *     does not start at byte 0
 */
public PageBlobInputStream(CloudPageBlobWrapper blob,
    OperationContext opContext) throws IOException {
  this.blob = blob;
  this.opContext = opContext;

  final ArrayList<PageRange> ranges;
  try {
    ranges = blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  } catch (StorageException e) {
    throw new IOException(e);
  }

  if (ranges.isEmpty()) {
    // No page ranges: there is nothing to read.
    numberOfPagesRemaining = 0;
    return;
  }

  PageRange firstRange = ranges.get(0);
  if (firstRange.getStartOffset() != 0) {
    throw badStartRangeException(blob, firstRange);
  }
  int extraRangeCount = ranges.size() - 1;
  if (extraRangeCount > 0) {
    LOG.warn(String.format(
        "Blob %s has %d page ranges beyond the first range. "
        + "Only reading the first range.",
        blob.getUri(), extraRangeCount));
  }
  numberOfPagesRemaining = (firstRange.getEndOffset() + 1) / PAGE_SIZE;
}
/**
 * Reads the data-size header of the page whose header starts at
 * {@code offset} in {@code data}.
 *
 * @param blob the blob the bytes came from (used only for error messages)
 * @param data buffer holding the page bytes
 * @param offset index of the first header byte within {@code data}
 * @return the number of data bytes in the page, between 0 and
 *     {@code PAGE_DATA_SIZE} inclusive
 * @throws IOException if the header bytes are missing from {@code data} or
 *     the decoded size is out of range
 */
private static short getPageSize(CloudPageBlobWrapper blob, byte[] data,
    int offset) throws IOException {
  // The header is the two bytes decoded by toShort below. If a short or
  // truncated download leaves fewer than two bytes, report it as corruption
  // (a checked IOException), like the range check below, instead of letting
  // an unchecked ArrayIndexOutOfBoundsException escape.
  if (offset < 0 || data.length - offset < 2) {
    throw fileCorruptException(blob, String.format(
        "Page header is truncated: need 2 bytes at offset %d but the buffer "
        + "has %d bytes", offset, data.length));
  }
  short pageSize = toShort(data[offset], data[offset + 1]);
  if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
    throw fileCorruptException(blob, String.format(
        "Unexpected page size in the header: %d.", pageSize));
  }
  return pageSize;
}
/**
 * Builds the corruption exception reported when a page blob's first page
 * range does not begin at byte 0.
 *
 * @param blob the offending blob
 * @param startRange the first page range returned for the blob
 * @return the exception, for the caller to throw
 */
private static IOException badStartRangeException(CloudPageBlobWrapper blob,
    PageRange startRange) {
  String reason = String.format(
      "Page blobs for ASV should always use a page range starting at byte 0. "
      + "This starts at byte %d.",
      startRange.getStartOffset());
  return fileCorruptException(blob, reason);
}
/**
 * Returns the real data length of {@code blob}.
 * For a page blob, the length reported in {@code properties} cannot be
 * trusted, so the size is read from the blob itself. For every other blob,
 * the length from {@code properties} is returned as-is.
 *
 * @param blob the blob whose data length is wanted
 * @param properties the blob's properties; only consulted for non-page blobs
 * @return the number of data bytes in the blob
 * @throws AzureException wrapping any failure while reading a page blob's size
 */
private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
    throws AzureException {
  if (!(blob instanceof CloudPageBlobWrapper)) {
    return properties.getLength();
  }
  try {
    CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return PageBlobInputStream.getPageBlobDataSize(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  } catch (Exception e) {
    throw new AzureException(
        "Unexpected exception getting page blob actual data size.", e);
  }
}
/** * Helper method to extract the actual data size of a page blob. * This typically involves 2 service requests (one for page ranges, another * for the last page's data). * * @param blob The blob to get the size from. * @param opContext The operation context to use for the requests. * @return The total data size of the blob in bytes. * @throws IOException If the format is corrupt. * @throws StorageException If anything goes wrong in the requests. */ public static long getPageBlobDataSize(CloudPageBlobWrapper blob, OperationContext opContext) throws IOException, StorageException { // Get the page ranges for the blob. There should be one range starting // at byte 0, but we tolerate (and ignore) ranges after the first one. ArrayList<PageRange> pageRanges = blob.downloadPageRanges(new BlobRequestOptions(), opContext); if (pageRanges.size() == 0) { return 0; } if (pageRanges.get(0).getStartOffset() != 0) { // Not expected: we always upload our page blobs as a contiguous range // starting at byte 0. throw badStartRangeException(blob, pageRanges.get(0)); } long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1; // Get the last page. long lastPageStart = totalRawBlobSize - PAGE_SIZE; ByteArrayOutputStream baos = new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE); blob.downloadRange(lastPageStart, PAGE_SIZE, baos, new BlobRequestOptions(), opContext); byte[] lastPage = baos.toByteArray(); short lastPageSize = getPageSize(blob, lastPage, 0); long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE; return (totalNumberOfPages - 1) * PAGE_DATA_SIZE + lastPageSize; }
/** * Constructs an output stream over the given page blob. * * @param blob the blob that this stream is associated with. * @param opContext an object used to track the execution of the operation * @throws StorageException if anything goes wrong creating the blob. */ public PageBlobOutputStream(final CloudPageBlobWrapper blob, final OperationContext opContext, final Configuration conf) throws StorageException { this.blob = blob; this.outBuffer = new ByteArrayOutputStream(); this.opContext = opContext; this.lastQueuedTask = null; this.ioQueue = new LinkedBlockingQueue<Runnable>(); // As explained above: the IO writes are not designed for parallelism, // so we only have one thread in this thread pool. this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, ioQueue); // Make page blob files have a size that is the greater of a // minimum size, or the value of fs.azure.page.blob.size from configuration. long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0); LOG.debug("Read value of fs.azure.page.blob.size as " + pageBlobConfigSize + " from configuration (0 if not present)."); long pageBlobSize = Math.max(PAGE_BLOB_MIN_SIZE, pageBlobConfigSize); // Ensure that the pageBlobSize is a multiple of page size. if (pageBlobSize % PAGE_SIZE != 0) { pageBlobSize += PAGE_SIZE - pageBlobSize % PAGE_SIZE; } blob.create(pageBlobSize, new BlobRequestOptions(), opContext); currentBlobSize = pageBlobSize; // Set the page blob extension size. It must be a minimum of the default // value. configuredPageBlobExtensionSize = conf.getLong("fs.azure.page.blob.extension.size", 0); if (configuredPageBlobExtensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) { configuredPageBlobExtensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE; } // make sure it is a multiple of the page size if (configuredPageBlobExtensionSize % PAGE_SIZE != 0) { configuredPageBlobExtensionSize += PAGE_SIZE - configuredPageBlobExtensionSize % PAGE_SIZE; } }
/**
 * Builds an {@link IOException} stating that {@code blob} is corrupt or is
 * not in the expected page blob format.
 *
 * @param blob the offending blob; its URI is included in the message
 * @param reason a description of what was wrong (a period is appended)
 * @return the exception, for the caller to throw
 */
private static IOException fileCorruptException(CloudPageBlobWrapper blob,
    String reason) {
  String message = String.format(
      "The page blob: '%s' is corrupt or has an unexpected format: %s.",
      blob.getUri(), reason);
  return new IOException(message);
}