@Override public synchronized void close() throws IOException { if (isClosed) { return; } // Call the base close() to close any resources there. super.close(); // Close the store to close any resources there - e.g. the bandwidth // updater thread would be stopped at this time. store.close(); // Notify the metrics system that this file system is closed, which may // trigger one final metrics push to get the accurate final file system // metrics out. long startTime = System.currentTimeMillis(); if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); AzureFileSystemMetricsSystem.fileSystemClosed(); } LOG.debug("Submitting metrics when file system closed took {} ms.", (System.currentTimeMillis() - startTime)); isClosed = true; }
@Override public synchronized void close() throws IOException { if (isClosed) { return; } // Call the base close() to close any resources there. super.close(); // Close the store to close any resources there - e.g. the bandwidth // updater thread would be stopped at this time. store.close(); // Notify the metrics system that this file system is closed, which may // trigger one final metrics push to get the accurate final file system // metrics out. long startTime = System.currentTimeMillis(); AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); AzureFileSystemMetricsSystem.fileSystemClosed(); if (LOG.isDebugEnabled()) { LOG.debug("Submitting metrics when file system closed took " + (System.currentTimeMillis() - startTime) + " ms."); } isClosed = true; }
@Override public void initialize(URI uri, Configuration conf) throws IOException, IllegalArgumentException { // Check authority for the URI to guarantee that it is non-null. uri = reconstructAuthorityIfNeeded(uri, conf); if (null == uri.getAuthority()) { final String errMsg = String .format("Cannot initialize WASB file system, URI authority not recognized."); throw new IllegalArgumentException(errMsg); } super.initialize(uri, conf); if (store == null) { store = createDefaultStore(conf); } instrumentation = new AzureFileSystemInstrumentation(conf); if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { // Make sure the metrics system is available before interacting with Azure AzureFileSystemMetricsSystem.fileSystemStarted(); metricsSourceName = newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, instrumentation); } store.initialize(uri, conf, instrumentation); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE); if (LOG.isDebugEnabled()) { LOG.debug("NativeAzureFileSystem. Initializing."); LOG.debug(" blockSize = " + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); } }
@Override public synchronized void close() throws IOException { if (isClosed) { return; } // Call the base close() to close any resources there. super.close(); // Close the store to close any resources there - e.g. the bandwidth // updater thread would be stopped at this time. store.close(); // Notify the metrics system that this file system is closed, which may // trigger one final metrics push to get the accurate final file system // metrics out. long startTime = System.currentTimeMillis(); if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); AzureFileSystemMetricsSystem.fileSystemClosed(); } if (LOG.isDebugEnabled()) { LOG.debug("Submitting metrics when file system closed took " + (System.currentTimeMillis() - startTime) + " ms."); } isClosed = true; }
@Override public void initialize(URI uri, Configuration conf) throws IOException, IllegalArgumentException { // Check authority for the URI to guarantee that it is non-null. uri = reconstructAuthorityIfNeeded(uri, conf); if (null == uri.getAuthority()) { final String errMsg = String .format("Cannot initialize WASB file system, URI authority not recognized."); throw new IllegalArgumentException(errMsg); } super.initialize(uri, conf); if (store == null) { store = createDefaultStore(conf); } instrumentation = new AzureFileSystemInstrumentation(conf); if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { // Make sure the metrics system is available before interacting with Azure AzureFileSystemMetricsSystem.fileSystemStarted(); metricsSourceName = newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, instrumentation); } store.initialize(uri, conf, instrumentation); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE); LOG.debug("NativeAzureFileSystem. Initializing."); LOG.debug(" blockSize = {}", conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); }
@Override public void initialize(URI uri, Configuration conf) throws IOException, IllegalArgumentException { // Check authority for the URI to guarantee that it is non-null. uri = reconstructAuthorityIfNeeded(uri, conf); if (null == uri.getAuthority()) { final String errMsg = String .format("Cannot initialize WASB file system, URI authority not recognized."); throw new IllegalArgumentException(errMsg); } super.initialize(uri, conf); if (store == null) { store = createDefaultStore(conf); } // Make sure the metrics system is available before interacting with Azure AzureFileSystemMetricsSystem.fileSystemStarted(); metricsSourceName = newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; instrumentation = new AzureFileSystemInstrumentation(conf); AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, instrumentation); store.initialize(uri, conf, instrumentation); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE); if (LOG.isDebugEnabled()) { LOG.debug("NativeAzureFileSystem. Initializing."); LOG.debug(" blockSize = " + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); } }
public static AzureBlobStorageTestAccount createOutOfBandStore( int uploadBlockSize, int downloadBlockSize) throws Exception { saveMetricsConfigFile(); CloudBlobContainer container = null; Configuration conf = createTestConfiguration(); CloudStorageAccount account = createTestAccount(conf); if (null == account) { return null; } String containerName = String.format("wasbtests-%s-%tQ", System.getProperty("user.name"), new Date()); // Create the container. container = account.createCloudBlobClient().getContainerReference( containerName); container.create(); String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME); // Ensure that custom throttling is disabled and tolerate concurrent // out-of-band appends. conf.setBoolean(KEY_DISABLE_THROTTLING, true); conf.setBoolean(KEY_READ_TOLERATE_CONCURRENT_APPEND, true); // Set account URI and initialize Azure file system. URI accountUri = createAccountUri(accountName, containerName); // Set up instrumentation. // AzureFileSystemMetricsSystem.fileSystemStarted(); String sourceName = NativeAzureFileSystem.newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; AzureFileSystemInstrumentation instrumentation = new AzureFileSystemInstrumentation(conf); AzureFileSystemMetricsSystem.registerSource( sourceName, sourceDesc, instrumentation); // Create a new AzureNativeFileSystemStore object. AzureNativeFileSystemStore testStorage = new AzureNativeFileSystemStore(); // Initialize the store with the throttling feedback interfaces. testStorage.initialize(accountUri, conf, instrumentation); // Create test account initializing the appropriate member variables. // AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(testStorage, account, container); return testAcct; }