@Override public void initialize(URI uri, Configuration conf) throws IOException, IllegalArgumentException { // Check authority for the URI to guarantee that it is non-null. uri = reconstructAuthorityIfNeeded(uri, conf); if (null == uri.getAuthority()) { final String errMsg = String .format("Cannot initialize WASB file system, URI authority not recognized."); throw new IllegalArgumentException(errMsg); } super.initialize(uri, conf); if (store == null) { store = createDefaultStore(conf); } instrumentation = new AzureFileSystemInstrumentation(conf); if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { // Make sure the metrics system is available before interacting with Azure AzureFileSystemMetricsSystem.fileSystemStarted(); metricsSourceName = newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, instrumentation); } store.initialize(uri, conf, instrumentation); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE); if (LOG.isDebugEnabled()) { LOG.debug("NativeAzureFileSystem. Initializing."); LOG.debug(" blockSize = " + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); } }
/**
 * Writes {@code size} bytes to the given file, flushing every
 * {@code flushInterval} bytes, and measures the cost of doing so.
 *
 * <p>The output stream is always closed, even when writing fails, so a
 * failed write does not leave the stream open. The measured interval and the
 * web-response delta both include the close of the stream.
 *
 * @param fs the file system to create the file on; also supplies the
 *        instrumentation used to count web responses.
 * @param path the file to create and write.
 * @param size the size handed to the stream-level {@code writeTestFile}.
 * @param flushInterval the flush interval handed to the stream-level
 *        {@code writeTestFile}.
 * @return a {@code TestResult} built from the elapsed wall-clock
 *         milliseconds and the increase in the instrumentation's current
 *         web-response count.
 * @throws IOException if creating, writing or closing the file fails.
 */
private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path,
    long size, long flushInterval) throws IOException {
  AzureFileSystemInstrumentation instrumentation = fs.getInstrumentation();
  long initialRequests = instrumentation.getCurrentWebResponses();
  long startMillis = System.currentTimeMillis();

  // try-with-resources guarantees close() runs even if the write throws.
  try (OutputStream output = fs.create(path)) {
    writeTestFile(output, size, flushInterval);
  }

  long finalRequests = instrumentation.getCurrentWebResponses();
  return new TestResult(System.currentTimeMillis() - startMillis,
      finalRequests - initialRequests);
}
@Override public void initialize(URI uri, Configuration conf) throws IOException, IllegalArgumentException { // Check authority for the URI to guarantee that it is non-null. uri = reconstructAuthorityIfNeeded(uri, conf); if (null == uri.getAuthority()) { final String errMsg = String .format("Cannot initialize WASB file system, URI authority not recognized."); throw new IllegalArgumentException(errMsg); } super.initialize(uri, conf); if (store == null) { store = createDefaultStore(conf); } instrumentation = new AzureFileSystemInstrumentation(conf); if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { // Make sure the metrics system is available before interacting with Azure AzureFileSystemMetricsSystem.fileSystemStarted(); metricsSourceName = newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, instrumentation); } store.initialize(uri, conf, instrumentation); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE); LOG.debug("NativeAzureFileSystem. Initializing."); LOG.debug(" blockSize = {}", conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); }
@Override public void initialize(URI uri, Configuration conf) throws IOException, IllegalArgumentException { // Check authority for the URI to guarantee that it is non-null. uri = reconstructAuthorityIfNeeded(uri, conf); if (null == uri.getAuthority()) { final String errMsg = String .format("Cannot initialize WASB file system, URI authority not recognized."); throw new IllegalArgumentException(errMsg); } super.initialize(uri, conf); if (store == null) { store = createDefaultStore(conf); } // Make sure the metrics system is available before interacting with Azure AzureFileSystemMetricsSystem.fileSystemStarted(); metricsSourceName = newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; instrumentation = new AzureFileSystemInstrumentation(conf); AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, instrumentation); store.initialize(uri, conf, instrumentation); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE); if (LOG.isDebugEnabled()) { LOG.debug("NativeAzureFileSystem. Initializing."); LOG.debug(" blockSize = " + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); } }
/** * Method for the URI and configuration object necessary to create a storage * session with an Azure session. It parses the scheme to ensure it matches * the storage protocol supported by this file system. * * @param uri - URI for target storage blob. * @param conf - reference to configuration object. * @param instrumentation - the metrics source that will keep track of operations here. * * @throws IllegalArgumentException if URI or job object is null, or invalid scheme. */ @Override public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IllegalArgumentException, AzureException, IOException { if (null == instrumentation) { throw new IllegalArgumentException("Null instrumentation"); } this.instrumentation = instrumentation; if (null == this.storageInteractionLayer) { this.storageInteractionLayer = new StorageInterfaceImpl(); } // Check that URI exists. // if (null == uri) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Check that configuration object is non-null. // if (null == conf) { throw new IllegalArgumentException( "Cannot initialize WASB file system, conf is null"); } if(!conf.getBoolean( NativeAzureFileSystem.SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { //If not skip azure metrics, create bandwidthGaugeUpdater this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); } // Incoming parameters validated. Capture the URI and the job configuration // object. // sessionUri = uri; sessionConfiguration = conf; // Start an Azure storage session. // createAzureStorageSession(); // Extract the directories that should contain page blobs pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES); LOG.debug("Page blob directories: " + setToString(pageBlobDirs)); // Extract directories that should have atomic rename applied. 
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; try { // Add to this the hbase root directory, or /hbase is that is not set. hbaseRoot = verifyAndConvertToStandardFormat( sessionConfiguration.get("hbase.rootdir", "hbase")); atomicRenameDirs.add(hbaseRoot); } catch (URISyntaxException e) { LOG.warn("Unable to initialize HBase root as an atomic rename directory."); } LOG.debug("Atomic rename directories: " + setToString(atomicRenameDirs)); }
public static AzureBlobStorageTestAccount createOutOfBandStore( int uploadBlockSize, int downloadBlockSize) throws Exception { saveMetricsConfigFile(); CloudBlobContainer container = null; Configuration conf = createTestConfiguration(); CloudStorageAccount account = createTestAccount(conf); if (null == account) { return null; } String containerName = String.format("wasbtests-%s-%tQ", System.getProperty("user.name"), new Date()); // Create the container. container = account.createCloudBlobClient().getContainerReference( containerName); container.create(); String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME); // Ensure that custom throttling is disabled and tolerate concurrent // out-of-band appends. conf.setBoolean(KEY_DISABLE_THROTTLING, true); conf.setBoolean(KEY_READ_TOLERATE_CONCURRENT_APPEND, true); // Set account URI and initialize Azure file system. URI accountUri = createAccountUri(accountName, containerName); // Set up instrumentation. // AzureFileSystemMetricsSystem.fileSystemStarted(); String sourceName = NativeAzureFileSystem.newMetricsSourceName(); String sourceDesc = "Azure Storage Volume File System metrics"; AzureFileSystemInstrumentation instrumentation = new AzureFileSystemInstrumentation(conf); AzureFileSystemMetricsSystem.registerSource( sourceName, sourceDesc, instrumentation); // Create a new AzureNativeFileSystemStore object. AzureNativeFileSystemStore testStorage = new AzureNativeFileSystemStore(); // Initialize the store with the throttling feedback interfaces. testStorage.initialize(accountUri, conf, instrumentation); // Create test account initializing the appropriate member variables. // AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(testStorage, account, container); return testAcct; }
/** * Method for the URI and configuration object necessary to create a storage * session with an Azure session. It parses the scheme to ensure it matches * the storage protocol supported by this file system. * * @param uri - URI for target storage blob. * @param conf - reference to configuration object. * @param instrumentation - the metrics source that will keep track of operations here. * * @throws IllegalArgumentException if URI or job object is null, or invalid scheme. */ @Override public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IllegalArgumentException, AzureException, IOException { if (null == instrumentation) { throw new IllegalArgumentException("Null instrumentation"); } this.instrumentation = instrumentation; if (null == this.storageInteractionLayer) { this.storageInteractionLayer = new StorageInterfaceImpl(); } // Check that URI exists. // if (null == uri) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Check that configuration object is non-null. // if (null == conf) { throw new IllegalArgumentException( "Cannot initialize WASB file system, conf is null"); } if(!conf.getBoolean( NativeAzureFileSystem.SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { //If not skip azure metrics, create bandwidthGaugeUpdater this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); } // Incoming parameters validated. Capture the URI and the job configuration // object. // sessionUri = uri; sessionConfiguration = conf; // Start an Azure storage session. // createAzureStorageSession(); // Extract the directories that should contain page blobs pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES); LOG.debug("Page blob directories: {}", setToString(pageBlobDirs)); // Extract directories that should have atomic rename applied. 
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; try { // Add to this the hbase root directory, or /hbase is that is not set. hbaseRoot = verifyAndConvertToStandardFormat( sessionConfiguration.get("hbase.rootdir", "hbase")); atomicRenameDirs.add(hbaseRoot); } catch (URISyntaxException e) { LOG.warn("Unable to initialize HBase root as an atomic rename directory."); } LOG.debug("Atomic rename directories: {} ", setToString(atomicRenameDirs)); }
/** * Method for the URI and configuration object necessary to create a storage * session with an Azure session. It parses the scheme to ensure it matches * the storage protocol supported by this file system. * * @param uri - URI for target storage blob. * @param conf - reference to configuration object. * @param instrumentation - the metrics source that will keep track of operations here. * * @throws IllegalArgumentException if URI or job object is null, or invalid scheme. */ @Override public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IllegalArgumentException, AzureException, IOException { if (null == instrumentation) { throw new IllegalArgumentException("Null instrumentation"); } this.instrumentation = instrumentation; this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); if (null == this.storageInteractionLayer) { this.storageInteractionLayer = new StorageInterfaceImpl(); } // Check that URI exists. // if (null == uri) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Check that configuration object is non-null. // if (null == conf) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Incoming parameters validated. Capture the URI and the job configuration // object. // sessionUri = uri; sessionConfiguration = conf; // Start an Azure storage session. // createAzureStorageSession(); // Extract the directories that should contain page blobs pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES); LOG.debug("Page blob directories: " + setToString(pageBlobDirs)); // Extract directories that should have atomic rename applied. atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; try { // Add to this the hbase root directory, or /hbase is that is not set. 
hbaseRoot = verifyAndConvertToStandardFormat( sessionConfiguration.get("hbase.rootdir", "hbase")); atomicRenameDirs.add(hbaseRoot); } catch (URISyntaxException e) { LOG.warn("Unable to initialize HBase root as an atomic rename directory."); } LOG.debug("Atomic rename directories: " + setToString(atomicRenameDirs)); }
/**
 * Returns the instrumentation object held by this file system, which serves
 * as its metrics source. Exposed mainly so unit tests can inspect it.
 *
 * @return the instrumentation currently held by this instance.
 */
public AzureFileSystemInstrumentation getInstrumentation() {
  return this.instrumentation;
}
/**
 * Initializes the implementation with the given URI, configuration and
 * instrumentation.
 *
 * @param uri the URI handed to the implementation.
 *        NOTE(review): presumably identifies the target storage location;
 *        confirm against the implementing classes.
 * @param conf the configuration object handed to the implementation.
 * @param instrumentation the instrumentation object handed to the
 *        implementation. NOTE(review): presumably the metrics source used to
 *        track operations; confirm against the implementing classes.
 * @throws IOException declared by this contract; the exact failure
 *         conditions depend on the implementation.
 */
void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IOException;