我们从 Python 开源项目中，提取了以下 18 个代码示例，用于说明如何使用 botocore.exceptions.MetadataRetrievalError()。
def retrieve_uri(self, relative_uri):
    """Retrieve JSON metadata from ECS metadata.

    The request is retried whenever ``_get_response`` raises
    ``MetadataRetrievalError``, pausing for ``SLEEP_TIME`` between
    consecutive attempts.  Once ``RETRY_ATTEMPTS`` attempts have failed
    the last error is re-raised immediately.

    :type relative_uri: str
    :param relative_uri: A relative URI, e.g "/foo/bar?id=123"

    :return: The parsed JSON response.
    :raises MetadataRetrievalError: If every attempt fails.
    """
    full_url = self._full_url(relative_uri)
    headers = {'Accept': 'application/json'}
    attempts = 0
    while True:
        try:
            return self._get_response(full_url, headers,
                                      self.TIMEOUT_SECONDS)
        except MetadataRetrievalError as e:
            logger.debug("Received error when attempting to retrieve "
                         "ECS metadata: %s", e, exc_info=True)
            attempts += 1
            if attempts >= self.RETRY_ATTEMPTS:
                # Out of attempts: propagate the failure right away
                # instead of sleeping once more before giving up.
                raise
            # Only pause when another attempt will actually follow.
            self._sleep(self.SLEEP_TIME)
def _get_response(self, full_url, headers, timeout):
    """Issue a GET request and return the decoded JSON body.

    :param full_url: URL to request.
    :param headers: Headers sent with the request.
    :param timeout: Timeout handed to the session's ``get`` call.

    :return: The result of ``json.loads`` on the response text.
    :raises MetadataRetrievalError: On a non-200 status, on a body that
        is not valid JSON, or on any error in ``RETRYABLE_HTTP_ERRORS``.
    """
    try:
        reply = self._session.get(full_url, headers=headers,
                                  timeout=timeout)
        status = reply.status_code
        if status != 200:
            bad_status_msg = (
                "Received non 200 response (%s) from ECS metadata: %s"
                % (status, reply.text))
            raise MetadataRetrievalError(error_msg=bad_status_msg)

        body = reply.text
        try:
            parsed = json.loads(body)
        except ValueError:
            # json.loads signals malformed input with ValueError.
            bad_json_msg = ("Unable to parse JSON returned from "
                            "ECS metadata: %s" % body)
            raise MetadataRetrievalError(error_msg=bad_json_msg)
        return parsed
    except RETRYABLE_HTTP_ERRORS as http_err:
        # Transport-level failures are reported through the same
        # exception type as the other failure modes above.
        error_msg = ("Received error when attempting to retrieve "
                     "ECS metadata: %s" % http_err)
        raise MetadataRetrievalError(error_msg=error_msg)
def _create_fetcher(self, relative_uri):
    """Build a zero-argument callable that fetches credentials.

    :param relative_uri: Relative URI handed to the fetcher's
        ``retrieve_uri`` every time the returned callable runs.

    :return: A function returning a dict with the keys ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time``.  It raises
        ``CredentialRetrievalError`` when the fetcher raises
        ``MetadataRetrievalError``.
    """
    # (result key, metadata key) pairs, in the order they are read.
    field_pairs = (
        ('access_key', 'AccessKeyId'),
        ('secret_key', 'SecretAccessKey'),
        ('token', 'Token'),
        ('expiry_time', 'Expiration'),
    )

    def fetch_creds():
        try:
            metadata = self._fetcher.retrieve_uri(relative_uri)
        except MetadataRetrievalError as err:
            logger.debug("Error retrieving ECS metadata: %s", err,
                         exc_info=True)
            raise CredentialRetrievalError(provider=self.METHOD,
                                           error_msg=str(err))
        creds = {}
        for target, source in field_pairs:
            creds[target] = metadata[source]
        return creds

    return fetch_creds
def _create_fetcher(self, full_uri, headers):
    """Build a zero-argument callable that fetches credentials.

    :param full_uri: URI handed to the fetcher's ``retrieve_full_uri``
        every time the returned callable runs.
    :param headers: Passed through as the ``headers`` keyword argument
        of ``retrieve_full_uri``.

    :return: A function returning a dict with the keys ``access_key``,
        ``secret_key``, ``token`` and ``expiry_time``.  It raises
        ``CredentialRetrievalError`` when the fetcher raises
        ``MetadataRetrievalError``.
    """
    # (result key, metadata key) pairs, in the order they are read.
    field_pairs = (
        ('access_key', 'AccessKeyId'),
        ('secret_key', 'SecretAccessKey'),
        ('token', 'Token'),
        ('expiry_time', 'Expiration'),
    )

    def fetch_creds():
        try:
            metadata = self._fetcher.retrieve_full_uri(full_uri,
                                                       headers=headers)
        except MetadataRetrievalError as err:
            logger.debug("Error retrieving container metadata: %s", err,
                         exc_info=True)
            raise CredentialRetrievalError(provider=self.METHOD,
                                           error_msg=str(err))
        creds = {}
        for target, source in field_pairs:
            creds[target] = metadata[source]
        return creds

    return fetch_creds
def _retrieve_credentials(self, full_url, extra_headers=None):
    """Fetch container metadata from ``full_url`` with retries.

    The request is retried whenever ``_get_response`` raises
    ``MetadataRetrievalError``, pausing for ``SLEEP_TIME`` between
    consecutive attempts.  Once ``RETRY_ATTEMPTS`` attempts have failed
    the last error is re-raised immediately.

    :param full_url: URL passed to ``_get_response``.
    :param extra_headers: Optional mapping merged on top of the default
        ``Accept: application/json`` header.  It is not modified.

    :return: Whatever ``_get_response`` returns.
    :raises MetadataRetrievalError: If every attempt fails.
    """
    headers = {'Accept': 'application/json'}
    if extra_headers is not None:
        headers.update(extra_headers)
    attempts = 0
    while True:
        try:
            return self._get_response(full_url, headers,
                                      self.TIMEOUT_SECONDS)
        except MetadataRetrievalError as e:
            logger.debug("Received error when attempting to retrieve "
                         "container metadata: %s", e, exc_info=True)
            attempts += 1
            if attempts >= self.RETRY_ATTEMPTS:
                # Out of attempts: propagate the failure right away
                # instead of sleeping once more before giving up.
                raise
            # Only pause when another attempt will actually follow.
            self._sleep(self.SLEEP_TIME)