Java 类org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper 实例源码
项目:hadoop
文件:AzureNativeFileSystemStore.java
/**
 * Opens a new output stream to the given blob (page or block blob)
 * to populate it from scratch with data.
 *
 * @param blob the page or block blob wrapper to write to.
 * @return an output stream that writes to the blob.
 * @throws StorageException propagated from creating the underlying stream.
 */
private OutputStream openOutputStream(final CloudBlobWrapper blob)
    throws StorageException {
  // Dispatch on the CloudPageBlobWrapper interface, not on the
  // CloudPageBlobWrapperImpl class: the page-blob branch only needs the
  // interface (that is what it casts to), and any other page blob wrapper
  // would otherwise fall through and fail the CloudBlockBlobWrapper cast
  // below with a ClassCastException.
  if (blob instanceof CloudPageBlobWrapper) {
    return new PageBlobOutputStream(
        (CloudPageBlobWrapper) blob, getInstrumentedContext(), sessionConfiguration);
  }
  // Everything else is handled as a block blob: CloudBlockBlobWrapperImpl and
  // (only for the test code path) MockCloudBlockBlobWrapper.
  return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(),
      getInstrumentedContext());
}
项目:hadoop
文件:AzureNativeFileSystemStore.java
/**
 * Returns a stream for reading the contents of the given blob, which may be
 * either a block blob or a page blob.
 *
 * @param blob the blob wrapper to read from.
 * @return an input stream over the blob's data.
 * @throws StorageException declared for failures while opening the stream.
 * @throws IOException declared for failures while opening the stream.
 */
private InputStream openInputStream(CloudBlobWrapper blob)
    throws StorageException, IOException {
  final boolean isBlockBlob = blob instanceof CloudBlockBlobWrapper;
  if (!isBlockBlob) {
    // Anything that is not a block blob is read through PageBlobInputStream.
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return new PageBlobInputStream(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  }
  // Block blobs provide their own input stream.
  return blob.openInputStream(getDownloadOptions(),
      getInstrumentedContext(isConcurrentOOBAppendAllowed()));
}
项目:hadoop
文件:AzureNativeFileSystemStore.java
/**
 * Computes how many bytes of data the given blob actually holds.
 * For a page blob the length in {@code properties} is not used; the size is
 * obtained from the blob itself. For any other blob the length recorded in
 * {@code properties} is returned.
 *
 * @param blob the blob whose data length is wanted.
 * @param properties the blob's properties; consulted only for non-page blobs.
 * @return the data length in bytes.
 * @throws AzureException if determining the page blob's size fails for any
 *         reason; the original exception is attached as the cause.
 */
private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
    throws AzureException {
  if (!(blob instanceof CloudPageBlobWrapper)) {
    return properties.getLength();
  }
  try {
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return PageBlobInputStream.getPageBlobSize(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  } catch (Exception e) {
    throw new AzureException(
        "Unexpected exception getting page blob actual data size.", e);
  }
}
项目:hadoop
文件:PageBlobInputStream.java
/**
 * Determines the amount of actual data stored in a page blob.
 * Up to two service requests are made: one to list the blob's page ranges
 * and, when at least one range exists, one to download the last page of the
 * first range.
 *
 * @param blob The page blob to measure.
 * @param opContext The operation context passed to both requests.
 * @return The number of data bytes in the blob; 0 when the blob has no
 *         page ranges.
 * @throws IOException If the first range does not start at byte 0 or the
 *         last page's header is invalid.
 * @throws StorageException If anything goes wrong in the requests.
 */
public static long getPageBlobSize(CloudPageBlobWrapper blob,
    OperationContext opContext) throws IOException, StorageException {
  final ArrayList<PageRange> ranges =
      blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  if (ranges.isEmpty()) {
    return 0;
  }

  // Only the first range counts; any later ranges are tolerated and ignored.
  final PageRange firstRange = ranges.get(0);
  if (firstRange.getStartOffset() != 0) {
    // Unexpected: page blobs are always uploaded as one contiguous range
    // beginning at byte 0.
    throw badStartRangeException(blob, firstRange);
  }

  final long rawSize = firstRange.getEndOffset() + 1;
  final long pageCount = rawSize / PAGE_SIZE;

  // Fetch the final page so its header can tell us how much data it holds.
  final ByteArrayOutputStream lastPageBuffer =
      new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
  blob.downloadRange(rawSize - PAGE_SIZE, PAGE_SIZE, lastPageBuffer,
      new BlobRequestOptions(), opContext);
  final short bytesInLastPage =
      getPageSize(blob, lastPageBuffer.toByteArray(), 0);

  // Every page before the last one is counted as full.
  return (pageCount - 1) * PAGE_DATA_SIZE + bytesInLastPage;
}
项目:hadoop
文件:PageBlobInputStream.java
/**
 * Creates a stream that reads from the given page blob.
 * The blob's page ranges are fetched up front; only the first range is
 * read, and it must start at byte 0.
 *
 * @param blob the page blob to read.
 * @param opContext the operation context used for the page-range request.
 * @throws IOException if the page-range request fails or the first range
 *         does not start at byte 0.
 */
public PageBlobInputStream(CloudPageBlobWrapper blob,
    OperationContext opContext)
    throws IOException {
  this.blob = blob;
  this.opContext = opContext;

  final ArrayList<PageRange> ranges;
  try {
    ranges = blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  } catch (StorageException e) {
    throw new IOException(e);
  }

  if (ranges.isEmpty()) {
    // No page ranges: there are no pages to read.
    numberOfPagesRemaining = 0;
  } else {
    final PageRange firstRange = ranges.get(0);
    if (firstRange.getStartOffset() != 0) {
      throw badStartRangeException(blob, firstRange);
    }
    final int extraRanges = ranges.size() - 1;
    if (extraRanges > 0) {
      LOG.warn(String.format(
          "Blob %s has %d page ranges beyond the first range. "
              + "Only reading the first range.",
          blob.getUri(), extraRanges));
    }
    numberOfPagesRemaining = (firstRange.getEndOffset() + 1) / PAGE_SIZE;
  }
}
项目:hadoop
文件:PageBlobInputStream.java
/**
 * Reads and validates the data-size header of a page.
 * The header is the two bytes at {@code data[offset]} and
 * {@code data[offset + 1]}, decoded with {@code toShort}.
 *
 * @param blob the blob the page came from; used only to build error messages.
 * @param data buffer containing the page.
 * @param offset index in {@code data} at which the page starts.
 * @return the size recorded in the header, between 0 and
 *         {@code PAGE_DATA_SIZE} inclusive.
 * @throws IOException if the buffer is too short to contain the header, or
 *         the recorded size is negative or larger than {@code PAGE_DATA_SIZE}.
 */
private static short getPageSize(CloudPageBlobWrapper blob,
    byte[] data, int offset) throws IOException {
  // The header occupies two bytes. A short buffer (for example a truncated
  // download) is reported as a corrupt blob instead of surfacing as an
  // unchecked ArrayIndexOutOfBoundsException.
  if (offset < 0 || data.length - offset < 2) {
    throw fileCorruptException(blob, String.format(
        "Page header is truncated: need 2 bytes at offset %d but the buffer holds %d bytes",
        offset, data.length));
  }
  short pageSize = toShort(data[offset], data[offset + 1]);
  if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
    throw fileCorruptException(blob, String.format(
        "Unexpected page size in the header: %d.",
        pageSize));
  }
  return pageSize;
}
项目:hadoop
文件:PageBlobInputStream.java
/**
 * Builds the exception reported when a page blob's first page range does
 * not begin at byte 0. The exception is returned, not thrown.
 *
 * @param blob the blob with the unexpected range.
 * @param startRange the blob's first page range.
 * @return an IOException describing the unexpected start offset.
 */
private static IOException badStartRangeException(CloudPageBlobWrapper blob,
    PageRange startRange) {
  final String reason = String.format(
      "Page blobs for ASV should always use a page range starting at byte 0. "
          + "This starts at byte %d.",
      startRange.getStartOffset());
  return fileCorruptException(blob, reason);
}
项目:aliyun-oss-hadoop-fs
文件:AzureNativeFileSystemStore.java
/**
 * Opens a new output stream to the given blob (page or block blob)
 * to populate it from scratch with data.
 *
 * @param blob the page or block blob wrapper to write to.
 * @return an output stream that writes to the blob.
 * @throws StorageException propagated from creating the underlying stream.
 */
private OutputStream openOutputStream(final CloudBlobWrapper blob)
    throws StorageException {
  // Dispatch on the CloudPageBlobWrapper interface, not on the
  // CloudPageBlobWrapperImpl class: the page-blob branch only needs the
  // interface (that is what it casts to), and any other page blob wrapper
  // would otherwise fall through and fail the CloudBlockBlobWrapper cast
  // below with a ClassCastException.
  if (blob instanceof CloudPageBlobWrapper) {
    return new PageBlobOutputStream(
        (CloudPageBlobWrapper) blob, getInstrumentedContext(), sessionConfiguration);
  }
  // Everything else is handled as a block blob: CloudBlockBlobWrapperImpl and
  // (only for the test code path) MockCloudBlockBlobWrapper.
  return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(),
      getInstrumentedContext());
}
项目:aliyun-oss-hadoop-fs
文件:AzureNativeFileSystemStore.java
/**
 * Returns a stream for reading the contents of the given blob, which may be
 * either a block blob or a page blob.
 *
 * @param blob the blob wrapper to read from.
 * @return an input stream over the blob's data.
 * @throws StorageException declared for failures while opening the stream.
 * @throws IOException declared for failures while opening the stream.
 */
private InputStream openInputStream(CloudBlobWrapper blob)
    throws StorageException, IOException {
  final boolean isBlockBlob = blob instanceof CloudBlockBlobWrapper;
  if (!isBlockBlob) {
    // Anything that is not a block blob is read through PageBlobInputStream.
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return new PageBlobInputStream(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  }
  // Block blobs provide their own input stream.
  return blob.openInputStream(getDownloadOptions(),
      getInstrumentedContext(isConcurrentOOBAppendAllowed()));
}
项目:aliyun-oss-hadoop-fs
文件:AzureNativeFileSystemStore.java
/**
 * Computes how many bytes of data the given blob actually holds.
 * For a page blob the length in {@code properties} is not used; the size is
 * obtained from the blob itself. For any other blob the length recorded in
 * {@code properties} is returned.
 *
 * @param blob the blob whose data length is wanted.
 * @param properties the blob's properties; consulted only for non-page blobs.
 * @return the data length in bytes.
 * @throws AzureException if determining the page blob's size fails for any
 *         reason; the original exception is attached as the cause.
 */
private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
    throws AzureException {
  if (!(blob instanceof CloudPageBlobWrapper)) {
    return properties.getLength();
  }
  try {
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return PageBlobInputStream.getPageBlobDataSize(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  } catch (Exception e) {
    throw new AzureException(
        "Unexpected exception getting page blob actual data size.", e);
  }
}
项目:aliyun-oss-hadoop-fs
文件:PageBlobInputStream.java
/**
 * Determines the amount of actual data stored in a page blob.
 * Up to two service requests are made: one to list the blob's page ranges
 * and, when at least one range exists, one to download the last page of the
 * first range.
 *
 * @param blob The page blob to measure.
 * @param opContext The operation context passed to both requests.
 * @return The number of data bytes in the blob; 0 when the blob has no
 *         page ranges.
 * @throws IOException If the first range does not start at byte 0 or the
 *         last page's header is invalid.
 * @throws StorageException If anything goes wrong in the requests.
 */
public static long getPageBlobDataSize(CloudPageBlobWrapper blob,
    OperationContext opContext) throws IOException, StorageException {
  final ArrayList<PageRange> ranges =
      blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  if (ranges.isEmpty()) {
    return 0;
  }

  // Only the first range counts; any later ranges are tolerated and ignored.
  final PageRange firstRange = ranges.get(0);
  if (firstRange.getStartOffset() != 0) {
    // Unexpected: page blobs are always uploaded as one contiguous range
    // beginning at byte 0.
    throw badStartRangeException(blob, firstRange);
  }

  final long rawSize = firstRange.getEndOffset() + 1;
  final long pageCount = rawSize / PAGE_SIZE;

  // Fetch the final page so its header can tell us how much data it holds.
  final ByteArrayOutputStream lastPageBuffer =
      new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
  blob.downloadRange(rawSize - PAGE_SIZE, PAGE_SIZE, lastPageBuffer,
      new BlobRequestOptions(), opContext);
  final short bytesInLastPage =
      getPageSize(blob, lastPageBuffer.toByteArray(), 0);

  // Every page before the last one is counted as full.
  return (pageCount - 1) * PAGE_DATA_SIZE + bytesInLastPage;
}
项目:aliyun-oss-hadoop-fs
文件:PageBlobInputStream.java
/**
 * Creates a stream that reads from the given page blob.
 * The blob's page ranges are fetched up front; only the first range is
 * read, and it must start at byte 0.
 *
 * @param blob the page blob to read.
 * @param opContext the operation context used for the page-range request.
 * @throws IOException if the page-range request fails or the first range
 *         does not start at byte 0.
 */
public PageBlobInputStream(CloudPageBlobWrapper blob,
    OperationContext opContext)
    throws IOException {
  this.blob = blob;
  this.opContext = opContext;

  final ArrayList<PageRange> ranges;
  try {
    ranges = blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  } catch (StorageException e) {
    throw new IOException(e);
  }

  if (ranges.isEmpty()) {
    // No page ranges: there are no pages to read.
    numberOfPagesRemaining = 0;
  } else {
    final PageRange firstRange = ranges.get(0);
    if (firstRange.getStartOffset() != 0) {
      throw badStartRangeException(blob, firstRange);
    }
    final int extraRanges = ranges.size() - 1;
    if (extraRanges > 0) {
      LOG.warn(String.format(
          "Blob %s has %d page ranges beyond the first range. "
              + "Only reading the first range.",
          blob.getUri(), extraRanges));
    }
    numberOfPagesRemaining = (firstRange.getEndOffset() + 1) / PAGE_SIZE;
  }
}
项目:aliyun-oss-hadoop-fs
文件:PageBlobInputStream.java
/**
 * Reads and validates the data-size header of a page.
 * The header is the two bytes at {@code data[offset]} and
 * {@code data[offset + 1]}, decoded with {@code toShort}.
 *
 * @param blob the blob the page came from; used only to build error messages.
 * @param data buffer containing the page.
 * @param offset index in {@code data} at which the page starts.
 * @return the size recorded in the header, between 0 and
 *         {@code PAGE_DATA_SIZE} inclusive.
 * @throws IOException if the buffer is too short to contain the header, or
 *         the recorded size is negative or larger than {@code PAGE_DATA_SIZE}.
 */
private static short getPageSize(CloudPageBlobWrapper blob,
    byte[] data, int offset) throws IOException {
  // The header occupies two bytes. A short buffer (for example a truncated
  // download) is reported as a corrupt blob instead of surfacing as an
  // unchecked ArrayIndexOutOfBoundsException.
  if (offset < 0 || data.length - offset < 2) {
    throw fileCorruptException(blob, String.format(
        "Page header is truncated: need 2 bytes at offset %d but the buffer holds %d bytes",
        offset, data.length));
  }
  short pageSize = toShort(data[offset], data[offset + 1]);
  if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
    throw fileCorruptException(blob, String.format(
        "Unexpected page size in the header: %d.",
        pageSize));
  }
  return pageSize;
}
项目:aliyun-oss-hadoop-fs
文件:PageBlobInputStream.java
/**
 * Builds the exception reported when a page blob's first page range does
 * not begin at byte 0. The exception is returned, not thrown.
 *
 * @param blob the blob with the unexpected range.
 * @param startRange the blob's first page range.
 * @return an IOException describing the unexpected start offset.
 */
private static IOException badStartRangeException(CloudPageBlobWrapper blob,
    PageRange startRange) {
  final String reason = String.format(
      "Page blobs for ASV should always use a page range starting at byte 0. "
          + "This starts at byte %d.",
      startRange.getStartOffset());
  return fileCorruptException(blob, reason);
}
项目:big-c
文件:AzureNativeFileSystemStore.java
/**
 * Opens a new output stream to the given blob (page or block blob)
 * to populate it from scratch with data.
 *
 * @param blob the page or block blob wrapper to write to.
 * @return an output stream that writes to the blob.
 * @throws StorageException propagated from creating the underlying stream.
 */
private OutputStream openOutputStream(final CloudBlobWrapper blob)
    throws StorageException {
  // Dispatch on the CloudPageBlobWrapper interface, not on the
  // CloudPageBlobWrapperImpl class: the page-blob branch only needs the
  // interface (that is what it casts to), and any other page blob wrapper
  // would otherwise fall through and fail the CloudBlockBlobWrapper cast
  // below with a ClassCastException.
  if (blob instanceof CloudPageBlobWrapper) {
    return new PageBlobOutputStream(
        (CloudPageBlobWrapper) blob, getInstrumentedContext(), sessionConfiguration);
  }
  // Everything else is handled as a block blob: CloudBlockBlobWrapperImpl and
  // (only for the test code path) MockCloudBlockBlobWrapper.
  return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(),
      getInstrumentedContext());
}
项目:big-c
文件:AzureNativeFileSystemStore.java
/**
 * Returns a stream for reading the contents of the given blob, which may be
 * either a block blob or a page blob.
 *
 * @param blob the blob wrapper to read from.
 * @return an input stream over the blob's data.
 * @throws StorageException declared for failures while opening the stream.
 * @throws IOException declared for failures while opening the stream.
 */
private InputStream openInputStream(CloudBlobWrapper blob)
    throws StorageException, IOException {
  final boolean isBlockBlob = blob instanceof CloudBlockBlobWrapper;
  if (!isBlockBlob) {
    // Anything that is not a block blob is read through PageBlobInputStream.
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return new PageBlobInputStream(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  }
  // Block blobs provide their own input stream.
  return blob.openInputStream(getDownloadOptions(),
      getInstrumentedContext(isConcurrentOOBAppendAllowed()));
}
项目:big-c
文件:AzureNativeFileSystemStore.java
/**
 * Computes how many bytes of data the given blob actually holds.
 * For a page blob the length in {@code properties} is not used; the size is
 * obtained from the blob itself. For any other blob the length recorded in
 * {@code properties} is returned.
 *
 * @param blob the blob whose data length is wanted.
 * @param properties the blob's properties; consulted only for non-page blobs.
 * @return the data length in bytes.
 * @throws AzureException if determining the page blob's size fails for any
 *         reason; the original exception is attached as the cause.
 */
private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
    throws AzureException {
  if (!(blob instanceof CloudPageBlobWrapper)) {
    return properties.getLength();
  }
  try {
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return PageBlobInputStream.getPageBlobSize(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  } catch (Exception e) {
    throw new AzureException(
        "Unexpected exception getting page blob actual data size.", e);
  }
}
项目:big-c
文件:PageBlobInputStream.java
/**
 * Determines the amount of actual data stored in a page blob.
 * Up to two service requests are made: one to list the blob's page ranges
 * and, when at least one range exists, one to download the last page of the
 * first range.
 *
 * @param blob The page blob to measure.
 * @param opContext The operation context passed to both requests.
 * @return The number of data bytes in the blob; 0 when the blob has no
 *         page ranges.
 * @throws IOException If the first range does not start at byte 0 or the
 *         last page's header is invalid.
 * @throws StorageException If anything goes wrong in the requests.
 */
public static long getPageBlobSize(CloudPageBlobWrapper blob,
    OperationContext opContext) throws IOException, StorageException {
  final ArrayList<PageRange> ranges =
      blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  if (ranges.isEmpty()) {
    return 0;
  }

  // Only the first range counts; any later ranges are tolerated and ignored.
  final PageRange firstRange = ranges.get(0);
  if (firstRange.getStartOffset() != 0) {
    // Unexpected: page blobs are always uploaded as one contiguous range
    // beginning at byte 0.
    throw badStartRangeException(blob, firstRange);
  }

  final long rawSize = firstRange.getEndOffset() + 1;
  final long pageCount = rawSize / PAGE_SIZE;

  // Fetch the final page so its header can tell us how much data it holds.
  final ByteArrayOutputStream lastPageBuffer =
      new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
  blob.downloadRange(rawSize - PAGE_SIZE, PAGE_SIZE, lastPageBuffer,
      new BlobRequestOptions(), opContext);
  final short bytesInLastPage =
      getPageSize(blob, lastPageBuffer.toByteArray(), 0);

  // Every page before the last one is counted as full.
  return (pageCount - 1) * PAGE_DATA_SIZE + bytesInLastPage;
}
项目:big-c
文件:PageBlobInputStream.java
/**
 * Creates a stream that reads from the given page blob.
 * The blob's page ranges are fetched up front; only the first range is
 * read, and it must start at byte 0.
 *
 * @param blob the page blob to read.
 * @param opContext the operation context used for the page-range request.
 * @throws IOException if the page-range request fails or the first range
 *         does not start at byte 0.
 */
public PageBlobInputStream(CloudPageBlobWrapper blob,
    OperationContext opContext)
    throws IOException {
  this.blob = blob;
  this.opContext = opContext;

  final ArrayList<PageRange> ranges;
  try {
    ranges = blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  } catch (StorageException e) {
    throw new IOException(e);
  }

  if (ranges.isEmpty()) {
    // No page ranges: there are no pages to read.
    numberOfPagesRemaining = 0;
  } else {
    final PageRange firstRange = ranges.get(0);
    if (firstRange.getStartOffset() != 0) {
      throw badStartRangeException(blob, firstRange);
    }
    final int extraRanges = ranges.size() - 1;
    if (extraRanges > 0) {
      LOG.warn(String.format(
          "Blob %s has %d page ranges beyond the first range. "
              + "Only reading the first range.",
          blob.getUri(), extraRanges));
    }
    numberOfPagesRemaining = (firstRange.getEndOffset() + 1) / PAGE_SIZE;
  }
}
项目:big-c
文件:PageBlobInputStream.java
/**
 * Reads and validates the data-size header of a page.
 * The header is the two bytes at {@code data[offset]} and
 * {@code data[offset + 1]}, decoded with {@code toShort}.
 *
 * @param blob the blob the page came from; used only to build error messages.
 * @param data buffer containing the page.
 * @param offset index in {@code data} at which the page starts.
 * @return the size recorded in the header, between 0 and
 *         {@code PAGE_DATA_SIZE} inclusive.
 * @throws IOException if the buffer is too short to contain the header, or
 *         the recorded size is negative or larger than {@code PAGE_DATA_SIZE}.
 */
private static short getPageSize(CloudPageBlobWrapper blob,
    byte[] data, int offset) throws IOException {
  // The header occupies two bytes. A short buffer (for example a truncated
  // download) is reported as a corrupt blob instead of surfacing as an
  // unchecked ArrayIndexOutOfBoundsException.
  if (offset < 0 || data.length - offset < 2) {
    throw fileCorruptException(blob, String.format(
        "Page header is truncated: need 2 bytes at offset %d but the buffer holds %d bytes",
        offset, data.length));
  }
  short pageSize = toShort(data[offset], data[offset + 1]);
  if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
    throw fileCorruptException(blob, String.format(
        "Unexpected page size in the header: %d.",
        pageSize));
  }
  return pageSize;
}
项目:big-c
文件:PageBlobInputStream.java
/**
 * Builds the exception reported when a page blob's first page range does
 * not begin at byte 0. The exception is returned, not thrown.
 *
 * @param blob the blob with the unexpected range.
 * @param startRange the blob's first page range.
 * @return an IOException describing the unexpected start offset.
 */
private static IOException badStartRangeException(CloudPageBlobWrapper blob,
    PageRange startRange) {
  final String reason = String.format(
      "Page blobs for ASV should always use a page range starting at byte 0. "
          + "This starts at byte %d.",
      startRange.getStartOffset());
  return fileCorruptException(blob, reason);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:AzureNativeFileSystemStore.java
/**
 * Opens a new output stream to the given blob (page or block blob)
 * to populate it from scratch with data.
 *
 * @param blob the page or block blob wrapper to write to.
 * @return an output stream that writes to the blob.
 * @throws StorageException propagated from creating the underlying stream.
 */
private OutputStream openOutputStream(final CloudBlobWrapper blob)
    throws StorageException {
  // Dispatch on the CloudPageBlobWrapper interface, not on the
  // CloudPageBlobWrapperImpl class: the page-blob branch only needs the
  // interface (that is what it casts to), and any other page blob wrapper
  // would otherwise fall through and fail the CloudBlockBlobWrapper cast
  // below with a ClassCastException.
  if (blob instanceof CloudPageBlobWrapper) {
    return new PageBlobOutputStream(
        (CloudPageBlobWrapper) blob, getInstrumentedContext(), sessionConfiguration);
  }
  // Everything else is handled as a block blob: CloudBlockBlobWrapperImpl and
  // (only for the test code path) MockCloudBlockBlobWrapper.
  return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(),
      getInstrumentedContext());
}
项目:hadoop-2.6.0-cdh5.4.3
文件:AzureNativeFileSystemStore.java
/**
 * Returns a stream for reading the contents of the given blob, which may be
 * either a block blob or a page blob.
 *
 * @param blob the blob wrapper to read from.
 * @return an input stream over the blob's data.
 * @throws StorageException declared for failures while opening the stream.
 * @throws IOException declared for failures while opening the stream.
 */
private InputStream openInputStream(CloudBlobWrapper blob)
    throws StorageException, IOException {
  final boolean isBlockBlob = blob instanceof CloudBlockBlobWrapper;
  if (!isBlockBlob) {
    // Anything that is not a block blob is read through PageBlobInputStream.
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return new PageBlobInputStream(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  }
  // Block blobs provide their own input stream.
  return blob.openInputStream(getDownloadOptions(),
      getInstrumentedContext(isConcurrentOOBAppendAllowed()));
}
项目:hadoop-2.6.0-cdh5.4.3
文件:AzureNativeFileSystemStore.java
/**
 * Computes how many bytes of data the given blob actually holds.
 * For a page blob the length in {@code properties} is not used; the size is
 * obtained from the blob itself. For any other blob the length recorded in
 * {@code properties} is returned.
 *
 * @param blob the blob whose data length is wanted.
 * @param properties the blob's properties; consulted only for non-page blobs.
 * @return the data length in bytes.
 * @throws AzureException if determining the page blob's size fails for any
 *         reason; the original exception is attached as the cause.
 */
private long getDataLength(CloudBlobWrapper blob, BlobProperties properties)
    throws AzureException {
  if (!(blob instanceof CloudPageBlobWrapper)) {
    return properties.getLength();
  }
  try {
    final CloudPageBlobWrapper pageBlob = (CloudPageBlobWrapper) blob;
    return PageBlobInputStream.getPageBlobSize(pageBlob,
        getInstrumentedContext(isConcurrentOOBAppendAllowed()));
  } catch (Exception e) {
    throw new AzureException(
        "Unexpected exception getting page blob actual data size.", e);
  }
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PageBlobInputStream.java
/**
 * Determines the amount of actual data stored in a page blob.
 * Up to two service requests are made: one to list the blob's page ranges
 * and, when at least one range exists, one to download the last page of the
 * first range.
 *
 * @param blob The page blob to measure.
 * @param opContext The operation context passed to both requests.
 * @return The number of data bytes in the blob; 0 when the blob has no
 *         page ranges.
 * @throws IOException If the first range does not start at byte 0 or the
 *         last page's header is invalid.
 * @throws StorageException If anything goes wrong in the requests.
 */
public static long getPageBlobSize(CloudPageBlobWrapper blob,
    OperationContext opContext) throws IOException, StorageException {
  final ArrayList<PageRange> ranges =
      blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  if (ranges.isEmpty()) {
    return 0;
  }

  // Only the first range counts; any later ranges are tolerated and ignored.
  final PageRange firstRange = ranges.get(0);
  if (firstRange.getStartOffset() != 0) {
    // Unexpected: page blobs are always uploaded as one contiguous range
    // beginning at byte 0.
    throw badStartRangeException(blob, firstRange);
  }

  final long rawSize = firstRange.getEndOffset() + 1;
  final long pageCount = rawSize / PAGE_SIZE;

  // Fetch the final page so its header can tell us how much data it holds.
  final ByteArrayOutputStream lastPageBuffer =
      new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
  blob.downloadRange(rawSize - PAGE_SIZE, PAGE_SIZE, lastPageBuffer,
      new BlobRequestOptions(), opContext);
  final short bytesInLastPage =
      getPageSize(blob, lastPageBuffer.toByteArray(), 0);

  // Every page before the last one is counted as full.
  return (pageCount - 1) * PAGE_DATA_SIZE + bytesInLastPage;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PageBlobInputStream.java
/**
 * Creates a stream that reads from the given page blob.
 * The blob's page ranges are fetched up front; only the first range is
 * read, and it must start at byte 0.
 *
 * @param blob the page blob to read.
 * @param opContext the operation context used for the page-range request.
 * @throws IOException if the page-range request fails or the first range
 *         does not start at byte 0.
 */
public PageBlobInputStream(CloudPageBlobWrapper blob,
    OperationContext opContext)
    throws IOException {
  this.blob = blob;
  this.opContext = opContext;

  final ArrayList<PageRange> ranges;
  try {
    ranges = blob.downloadPageRanges(new BlobRequestOptions(), opContext);
  } catch (StorageException e) {
    throw new IOException(e);
  }

  if (ranges.isEmpty()) {
    // No page ranges: there are no pages to read.
    numberOfPagesRemaining = 0;
  } else {
    final PageRange firstRange = ranges.get(0);
    if (firstRange.getStartOffset() != 0) {
      throw badStartRangeException(blob, firstRange);
    }
    final int extraRanges = ranges.size() - 1;
    if (extraRanges > 0) {
      LOG.warn(String.format(
          "Blob %s has %d page ranges beyond the first range. "
              + "Only reading the first range.",
          blob.getUri(), extraRanges));
    }
    numberOfPagesRemaining = (firstRange.getEndOffset() + 1) / PAGE_SIZE;
  }
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PageBlobInputStream.java
/**
 * Reads and validates the data-size header of a page.
 * The header is the two bytes at {@code data[offset]} and
 * {@code data[offset + 1]}, decoded with {@code toShort}.
 *
 * @param blob the blob the page came from; used only to build error messages.
 * @param data buffer containing the page.
 * @param offset index in {@code data} at which the page starts.
 * @return the size recorded in the header, between 0 and
 *         {@code PAGE_DATA_SIZE} inclusive.
 * @throws IOException if the buffer is too short to contain the header, or
 *         the recorded size is negative or larger than {@code PAGE_DATA_SIZE}.
 */
private static short getPageSize(CloudPageBlobWrapper blob,
    byte[] data, int offset) throws IOException {
  // The header occupies two bytes. A short buffer (for example a truncated
  // download) is reported as a corrupt blob instead of surfacing as an
  // unchecked ArrayIndexOutOfBoundsException.
  if (offset < 0 || data.length - offset < 2) {
    throw fileCorruptException(blob, String.format(
        "Page header is truncated: need 2 bytes at offset %d but the buffer holds %d bytes",
        offset, data.length));
  }
  short pageSize = toShort(data[offset], data[offset + 1]);
  if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
    throw fileCorruptException(blob, String.format(
        "Unexpected page size in the header: %d.",
        pageSize));
  }
  return pageSize;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PageBlobInputStream.java
/**
 * Builds the exception reported when a page blob's first page range does
 * not begin at byte 0. The exception is returned, not thrown.
 *
 * @param blob the blob with the unexpected range.
 * @param startRange the blob's first page range.
 * @return an IOException describing the unexpected start offset.
 */
private static IOException badStartRangeException(CloudPageBlobWrapper blob,
    PageRange startRange) {
  final String reason = String.format(
      "Page blobs for ASV should always use a page range starting at byte 0. "
          + "This starts at byte %d.",
      startRange.getStartOffset());
  return fileCorruptException(blob, reason);
}
项目:hadoop
文件:PageBlobOutputStream.java
/**
 * Creates an output stream that writes to the given page blob. The blob is
 * created on the service as part of construction.
 *
 * @param blob the blob that this stream is associated with.
 * @param opContext an object used to track the execution of the operation.
 * @param conf configuration from which fs.azure.page.blob.size and
 *        fs.azure.page.blob.extension.size are read.
 * @throws StorageException if anything goes wrong creating the blob.
 */
public PageBlobOutputStream(final CloudPageBlobWrapper blob,
    final OperationContext opContext,
    final Configuration conf) throws StorageException {
  this.blob = blob;
  this.outBuffer = new ByteArrayOutputStream();
  this.opContext = opContext;
  this.lastQueuedTask = null;

  // The IO writes are not designed for parallelism, so the pool is limited
  // to exactly one thread.
  this.ioQueue = new LinkedBlockingQueue<Runnable>();
  this.ioThreadPool =
      new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, ioQueue);

  // Initial blob size: the larger of the minimum size and the configured
  // fs.azure.page.blob.size, rounded up to a whole number of pages.
  final long configuredSize = conf.getLong("fs.azure.page.blob.size", 0);
  LOG.debug("Read value of fs.azure.page.blob.size as " + configuredSize
      + " from configuration (0 if not present).");
  long initialSize = Math.max(PAGE_BLOB_MIN_SIZE, configuredSize);
  final long sizeRemainder = initialSize % PAGE_SIZE;
  if (sizeRemainder != 0) {
    initialSize += PAGE_SIZE - sizeRemainder;
  }
  blob.create(initialSize, new BlobRequestOptions(), opContext);
  currentBlobSize = initialSize;

  // Extension size: never below the default, and likewise rounded up to a
  // whole number of pages.
  long extensionSize = conf.getLong("fs.azure.page.blob.extension.size", 0);
  if (extensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
    extensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
  }
  final long extensionRemainder = extensionSize % PAGE_SIZE;
  if (extensionRemainder != 0) {
    extensionSize += PAGE_SIZE - extensionRemainder;
  }
  configuredPageBlobExtensionSize = extensionSize;
}
项目:hadoop
文件:PageBlobInputStream.java
/**
 * Builds the exception used to report a corrupt or unexpectedly formatted
 * page blob. The exception is returned, not thrown.
 *
 * @param blob the affected blob; its URI is included in the message.
 * @param reason description of what was wrong, appended to the message.
 * @return an IOException carrying the formatted message.
 */
private static IOException fileCorruptException(CloudPageBlobWrapper blob,
    String reason) {
  final String message = String.format(
      "The page blob: '%s' is corrupt or has an unexpected format: %s.",
      blob.getUri(), reason);
  return new IOException(message);
}
项目:aliyun-oss-hadoop-fs
文件:PageBlobOutputStream.java
/**
 * Creates an output stream that writes to the given page blob. The blob is
 * created on the service as part of construction.
 *
 * @param blob the blob that this stream is associated with.
 * @param opContext an object used to track the execution of the operation.
 * @param conf configuration from which fs.azure.page.blob.size and
 *        fs.azure.page.blob.extension.size are read.
 * @throws StorageException if anything goes wrong creating the blob.
 */
public PageBlobOutputStream(final CloudPageBlobWrapper blob,
    final OperationContext opContext,
    final Configuration conf) throws StorageException {
  this.blob = blob;
  this.outBuffer = new ByteArrayOutputStream();
  this.opContext = opContext;
  this.lastQueuedTask = null;

  // The IO writes are not designed for parallelism, so the pool is limited
  // to exactly one thread.
  this.ioQueue = new LinkedBlockingQueue<Runnable>();
  this.ioThreadPool =
      new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, ioQueue);

  // Initial blob size: the larger of the minimum size and the configured
  // fs.azure.page.blob.size, rounded up to a whole number of pages.
  final long configuredSize = conf.getLong("fs.azure.page.blob.size", 0);
  LOG.debug("Read value of fs.azure.page.blob.size as " + configuredSize
      + " from configuration (0 if not present).");
  long initialSize = Math.max(PAGE_BLOB_MIN_SIZE, configuredSize);
  final long sizeRemainder = initialSize % PAGE_SIZE;
  if (sizeRemainder != 0) {
    initialSize += PAGE_SIZE - sizeRemainder;
  }
  blob.create(initialSize, new BlobRequestOptions(), opContext);
  currentBlobSize = initialSize;

  // Extension size: never below the default, and likewise rounded up to a
  // whole number of pages.
  long extensionSize = conf.getLong("fs.azure.page.blob.extension.size", 0);
  if (extensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
    extensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
  }
  final long extensionRemainder = extensionSize % PAGE_SIZE;
  if (extensionRemainder != 0) {
    extensionSize += PAGE_SIZE - extensionRemainder;
  }
  configuredPageBlobExtensionSize = extensionSize;
}
项目:aliyun-oss-hadoop-fs
文件:PageBlobInputStream.java
/**
 * Builds the exception used to report a corrupt or unexpectedly formatted
 * page blob. The exception is returned, not thrown.
 *
 * @param blob the affected blob; its URI is included in the message.
 * @param reason description of what was wrong, appended to the message.
 * @return an IOException carrying the formatted message.
 */
private static IOException fileCorruptException(CloudPageBlobWrapper blob,
    String reason) {
  final String message = String.format(
      "The page blob: '%s' is corrupt or has an unexpected format: %s.",
      blob.getUri(), reason);
  return new IOException(message);
}
项目:big-c
文件:PageBlobOutputStream.java
/**
 * Creates an output stream that writes to the given page blob. The blob is
 * created on the service as part of construction.
 *
 * @param blob the blob that this stream is associated with.
 * @param opContext an object used to track the execution of the operation.
 * @param conf configuration from which fs.azure.page.blob.size and
 *        fs.azure.page.blob.extension.size are read.
 * @throws StorageException if anything goes wrong creating the blob.
 */
public PageBlobOutputStream(final CloudPageBlobWrapper blob,
    final OperationContext opContext,
    final Configuration conf) throws StorageException {
  this.blob = blob;
  this.outBuffer = new ByteArrayOutputStream();
  this.opContext = opContext;
  this.lastQueuedTask = null;

  // The IO writes are not designed for parallelism, so the pool is limited
  // to exactly one thread.
  this.ioQueue = new LinkedBlockingQueue<Runnable>();
  this.ioThreadPool =
      new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, ioQueue);

  // Initial blob size: the larger of the minimum size and the configured
  // fs.azure.page.blob.size, rounded up to a whole number of pages.
  final long configuredSize = conf.getLong("fs.azure.page.blob.size", 0);
  LOG.debug("Read value of fs.azure.page.blob.size as " + configuredSize
      + " from configuration (0 if not present).");
  long initialSize = Math.max(PAGE_BLOB_MIN_SIZE, configuredSize);
  final long sizeRemainder = initialSize % PAGE_SIZE;
  if (sizeRemainder != 0) {
    initialSize += PAGE_SIZE - sizeRemainder;
  }
  blob.create(initialSize, new BlobRequestOptions(), opContext);
  currentBlobSize = initialSize;

  // Extension size: never below the default, and likewise rounded up to a
  // whole number of pages.
  long extensionSize = conf.getLong("fs.azure.page.blob.extension.size", 0);
  if (extensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
    extensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
  }
  final long extensionRemainder = extensionSize % PAGE_SIZE;
  if (extensionRemainder != 0) {
    extensionSize += PAGE_SIZE - extensionRemainder;
  }
  configuredPageBlobExtensionSize = extensionSize;
}
项目:big-c
文件:PageBlobInputStream.java
/**
 * Builds the exception used to report a corrupt or unexpectedly formatted
 * page blob. The exception is returned, not thrown.
 *
 * @param blob the affected blob; its URI is included in the message.
 * @param reason description of what was wrong, appended to the message.
 * @return an IOException carrying the formatted message.
 */
private static IOException fileCorruptException(CloudPageBlobWrapper blob,
    String reason) {
  final String message = String.format(
      "The page blob: '%s' is corrupt or has an unexpected format: %s.",
      blob.getUri(), reason);
  return new IOException(message);
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PageBlobOutputStream.java
/**
 * Creates an output stream that writes to the given page blob. The blob is
 * created on the service as part of construction.
 *
 * @param blob the blob that this stream is associated with.
 * @param opContext an object used to track the execution of the operation.
 * @param conf configuration from which fs.azure.page.blob.size and
 *        fs.azure.page.blob.extension.size are read.
 * @throws StorageException if anything goes wrong creating the blob.
 */
public PageBlobOutputStream(final CloudPageBlobWrapper blob,
    final OperationContext opContext,
    final Configuration conf) throws StorageException {
  this.blob = blob;
  this.outBuffer = new ByteArrayOutputStream();
  this.opContext = opContext;
  this.lastQueuedTask = null;

  // The IO writes are not designed for parallelism, so the pool is limited
  // to exactly one thread.
  this.ioQueue = new LinkedBlockingQueue<Runnable>();
  this.ioThreadPool =
      new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, ioQueue);

  // Initial blob size: the larger of the minimum size and the configured
  // fs.azure.page.blob.size, rounded up to a whole number of pages.
  final long configuredSize = conf.getLong("fs.azure.page.blob.size", 0);
  LOG.debug("Read value of fs.azure.page.blob.size as " + configuredSize
      + " from configuration (0 if not present).");
  long initialSize = Math.max(PAGE_BLOB_MIN_SIZE, configuredSize);
  final long sizeRemainder = initialSize % PAGE_SIZE;
  if (sizeRemainder != 0) {
    initialSize += PAGE_SIZE - sizeRemainder;
  }
  blob.create(initialSize, new BlobRequestOptions(), opContext);
  currentBlobSize = initialSize;

  // Extension size: never below the default, and likewise rounded up to a
  // whole number of pages.
  long extensionSize = conf.getLong("fs.azure.page.blob.extension.size", 0);
  if (extensionSize < PAGE_BLOB_DEFAULT_EXTENSION_SIZE) {
    extensionSize = PAGE_BLOB_DEFAULT_EXTENSION_SIZE;
  }
  final long extensionRemainder = extensionSize % PAGE_SIZE;
  if (extensionRemainder != 0) {
    extensionSize += PAGE_SIZE - extensionRemainder;
  }
  configuredPageBlobExtensionSize = extensionSize;
}
项目:hadoop-2.6.0-cdh5.4.3
文件:PageBlobInputStream.java
/**
 * Builds the exception used to report a corrupt or unexpectedly formatted
 * page blob. The exception is returned, not thrown.
 *
 * @param blob the affected blob; its URI is included in the message.
 * @param reason description of what was wrong, appended to the message.
 * @return an IOException carrying the formatted message.
 */
private static IOException fileCorruptException(CloudPageBlobWrapper blob,
    String reason) {
  final String message = String.format(
      "The page blob: '%s' is corrupt or has an unexpected format: %s.",
      blob.getUri(), reason);
  return new IOException(message);
}