/**
 * Exposes the bandwidth gauge updater held by this store so that tests can
 * inspect it.
 *
 * @return the current value of the {@code bandwidthGaugeUpdater} field.
 */
@VisibleForTesting
public BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
  return this.bandwidthGaugeUpdater;
}
/** * Method for the URI and configuration object necessary to create a storage * session with an Azure session. It parses the scheme to ensure it matches * the storage protocol supported by this file system. * * @param uri - URI for target storage blob. * @param conf - reference to configuration object. * @param instrumentation - the metrics source that will keep track of operations here. * * @throws IllegalArgumentException if URI or job object is null, or invalid scheme. */ @Override public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IllegalArgumentException, AzureException, IOException { if (null == instrumentation) { throw new IllegalArgumentException("Null instrumentation"); } this.instrumentation = instrumentation; if (null == this.storageInteractionLayer) { this.storageInteractionLayer = new StorageInterfaceImpl(); } // Check that URI exists. // if (null == uri) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Check that configuration object is non-null. // if (null == conf) { throw new IllegalArgumentException( "Cannot initialize WASB file system, conf is null"); } if(!conf.getBoolean( NativeAzureFileSystem.SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { //If not skip azure metrics, create bandwidthGaugeUpdater this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); } // Incoming parameters validated. Capture the URI and the job configuration // object. // sessionUri = uri; sessionConfiguration = conf; // Start an Azure storage session. // createAzureStorageSession(); // Extract the directories that should contain page blobs pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES); LOG.debug("Page blob directories: " + setToString(pageBlobDirs)); // Extract directories that should have atomic rename applied. 
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; try { // Add to this the hbase root directory, or /hbase is that is not set. hbaseRoot = verifyAndConvertToStandardFormat( sessionConfiguration.get("hbase.rootdir", "hbase")); atomicRenameDirs.add(hbaseRoot); } catch (URISyntaxException e) { LOG.warn("Unable to initialize HBase root as an atomic rename directory."); } LOG.debug("Atomic rename directories: " + setToString(atomicRenameDirs)); }
/** * Method for the URI and configuration object necessary to create a storage * session with an Azure session. It parses the scheme to ensure it matches * the storage protocol supported by this file system. * * @param uri - URI for target storage blob. * @param conf - reference to configuration object. * @param instrumentation - the metrics source that will keep track of operations here. * * @throws IllegalArgumentException if URI or job object is null, or invalid scheme. */ @Override public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IllegalArgumentException, AzureException, IOException { if (null == instrumentation) { throw new IllegalArgumentException("Null instrumentation"); } this.instrumentation = instrumentation; if (null == this.storageInteractionLayer) { this.storageInteractionLayer = new StorageInterfaceImpl(); } // Check that URI exists. // if (null == uri) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Check that configuration object is non-null. // if (null == conf) { throw new IllegalArgumentException( "Cannot initialize WASB file system, conf is null"); } if(!conf.getBoolean( NativeAzureFileSystem.SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { //If not skip azure metrics, create bandwidthGaugeUpdater this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); } // Incoming parameters validated. Capture the URI and the job configuration // object. // sessionUri = uri; sessionConfiguration = conf; // Start an Azure storage session. // createAzureStorageSession(); // Extract the directories that should contain page blobs pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES); LOG.debug("Page blob directories: {}", setToString(pageBlobDirs)); // Extract directories that should have atomic rename applied. 
atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; try { // Add to this the hbase root directory, or /hbase is that is not set. hbaseRoot = verifyAndConvertToStandardFormat( sessionConfiguration.get("hbase.rootdir", "hbase")); atomicRenameDirs.add(hbaseRoot); } catch (URISyntaxException e) { LOG.warn("Unable to initialize HBase root as an atomic rename directory."); } LOG.debug("Atomic rename directories: {} ", setToString(atomicRenameDirs)); }
/** * Method for the URI and configuration object necessary to create a storage * session with an Azure session. It parses the scheme to ensure it matches * the storage protocol supported by this file system. * * @param uri - URI for target storage blob. * @param conf - reference to configuration object. * @param instrumentation - the metrics source that will keep track of operations here. * * @throws IllegalArgumentException if URI or job object is null, or invalid scheme. */ @Override public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IllegalArgumentException, AzureException, IOException { if (null == instrumentation) { throw new IllegalArgumentException("Null instrumentation"); } this.instrumentation = instrumentation; this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); if (null == this.storageInteractionLayer) { this.storageInteractionLayer = new StorageInterfaceImpl(); } // Check that URI exists. // if (null == uri) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Check that configuration object is non-null. // if (null == conf) { throw new IllegalArgumentException( "Cannot initialize WASB file system, URI is null"); } // Incoming parameters validated. Capture the URI and the job configuration // object. // sessionUri = uri; sessionConfiguration = conf; // Start an Azure storage session. // createAzureStorageSession(); // Extract the directories that should contain page blobs pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES); LOG.debug("Page blob directories: " + setToString(pageBlobDirs)); // Extract directories that should have atomic rename applied. atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); String hbaseRoot; try { // Add to this the hbase root directory, or /hbase is that is not set. 
hbaseRoot = verifyAndConvertToStandardFormat( sessionConfiguration.get("hbase.rootdir", "hbase")); atomicRenameDirs.add(hbaseRoot); } catch (URISyntaxException e) { LOG.warn("Unable to initialize HBase root as an atomic rename directory."); } LOG.debug("Atomic rename directories: " + setToString(atomicRenameDirs)); }