private static void storeLinkAttribute(CloudBlobWrapper blob, String linkTarget) throws UnsupportedEncodingException { // We have to URL encode the link attribute as the link URI could // have URI special characters which unless encoded will result // in 403 errors from the server. This is due to metadata properties // being sent in the HTTP header of the request which is in turn used // on the server side to authorize the request. String encodedLinkTarget = null; if (linkTarget != null) { encodedLinkTarget = URLEncoder.encode(linkTarget, "UTF-8"); } storeMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY, encodedLinkTarget); // Remove the old metadata key if present removeMetadataAttribute(blob, OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY); }
/** * If the blob with the given key exists and has a link in its metadata to a * temporary file (see storeEmptyLinkFile), this method returns the key to * that temporary file. Otherwise, returns null. */ @Override public String getLinkInFileMetadata(String key) throws AzureException { if (null == storageInteractionLayer) { final String errMsg = String.format( "Storage session expected for URI '%s' but does not exist.", sessionUri); throw new AssertionError(errMsg); } try { checkContainer(ContainerAccessType.PureRead); CloudBlobWrapper blob = getBlobReference(key); blob.downloadAttributes(getInstrumentedContext()); return getLinkAttributeValue(blob); } catch (Exception e) { // Caught exception while attempting download. Re-throw as an Azure // storage exception. throw new AzureException(e); } }
@Override public DataInputStream retrieve(String key) throws AzureException, IOException { try { // Check if a session exists, if not create a session with the // Azure storage server. if (null == storageInteractionLayer) { final String errMsg = String.format( "Storage session expected for URI '%s' but does not exist.", sessionUri); throw new AssertionError(errMsg); } checkContainer(ContainerAccessType.PureRead); // Get blob reference and open the input buffer stream. CloudBlobWrapper blob = getBlobReference(key); BufferedInputStream inBufStream = new BufferedInputStream( openInputStream(blob)); // Return a data input stream. DataInputStream inDataStream = new DataInputStream(inBufStream); return inDataStream; } catch (Exception e) { // Re-throw as an Azure storage exception. throw new AzureException(e); } }
@Override public void delete(String key, SelfRenewingLease lease) throws IOException { try { if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) { // Container doesn't exist, no need to do anything return; } // Get the blob reference and delete it. CloudBlobWrapper blob = getBlobReference(key); if (blob.exists(getInstrumentedContext())) { safeDelete(blob, lease); } } catch (Exception e) { // Re-throw as an Azure storage exception. throw new AzureException(e); } }
private void waitForCopyToComplete(CloudBlobWrapper blob, OperationContext opContext){ boolean copyInProgress = true; while (copyInProgress) { try { blob.downloadAttributes(opContext); } catch (StorageException se){ } // test for null because mocked filesystem doesn't know about copystates yet. copyInProgress = (blob.getCopyState() != null && blob.getCopyState().getStatus() == CopyStatus.PENDING); if (copyInProgress) { try { Thread.sleep(1000); } catch (InterruptedException ie){ //ignore } } } }
/** * Get a lease on the blob identified by key. This lease will be renewed * indefinitely by a background thread. */ @Override public SelfRenewingLease acquireLease(String key) throws AzureException { LOG.debug("acquiring lease on " + key); try { checkContainer(ContainerAccessType.ReadThenWrite); CloudBlobWrapper blob = getBlobReference(key); return blob.acquireLease(); } catch (Exception e) { // Caught exception while attempting to get lease. Re-throw as an // Azure storage exception. throw new AzureException(e); } }
@Override public void updateFolderLastModifiedTime(String key, Date lastModified, SelfRenewingLease folderLease) throws AzureException { try { checkContainer(ContainerAccessType.ReadThenWrite); CloudBlobWrapper blob = getBlobReference(key); //setLastModified function is not available in 2.0.0 version. blob.uploadProperties automatically updates last modified //timestamp to current time blob.uploadProperties(getInstrumentedContext(), folderLease); } catch (Exception e) { // Caught exception while attempting to update the properties. Re-throw as an // Azure storage exception. throw new AzureException(e); } }
/** * Get a lease on the blob identified by key. This lease will be renewed * indefinitely by a background thread. */ @Override public SelfRenewingLease acquireLease(String key) throws AzureException { LOG.debug("acquiring lease on {}", key); try { checkContainer(ContainerAccessType.ReadThenWrite); CloudBlobWrapper blob = getBlobReference(key); return blob.acquireLease(); } catch (Exception e) { // Caught exception while attempting to get lease. Re-throw as an // Azure storage exception. throw new AzureException(e); } }
@Override public void updateFolderLastModifiedTime(String key, Date lastModified, SelfRenewingLease folderLease) throws AzureException { try { checkContainer(ContainerAccessType.ReadThenWrite); CloudBlobWrapper blob = getBlobReference(key); blob.getProperties().setLastModified(lastModified); blob.uploadProperties(getInstrumentedContext(), folderLease); } catch (Exception e) { // Caught exception while attempting to update the properties. Re-throw as an // Azure storage exception. throw new AzureException(e); } }
/** * Opens a new output stream to the given blob (page or block blob) * to populate it from scratch with data. */ private OutputStream openOutputStream(final CloudBlobWrapper blob) throws StorageException { if (blob instanceof CloudPageBlobWrapperImpl){ return new PageBlobOutputStream( (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration); } else { // Handle both ClouldBlockBlobWrapperImpl and (only for the test code path) // MockCloudBlockBlobWrapper. return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(), getInstrumentedContext()); } }
/** * Opens a new input stream for the given blob (page or block blob) * to read its data. */ private InputStream openInputStream(CloudBlobWrapper blob) throws StorageException, IOException { if (blob instanceof CloudBlockBlobWrapper) { return blob.openInputStream(getDownloadOptions(), getInstrumentedContext(isConcurrentOOBAppendAllowed())); } else { return new PageBlobInputStream( (CloudPageBlobWrapper) blob, getInstrumentedContext( isConcurrentOOBAppendAllowed())); } }
private static void storeMetadataAttribute(CloudBlobWrapper blob, String key, String value) { HashMap<String, String> metadata = blob.getMetadata(); if (null == metadata) { metadata = new HashMap<String, String>(); } metadata.put(key, value); blob.setMetadata(metadata); }
private static String getMetadataAttribute(CloudBlobWrapper blob, String... keyAlternatives) { HashMap<String, String> metadata = blob.getMetadata(); if (null == metadata) { return null; } for (String key : keyAlternatives) { if (metadata.containsKey(key)) { return metadata.get(key); } } return null; }
private static void removeMetadataAttribute(CloudBlobWrapper blob, String key) { HashMap<String, String> metadata = blob.getMetadata(); if (metadata != null) { metadata.remove(key); blob.setMetadata(metadata); } }
private static void storePermissionStatus(CloudBlobWrapper blob, PermissionStatus permissionStatus) { storeMetadataAttribute(blob, PERMISSION_METADATA_KEY, PERMISSION_JSON_SERIALIZER.toJSON(permissionStatus)); // Remove the old metadata key if present removeMetadataAttribute(blob, OLD_PERMISSION_METADATA_KEY); }
private PermissionStatus getPermissionStatus(CloudBlobWrapper blob) { String permissionMetadataValue = getMetadataAttribute(blob, PERMISSION_METADATA_KEY, OLD_PERMISSION_METADATA_KEY); if (permissionMetadataValue != null) { return PermissionStatusJsonSerializer.fromJSONString( permissionMetadataValue); } else { return defaultPermissionNoBlobMetadata(); } }
private static String getLinkAttributeValue(CloudBlobWrapper blob) throws UnsupportedEncodingException { String encodedLinkTarget = getMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY, OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY); String linkTarget = null; if (encodedLinkTarget != null) { linkTarget = URLDecoder.decode(encodedLinkTarget, "UTF-8"); } return linkTarget; }
@Override public void storeEmptyFolder(String key, PermissionStatus permissionStatus) throws AzureException { if (null == storageInteractionLayer) { final String errMsg = String.format( "Storage session expected for URI '%s' but does not exist.", sessionUri); throw new AssertionError(errMsg); } // Check if there is an authenticated account associated with the file // this instance of the WASB file system. If not the file system has not // been authenticated and all access is anonymous. if (!isAuthenticatedAccess()) { // Preemptively raise an exception indicating no uploads are // allowed to anonymous accounts. throw new AzureException( "Uploads to to public accounts using anonymous access is prohibited."); } try { checkContainer(ContainerAccessType.PureWrite); CloudBlobWrapper blob = getBlobReference(key); storePermissionStatus(blob, permissionStatus); storeFolderAttribute(blob); openOutputStream(blob).close(); } catch (Exception e) { // Caught exception while attempting upload. Re-throw as an Azure // storage exception. throw new AzureException(e); } }
/** * Stores an empty blob that's linking to the temporary file where're we're * uploading the initial data. */ @Override public void storeEmptyLinkFile(String key, String tempBlobKey, PermissionStatus permissionStatus) throws AzureException { if (null == storageInteractionLayer) { final String errMsg = String.format( "Storage session expected for URI '%s' but does not exist.", sessionUri); throw new AssertionError(errMsg); } // Check if there is an authenticated account associated with the file // this instance of the WASB file system. If not the file system has not // been authenticated and all access is anonymous. if (!isAuthenticatedAccess()) { // Preemptively raise an exception indicating no uploads are // allowed to anonymous accounts. throw new AzureException( "Uploads to to public accounts using anonymous access is prohibited."); } try { checkContainer(ContainerAccessType.PureWrite); CloudBlobWrapper blob = getBlobReference(key); storePermissionStatus(blob, permissionStatus); storeLinkAttribute(blob, tempBlobKey); openOutputStream(blob).close(); } catch (Exception e) { // Caught exception while attempting upload. Re-throw as an Azure // storage exception. throw new AzureException(e); } }
/** * This private method uses the root directory or the original container to * get the block blob reference depending on whether the original file system * object was constructed with a short- or long-form URI. If the root * directory is non-null the URI in the file constructor was in the long form. * * @param aKey * : a key used to query Azure for the block blob. * @returns blob : a reference to the Azure block blob corresponding to the * key. * @throws URISyntaxException * */ private CloudBlobWrapper getBlobReference(String aKey) throws StorageException, URISyntaxException { CloudBlobWrapper blob = null; if (isPageBlobKey(aKey)) { blob = this.container.getPageBlobReference(aKey); } else { blob = this.container.getBlockBlobReference(aKey); blob.setStreamMinimumReadSizeInBytes(downloadBlockSizeBytes); blob.setWriteBlockSizeInBytes(uploadBlockSizeBytes); } return blob; }
@Override public DataInputStream retrieve(String key, long startByteOffset) throws AzureException, IOException { try { // Check if a session exists, if not create a session with the // Azure storage server. if (null == storageInteractionLayer) { final String errMsg = String.format( "Storage session expected for URI '%s' but does not exist.", sessionUri); throw new AssertionError(errMsg); } checkContainer(ContainerAccessType.PureRead); // Get blob reference and open the input buffer stream. CloudBlobWrapper blob = getBlobReference(key); // Open input stream and seek to the start offset. InputStream in = blob.openInputStream( getDownloadOptions(), getInstrumentedContext(isConcurrentOOBAppendAllowed())); // Create a data input stream. DataInputStream inDataStream = new DataInputStream(in); // Skip bytes and ignore return value. This is okay // because if you try to skip too far you will be positioned // at the end and reads will not return data. inDataStream.skip(startByteOffset); return inDataStream; } catch (Exception e) { // Re-throw as an Azure storage exception. throw new AzureException(e); } }