Python requests 模块,codes() 实例源码
我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests.codes()。
def post(self, *args, **kwargs):
"""helper method for HTTP POST
:param args:
:param kwargs:
:return: json response
"""
try:
resp = self.session.post(*args, **kwargs)
if resp.status_code in _EXCEPTIONS_BY_CODE:
raise _EXCEPTIONS_BY_CODE[resp.status_code](resp.reason)
if resp.status_code != requests.codes['ok']:
raise exceptions.Etcd3Exception(resp.reason)
except requests.exceptions.Timeout as ex:
raise exceptions.ConnectionTimeoutError(six.text_type(ex))
except requests.exceptions.ConnectionError as ex:
raise exceptions.ConnectionFailedError(six.text_type(ex))
return resp.json()
def request_new_access_token(session):
url = "{token_issuer_host}/v1/{token_issuer_path}".format(
token_issuer_host=session.token_issuer_host,
token_issuer_path=session.token_issuer_path
)
headers = {
"Authorization": "Basic {}".format(base64.b64encode(
(session.api_key + ":" + session.api_secret
).encode("ascii")).decode("ascii")
), "X-Access": "{}".format(
base64.b64encode('{"type":"Full"}'.encode("ascii")
).decode("ascii")
)
}
data = {"grant_type": "client_credentials"}
response = requests.post(url=url, headers=headers, data=data)
if response and response.status_code == requests.codes["ok"]:
response_json = response.json()
session.access_token = response_json["access_token"]
return session.access_token
def request(self, method, url, params = {}, data = {}, files = None):
post_values = data
headers = {"Authorization": "Bearer %s" % self.access_token}
try:
response = self.session.request(method, url, params=params, data=post_values,
files=files, headers=headers,
timeout=self.timeout, verify=False)
except Timeout as e:
logger.warning('POST for %s timed out: %s', url, e)
raise e
if response.status_code == requests.codes['forbidden']:
raise LoginException("forbidden status code received on %s %s %s", method, response.status_code, response.text)
elif (response.status_code != requests.codes['ok']):
self.access_token = None
logger.error("raised exception, resetting token %s", response.status_code)
# read content to let it know we are done with
response.content
return response
def transcribe(self, fp):
data = fp.read()
r = self._get_response(data)
if r.status_code == requests.codes['unauthorized']:
# Request token invalid, retry once with a new token
self._logger.warning('OAuth access token invalid, generating a ' +
'new one and retrying...')
self._token = None
r = self._get_response(data)
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
self._logger.critical('Request failed with response: %r',
r.text,
exc_info=True)
return []
except requests.exceptions.RequestException:
self._logger.critical('Request failed.', exc_info=True)
return []
else:
try:
recognition = r.json()['Recognition']
if recognition['Status'] != 'OK':
raise ValueError(recognition['Status'])
results = [(x['Hypothesis'], x['Confidence'])
for x in recognition['NBest']]
except ValueError as e:
self._logger.debug('Recognition failed with status: %s',
e.args[0])
return []
except KeyError:
self._logger.critical('Cannot parse response.',
exc_info=True)
return []
else:
transcribed = [x[0].upper() for x in sorted(results,
key=lambda x: x[1],
reverse=True)]
self._logger.info('Transcribed: %r', transcribed)
return transcribed
def send_request(self, payload, **kwargs):
# type: (dict, dict) -> dict
kwargs.setdefault('headers', {})
for key, value in iteritems(self.DEFAULT_HEADERS):
kwargs['headers'].setdefault(key, value)
response = self._send_http_request(
# Use a custom JSON encoder that knows how to convert Tryte values.
payload = JsonEncoder().encode(payload),
url = self.node_url,
**kwargs
)
return self._interpret_response(response, payload, {codes['ok']})
def transcribe(self, fp):
data = fp.read()
r = self._get_response(data)
if r.status_code == requests.codes['unauthorized']:
# Request token invalid, retry once with a new token
self._logger.warning('OAuth access token invalid, generating a ' +
'new one and retrying...')
self._token = None
r = self._get_response(data)
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
self._logger.critical('Request failed with response: %r',
r.text,
exc_info=True)
return []
except requests.exceptions.RequestException:
self._logger.critical('Request failed.', exc_info=True)
return []
else:
try:
recognition = r.json()['Recognition']
if recognition['Status'] != 'OK':
raise ValueError(recognition['Status'])
results = [(x['Hypothesis'], x['Confidence'])
for x in recognition['NBest']]
except ValueError as e:
self._logger.debug('Recognition failed with status: %s',
e.args[0])
return []
except KeyError:
self._logger.critical('Cannot parse response.',
exc_info=True)
return []
else:
transcribed = [x[0].upper() for x in sorted(results,
key=lambda x: x[1],
reverse=True)]
self._logger.info('Transcribed: %r', transcribed)
return transcribed
def login(api_url, username, password):
"""Login to tendrl server."""
post_data = {"username": username, "password": password}
request = requests.post(
"{}login".format(api_url),
data=json.dumps(post_data))
if request.status_code == requests.codes["ok"]:
return request.json()
else:
response = {"url": request.url, "data": post_data}
MODULE.fail_json(
msg="Could not login with these credentials.",
meta=response)
def wasNotFound(self):
return self.currentResponse is not None and self.currentResponse.status_code == requests.codes.not_found
def _feedparser_parse_with_options(self) -> Tuple[feedparser.FeedParserDict, "UpdateResult"]:
"""
Perform a feedparser parse, providing arguments (like etag) we might want it to use.
Don't provide etag/last_modified if the last get was unsuccessful.
"""
if self.feed_state.last_modified is not None:
last_mod = self.feed_state.last_modified.timetuple()
else:
last_mod = None
# NOTE - this naming is a bit confusing here - parser is really a thing you call with
# arguments to get a feedparser result.
# Maybe better called parser-generator, or parse-performer or something?
parsed = self.parser(self.url, self.feed_state.etag, last_mod)
self.feed_state.etag = parsed.get("etag", self.feed_state.etag)
self.feed_state.store_last_modified(parsed.get("modified_parsed", None))
# Detect bozo errors (malformed RSS/ATOM feeds).
if "status" not in parsed and parsed.get("bozo", None) == 1:
# NOTE: Feedparser documentation indicates that you can always call getMessage, but
# it's possible for feedparser to spit out a URLError, which doesn't have getMessage.
# Catch this case.
if hasattr(parsed.bozo_exception, "getMessage()"):
msg = parsed.bozo_exception.getMessage()
else:
msg = repr(parsed.bozo_exception)
LOG.info(f"Unable to retrieve feed for {self.metadata['name']} from {self.url}.")
LOG.debug(f"Update failed because bozo exception {msg} occurred.")
return (None, UpdateResult.FAILURE)
elif parsed.get("status") == requests.codes["NOT_MODIFIED"]:
LOG.debug("No update to feed, nothing to do.")
return (None, UpdateResult.UNNEEDED)
else:
return (parsed, UpdateResult.SUCCESS)
def request(self, method, url, data = {}, files = None):
post_values = data
post_values['csrfmiddlewaretoken'] = self.csrf_token
try:
if 'sessionid' not in self.session.cookies:
self.login()
except CookieConflictError as e:
logger.warn("CookieConflictError %s. Relogging in" % e)
self.login()
sessionid = self.session.cookies['sessionid']
self.session.cookies.clear()
self.session.cookies['csrftoken'] = self.csrf_token
self.session.cookies['sessionid'] = sessionid
headers = {"X-CSRFToken": self.csrf_token, "Referer": "%s://%s" % (self.server_protocol, self.server_host)}
try:
response = self.session.request(method, url, data=post_values,
files=files, headers=headers,
timeout=self.timeout, verify=False)
except Timeout as e:
logger.warning('%s for %s timed out: %s' % (method, url, e))
raise e
#if (response.status_code == requests.codes['not_found'] or response.status_code == requests.codes['internal_server_error']):
if response.status_code == requests.codes['forbidden']:
raise LoginException("forbidden status code received on %s %s %s" % (method, response.status_code, response.text))
elif (response.status_code != requests.codes['ok']):
self.csrf_token = None
logger.error("raised exception, resetting token %s" % response.status_code)
#raise Exception("invalid status code received on %s %s %s" % (method, response.status_code, response.text))
# read content to let it know we are done with
response.content
return response
def transcribe(self, fp):
"""
Performs STT via the Google Speech API, transcribing an audio file and
returning an English string.
Arguments:
audio_file_path -- the path to the .wav file to be transcribed
"""
if not self.api_key:
self._logger.critical('API key missing, transcription request ' +
'aborted.')
return []
elif not self.language:
self._logger.critical('Language info missing, transcription ' +
'request aborted.')
return []
wav = wave.open(fp, 'rb')
frame_rate = wav.getframerate()
wav.close()
data = fp.read()
headers = {'content-type': 'audio/l16; rate=%s' % frame_rate}
r = self._http.post(self.request_url, data=data, headers=headers)
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
self._logger.critical('Request failed with http status %d',
r.status_code)
if r.status_code == requests.codes['forbidden']:
self._logger.warning('Status 403 is probably caused by an ' +
'invalid Google API key.')
return []
r.encoding = 'utf-8'
try:
# We cannot simply use r.json() because Google sends invalid json
# (i.e. multiple json objects, seperated by newlines. We only want
# the last one).
response = json.loads(list(r.text.strip().split('\n', 1))[-1])
if len(response['result']) == 0:
# Response result is empty
raise ValueError('Nothing has been transcribed.')
results = [alt['transcript'] for alt
in response['result'][0]['alternative']]
except ValueError as e:
self._logger.warning('Empty response: %s', e.args[0])
results = []
except (KeyError, IndexError):
self._logger.warning('Cannot parse response.', exc_info=True)
results = []
else:
# Convert all results to uppercase
results = tuple(result.upper() for result in results)
self._logger.info('Transcribed: %r', results)
return results
def transcribe(self, fp):
"""
Performs STT via the Watson Speech-to-Text API, transcribing an audio
file and returning an English string.
Arguments:
fp -- the path to the .wav file to be transcribed
"""
if not self.username:
self._logger.critical('Username missing, transcription request aborted.')
return []
elif not self.password:
self._logger.critical('Password missing, transcription request aborted.')
return []
wav = wave.open(fp, 'rb')
frame_rate = wav.getframerate()
wav.close()
data = fp.read()
headers = {
'content-type': 'audio/l16; rate={0}; channels=1'.format(frame_rate)
}
r = self._http.post(
'https://stream.watsonplatform.net/speech-to-text/api/v1/recognize?continuous=true',
data=data, headers=headers, auth=(self.username, self.password)
)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
self._logger.critical('Request failed with http status %d',
r.status_code)
if r.status_code == requests.codes['forbidden']:
self._logger.warning('Status 403 is probably caused by invalid credentials.')
return []
r.encoding = 'utf-8'
results = []
try:
response = r.json()
if not response['results']:
raise ValueError('Nothing has been transcribed.')
results = tuple([alt['transcript'].strip().upper() for alt in response['results'][0]['alternatives']])
self._logger.info('Transcribed: %r', results)
except ValueError as e:
self._logger.critical('Empty response: %s', e.args[0])
except (KeyError, IndexError):
self._logger.critical('Cannot parse response.', exc_info=True)
return results
def transcribe(self, fp):
"""
Performs STT via the Google Speech API, transcribing an audio file and
returning an English string.
Arguments:
audio_file_path -- the path to the .wav file to be transcribed
"""
if not self.api_key:
self._logger.critical('API key missing, transcription request ' +
'aborted.')
return []
elif not self.language:
self._logger.critical('Language info missing, transcription ' +
'request aborted.')
return []
wav = wave.open(fp, 'rb')
frame_rate = wav.getframerate()
wav.close()
data = fp.read()
headers = {'content-type': 'audio/l16; rate=%s' % frame_rate}
r = self._http.post(self.request_url, data=data, headers=headers)
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
self._logger.critical('Request failed with http status %d',
r.status_code)
if r.status_code == requests.codes['forbidden']:
self._logger.warning('Status 403 is probably caused by an ' +
'invalid Google API key.')
return []
r.encoding = 'utf-8'
try:
# We cannot simply use r.json() because Google sends invalid json
# (i.e. multiple json objects, seperated by newlines. We only want
# the last one).
response = json.loads(list(r.text.strip().split('\n', 1))[-1])
if len(response['result']) == 0:
# Response result is empty
raise ValueError('Nothing has been transcribed.')
results = [alt['transcript'] for alt
in response['result'][0]['alternative']]
except ValueError as e:
self._logger.warning('Empty response: %s', e.args[0])
results = []
except (KeyError, IndexError):
self._logger.warning('Cannot parse response.', exc_info=True)
results = []
else:
# Convert all results to uppercase
results = tuple(result.upper() for result in results)
self._logger.info('Transcribed: %r', results)
return results
def transcribe(self, fp):
"""
Performs STT via the Google Speech API, transcribing an audio file and
returning an English string.
Arguments:
audio_file_path -- the path to the .wav file to be transcribed
"""
if not self.api_key:
self._logger.critical('API key missing, transcription request ' +
'aborted.')
return []
elif not self.language:
self._logger.critical('Language info missing, transcription ' +
'request aborted.')
return []
wav = wave.open(fp, 'rb')
frame_rate = wav.getframerate()
wav.close()
data = fp.read()
headers = {'content-type': 'audio/l16; rate=%s' % frame_rate}
r = self._http.post(self.request_url, data=data, headers=headers)
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
self._logger.critical('Request failed with http status %d',
r.status_code)
if r.status_code == requests.codes['forbidden']:
self._logger.warning('Status 403 is probably caused by an ' +
'invalid Google API key.')
return []
r.encoding = 'utf-8'
try:
# We cannot simply use r.json() because Google sends invalid json
# (i.e. multiple json objects, seperated by newlines. We only want
# the last one).
response = json.loads(list(r.text.strip().split('\n', 1))[-1])
if len(response['result']) == 0:
# Response result is empty
raise ValueError('Nothing has been transcribed.')
results = [alt['transcript'] for alt
in response['result'][0]['alternative']]
except ValueError as e:
self._logger.warning('Empty response: %s', e.args[0])
results = []
except (KeyError, IndexError):
self._logger.warning('Cannot parse response.', exc_info=True)
results = []
else:
# Convert all results to uppercase
results = tuple(result.upper() for result in results)
self._logger.info('Transcribed: %r', results)
return results
def get_feed(self, attempt_count: int=0) -> "UpdateResult":
"""Get RSS structure for this subscription. Return status code indicating result."""
res = None
if attempt_count > MAX_RECURSIVE_ATTEMPTS:
LOG.debug(f"Too many recursive attempts ({attempt_count}) to get feed for sub"
f"{self.metadata['name']}, canceling.")
res = UpdateResult.FAILURE
elif self.url is None or self.url == "":
LOG.debug(f"URL {self.url} is empty , cannot get feed for sub " +
f"{self.metadata['name']}.")
res = UpdateResult.FAILURE
if res is not None:
return res
else:
LOG.info(f"Getting entries (attempt {attempt_count}) for {self.metadata['name']} " +
f"from {self.url}.")
(parsed, code) = self._feedparser_parse_with_options()
if code == UpdateResult.UNNEEDED:
LOG.info("We have the latest feed, nothing to do.")
return code
elif code != UpdateResult.SUCCESS:
LOG.info(f"Feedparser parse failed ({code}), aborting.")
return code
LOG.debug("Feedparser parse succeeded.")
# Detect some kinds of HTTP status codes signaling failure.
code = self._handle_http_codes(parsed)
if code == UpdateResult.ATTEMPT_AGAIN:
LOG.debug("Transient HTTP error, attempting again.")
temp = self.temp_url
code = self.get_feed(attempt_count=attempt_count + 1)
if temp is not None:
self.url = temp
elif code != UpdateResult.SUCCESS:
LOG.debug(f"Ran into HTTP error ({code}), aborting.")
else:
self.feed_state.load_rss_info(parsed)
return code
def _handle_http_codes(self, parsed: feedparser.FeedParserDict) -> "UpdateResult":
"""
Given feedparser parse result, determine if parse succeeded, and what to do about that.
"""
# Feedparser gives no status if you feedparse a local file.
if "status" not in parsed:
LOG.debug("Saw status 200 - OK, all is well.")
return UpdateResult.SUCCESS
status = parsed.get("status", 200)
result = UpdateResult.SUCCESS
if status == requests.codes["NOT_FOUND"]:
LOG.error(f"Saw status {status}, unable to retrieve feed text for "
f"{self.metadata['name']}."
f"\nStored URL {self.url} for {self.metadata['name']} will be preserved"
f"and checked again on next attempt."
)
result = UpdateResult.FAILURE
elif status in [requests.codes["UNAUTHORIZED"], requests.codes["GONE"]]:
LOG.error(f"Saw status {status}, unable to retrieve feed text for "
f"{self.metadata['name']}."
f"\nClearing stored URL {self.url} for {self.metadata['name']}."
f"\nPlease provide new URL and authorization for subscription "
f"{self.metadata['name']}."
)
self.url = None
result = UpdateResult.FAILURE
# Handle redirecting errors
elif status in [requests.codes["MOVED_PERMANENTLY"], requests.codes["PERMANENT_REDIRECT"]]:
LOG.warning(f"Saw status {status} indicating permanent URL change."
f"\nChanging stored URL {self.url} for {self.metadata['name']} to "
f"{parsed.get('href')} and attempting get with new URL."
)
self.url = parsed.get("href")
result = UpdateResult.ATTEMPT_AGAIN
elif status in [requests.codes["FOUND"], requests.codes["SEE_OTHER"],
requests.codes["TEMPORARY_REDIRECT"]]:
LOG.warning(f"Saw status {status} indicating temporary URL change."
f"\nAttempting with new URL {parsed.get('href')}."
f"\nStored URL {self.url} for {self.metadata['name']} will be unchanged.")
self.temp_url = self.url
self.url = parsed.get("href")
result = UpdateResult.ATTEMPT_AGAIN
elif status != 200:
LOG.warning(f"Saw '{status}'. Retrying retrieve for {self.metadata['name']} " +
f"at {self.url}.")
result = UpdateResult.ATTEMPT_AGAIN
else:
LOG.debug("Saw status 200. Success!")
return result