Python requests 模块,response() 实例源码
我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用requests.response()。
def request(self, method, *resources, **kwargs):
method = method.upper()
response_key = kwargs.pop('response_key', None)
key = kwargs.pop('key', None)
if key is not None:
response_key = key
query = kwargs.pop('query', None)
data = kwargs.pop('data', None)
fragment = kwargs.pop('fragment', '')
params = kwargs.pop('params', '')
keep_blank_values = kwargs.pop('keep_blank_values', None)
timeout = kwargs.pop('timeout', 60)
resource = self.build_resource(resources)
content_type = kwargs.pop('content_type', 'json')
if data is not None:
if 'json' in content_type:
kwargs['json'] = data
if content_type == 'body':
kwargs['data'] = data
url = self.url_for(resource, query, params=params,
fragment=fragment,
keep_blank_values=keep_blank_values)
self.logger.info('Request %s for %s', method, url)
response = requests.request(method, url, timeout=timeout, **kwargs)
return self.handle_response(response, response_key=response_key)
def emaillogin(email, otp):
"""Raw Cloud API call, request cloud token with email address & OTP.
Args:
email(str): Email address connected to Cozify account.
otp(int): One time passcode.
Returns:
str: cloud token
"""
payload = {
'email': email,
'password': otp
}
response = requests.post(cloudBase + 'user/emaillogin', params=payload)
if response.status_code == 200:
return response.text
else:
raise APIError(response.status_code, response.text)
def hubkeys(cloud_token):
"""1:1 implementation of user/hubkeys
Args:
cloud_token(str) Cloud remote authentication token.
Returns:
dict: Map of hub_id: hub_token pairs.
"""
headers = {
'Authorization': cloud_token
}
response = requests.get(cloudBase + 'user/hubkeys', headers=headers)
if response.status_code == 200:
return json.loads(response.text)
else:
raise APIError(response.status_code, response.text)
def refreshsession(cloud_token):
"""1:1 implementation of user/refreshsession
Args:
cloud_token(str) Cloud remote authentication token.
Returns:
str: New cloud remote authentication token. Not automatically stored into state.
"""
headers = {
'Authorization': cloud_token
}
response = requests.get(cloudBase + 'user/refreshsession', headers=headers)
if response.status_code == 200:
return response.text
else:
raise APIError(response.status_code, response.text)
def cone_search(ra, dec, radius, table="gaiadr1.gaia_source", **kwargs):
"""
Perform a cone search against the ESA Gaia database using the TAP.
:param ra:
Right ascension (degrees).
:param dec:
Declination (degrees).
:param radius:
Cone search radius (degrees).
:param table: [optional]
The table name to perform the cone search on. Some examples are:
gaiadr1.gaia_source
gaiadr1.tgas_source
:param kwargs:
Keyword arguments are passed directly to the `query` method.
:returns:
The data returned by the ESA/Gaia archive -- either as an astropy
table or as a dictionary -- and optionally, the `requests.response`
object used.
"""
return query(
""" SELECT *
FROM {table}
WHERE CONTAINS(
POINT('ICRS',{table}.ra,{table}.dec),
CIRCLE('ICRS',{ra:.10f},{dec:.10f},{radius:.10f})) = 1;""".format(
table=table, ra=ra, dec=dec, radius=radius), **kwargs)
def __init__(self, response, description, *args, **kwargs):
"""Exception
exception instance has:
response, description, content and status_code
:param response: requests.response
:param description: str - description for error
"""
self.response = response
self.description = description
self.status_code = response.status_code
self.content = response.content
super(ResponseError, self).__init__(*args, **kwargs)
def __repr__(self): # pragma: no cover
return 'Error status code: {}. Description: {}'.format(
self.response.status_code, self.description)
def handle_response(self, response, response_key=None):
"""Handler for response object
:param response: requests.response obj
:param response_key: key for dict in response obj
:return object, result for response, python obj
"""
status_code = response.status_code
try:
result = response.json()
except Exception as e:
self.logger.exception(e)
raise ResponseError(response, e)
if result:
if response_key is not None and status_code in self.ok_statuses:
if response_key in result:
result = result[response_key]
else:
raise ResponseError(response, 'Response key not found!')
elif response_key is not None and status_code in self.to_none_statuses:
result = None
elif status_code not in self.ok_statuses and status_code not in self.to_none_statuses:
raise ResponseError(response,
'Status code {} not in ok_statuses {}'.format(
status_code, self.ok_statuses))
if response_key is not None and self.empty_to_none and result is not None and not result:
result = None
return result
def make_402_payment(self, response, max_price):
"""Payment handling method implemented by a BitRequests subclass.
Args:
response (requests.response): 402 response from the API server.
max_price (int): maximum allowed price for a request (in satoshi).
Returns:
headers (dict):
dict of headers with payment data to send to the
API server to inform payment status for the resource.
"""
raise NotImplementedError()
def make_402_payment(self, response, max_price):
"""Make an on-chain payment."""
# Retrieve payment headers
headers = response.headers
price = headers.get(OnChainRequests.HTTP_BITCOIN_PRICE)
payee_address = headers.get(OnChainRequests.HTTP_BITCOIN_ADDRESS)
# Verify that the payment method is supported
if price is None or payee_address is None:
raise UnsupportedPaymentMethodError(
'Resource does not support that payment method.')
# Convert string headers into correct data types
price = int(price)
# Verify resource cost against our budget
if max_price and price > max_price:
max_price_err = 'Resource price ({}) exceeds max price ({}).'
raise ResourcePriceGreaterThanMaxPriceError(max_price_err.format(price, max_price))
# Create the signed transaction
onchain_payment = self.wallet.make_signed_transaction_for(
payee_address, price, use_unconfirmed=True)[0].get('txn').to_hex()
return_address = self.wallet.current_address
logger.debug('[OnChainRequests] Signed transaction: {}'.format(
onchain_payment))
return {
'Bitcoin-Transaction': onchain_payment,
'Return-Wallet-Address': return_address,
OnChainRequests.HTTP_BITCOIN_PRICE: str(price),
OnChainRequests.HTTP_PAYER_21USERNAME: urllib.parse.quote(self.username) if self.username else None
}
def get_402_info(self, url):
"""Get channel payment information about the resource."""
response = requests.get(url)
price = response.headers.get(ChannelRequests.HTTP_BITCOIN_PRICE)
channel_url = response.headers.get(ChannelRequests.HTTP_BITCOIN_PAYMENT_CHANNEL_SERVER)
return {ChannelRequests.HTTP_BITCOIN_PRICE: price,
ChannelRequests.HTTP_BITCOIN_PAYMENT_CHANNEL_SERVER: channel_url}
def requestlogin(email):
"""Raw Cloud API call, request OTP to be sent to account email address.
Args:
email(str): Email address connected to Cozify account.
"""
payload = { 'email': email }
response = requests.post(cloudBase + 'user/requestlogin', params=payload)
if response.status_code is not 200:
raise APIError(response.status_code, response.text)
def lan_ip():
"""1:1 implementation of hub/lan_ip
This call will fail with an APIError if the requesting source address is not the same as that of the hub, i.e. if they're not in the same NAT network.
The above is based on observation and may only be partially true.
Returns:
list: List of Hub ip addresses.
"""
response = requests.get(cloudBase + 'hub/lan_ip')
if response.status_code == 200:
return json.loads(response.text)
else:
raise APIError(response.status_code, response.text)
def _default_is_success(status_code):
"""Returns true if the success status is between [200, 300).
:param response_status: the http response status
:type response_status: int
:returns: True for success status; False otherwise
:rtype: bool
"""
return 200 <= status_code < 300
def get_auth_scheme(response):
"""Return authentication scheme and realm requested by server for 'Basic'
or 'acsjwt' (DCOS acs auth) or 'oauthjwt' (DCOS acs oauth) type
:param response: requests.response
:type response: requests.Response
:returns: auth_scheme, realm
:rtype: (str, str)
"""
if 'www-authenticate' in response.headers:
auths = response.headers['www-authenticate'].split(',')
scheme = next((auth_type.rstrip().lower() for auth_type in auths
if auth_type.rstrip().lower().startswith("basic") or
auth_type.rstrip().lower().startswith("acsjwt") or
auth_type.rstrip().lower().startswith("oauthjwt")),
None)
if scheme:
scheme_info = scheme.split("=")
auth_scheme = scheme_info[0].split(" ")[0].lower()
realm = scheme_info[-1].strip(' \'\"').lower()
return auth_scheme, realm
else:
return None, None
else:
return None, None
def _get_http_auth(response, url, auth_scheme):
"""Get authentication mechanism required by server
:param response: requests.response
:type response: requests.Response
:param url: parsed request url
:type url: str
:param auth_scheme: str
:type auth_scheme: str
:returns: AuthBase
:rtype: AuthBase
"""
hostname = url.hostname
username = url.username
password = url.password
if 'www-authenticate' in response.headers:
if auth_scheme not in ['basic', 'acsjwt', 'oauthjwt']:
msg = ("Server responded with an HTTP 'www-authenticate' field of "
"'{}', DCOS only supports 'Basic'".format(
response.headers['www-authenticate']))
raise DCOSException(msg)
if auth_scheme == 'basic':
# for basic auth if username + password was present,
# we'd already be authed by python requests module
username, password = _get_auth_credentials(username, hostname)
return HTTPBasicAuth(username, password)
# dcos auth (acs or oauth)
else:
return _get_dcos_auth(auth_scheme, username, password, hostname)
else:
msg = ("Invalid HTTP response: server returned an HTTP 401 response "
"with no 'www-authenticate' field")
raise DCOSException(msg)
def _get_dcos_auth(auth_scheme, username, password, hostname):
"""Get authentication flow for dcos acs auth and dcos oauth
:param auth_scheme: authentication_scheme
:type auth_scheme: str
:param username: username user for authentication
:type username: str
:param password: password for authentication
:type password: str
:param hostname: hostname for credentials
:type hostname: str
:returns: DCOSAcsAuth
:rtype: AuthBase
"""
toml_config = util.get_config()
token = toml_config.get("core.dcos_acs_token")
if token is None:
dcos_url = toml_config.get("core.dcos_url")
if auth_scheme == "acsjwt":
creds = _get_dcos_acs_auth_creds(username, password, hostname)
else:
creds = _get_dcos_oauth_creds(dcos_url)
verify = _verify_ssl()
# Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad cert
if verify is not None:
silence_requests_warnings()
url = urllib.parse.urljoin(dcos_url, 'acs/api/v1/auth/login')
# using private method here, so we don't retry on this request
# error here will be bubbled up to _request_with_auth
response = _request('post', url, json=creds, verify=verify)
if response.status_code == 200:
token = response.json()['token']
config.set_val("core.dcos_acs_token", token)
return DCOSAcsAuth(token)
def query(query, authenticate=False, json=False, full_output=False, **kwargs):
"""
Execute a synchronous TAP query to the ESA Gaia database.
:param query:
The TAP query to execute.
:param authenticate: [optional]
Authenticate with the username and password information stored in the
config.
:param json: [optional]
Return the data in JSON format. If set to False, then the data will be
returned as an `astropy.table.Table`.
:param full_output: [optional]
Return a two-length tuple containing the data and the corresponding
`requests.response` object.
:returns:
The data returned - either as an astropy table or a dictionary (JSON) -
and optionally, the `requests.response` object used.
"""
format = "json" if json else "votable"
params = dict(REQUEST="doQuery", LANG="ADQL", FORMAT=format, query=query)
params.update(kwargs)
# Create session.
session = requests.Session()
if authenticate:
utils.login(session)
response = session.get("{}/tap/sync".format(config.url), params=params)
if not response.ok:
raise TAPQueryException(response)
if json:
data = response.json()
else:
# Take the table contents and return an astropy table.
data = Table.read(StringIO(response.text), format="votable")
return (data, response) if full_output else data
def __init__(self, endpoint, ok_statuses=None, to_none_statuses=None,
empty_to_none=True, close_slash=True,
logger=None, name=None, keep_blank_values=True):
"""Create a client
:param endpoint: str, ex. http://localhost:5000 or http://localhost:5000/api/
:param ok_statuses: default - (200, 201, 202, ), status codes for "ok"
:param to_none_statuses: statuses, for generate None as response, default - (404, )
:param empty_to_none: boolean, default - True, if True - empty response will be generate None response (empty str, empty list, empty dict)
:param close_slash: boolean, url += '/', if url.endswith != '/', default - True
:param logger: logger instance
:param name: name for client
:type name: str
"""
if name is None:
name = '<client: {}>'.format(endpoint)
if logger is None:
logger = get_logger(__name__)
self.logger = InstanceLogger(self, logger)
if endpoint.endswith('/'):
endpoint = endpoint[:-1]
if ok_statuses is not None:
self.ok_statuses = ok_statuses
if to_none_statuses is not None:
self.to_none_statuses = to_none_statuses
self.empty_to_none = empty_to_none
self.close_slash = close_slash
parsed_url = urlparse.urlparse(endpoint)
endpoint = self.get_endpoint_from_parsed_url(parsed_url)
self.keep_blank_values = keep_blank_values
self.endpoint = endpoint
self.path = parsed_url.path
self.query = urlparse.parse_qs(parsed_url.query,
keep_blank_values=self.keep_blank_values)
self.fragment = parsed_url.fragment
self.params = parsed_url.params
self.name = name
self.logger.debug(
'Client built, endpoint: "%s", path: "%s", query: %s, params: %s, fragment: %s',
self.endpoint, self.path,
self.query, self.params, self.fragment)
def request(self, method, url, max_price=None, mock_requests=False, **kwargs):
"""Make a 402 request for a resource.
This is the BitRequests public method that should be used to complete a
402 request using the desired payment method (as constructed by a class
implementing BitRequests)
Args:
method (string): HTTP method for completing the request in lower-
case letters. Examples: 'get', 'post', 'put'
url (string): URL of the requested resource.
data (dict): python dict of parameters to send with the request.
max_price (int): maximum allowed price for a request (in satoshi).
Returns:
response (requests.response):
response from paying for the requested resource.
"""
if mock_requests:
fake_response = requests.models.Response()
fake_response.status_code = 200
fake_response._content = b''
return fake_response
# Make the initial request for the resource
response = requests.request(method, url, **kwargs)
# Return if we receive a status code other than 402: payment required
if response.status_code != requests.codes.payment_required:
return response
# Pass the response to the main method for handling payment
logger.debug('[BitRequests] 402 payment required: {} satoshi.'.format(
response.headers['price']))
payment_headers = self.make_402_payment(response, max_price)
# Reset the position of any files that have been used
self._reset_file_positions(kwargs.get('files'), kwargs.get('data'))
# Add any user-provided headers to the payment headers dict
if 'headers' in kwargs:
if isinstance(kwargs['headers'], dict):
kwargs['headers'].update(payment_headers)
else:
raise ValueError('argument \'headers\' must be a dict.')
else:
kwargs['headers'] = payment_headers
paid_response = requests.request(method, url, **kwargs)
setattr(paid_response, 'amount_paid', int(response.headers['price']))
if paid_response.status_code == requests.codes.ok:
logger.debug('[BitRequests] Successfully purchased resource.')
else:
logger.debug('[BitRequests] Could not purchase resource.')
return paid_response
def make_402_payment(self, response, max_price):
"""Make a bit-transfer payment to the payment-handling service."""
# Retrieve payment headers
headers = response.headers
price = headers.get(BitTransferRequests.HTTP_BITCOIN_PRICE)
payee_address = headers.get(BitTransferRequests.HTTP_BITCOIN_ADDRESS)
payee_username = headers.get(BitTransferRequests.HTTP_BITCOIN_USERNAME)
# Verify that the payment method is supported
if price is None or payee_address is None or payee_username is None:
raise UnsupportedPaymentMethodError(
'Resource does not support that payment method.')
# Convert string headers into correct data types
price = int(price)
# verify that we have the money to purchase the resource
buffer_balance = self.client.get_earnings()["total_earnings"]
if price > buffer_balance:
insuff_funds_err = 'Resource price ({}) exceeds buffer balance ({}).'
raise InsufficientBalanceError(insuff_funds_err.format(price, buffer_balance))
# Verify resource cost against our budget
if max_price and price > max_price:
max_price_err = 'Resource price ({}) exceeds max price ({}).'
raise ResourcePriceGreaterThanMaxPriceError(max_price_err.format(price, max_price))
# Get the signing public key
pubkey = self.wallet.get_public_key()
compressed_pubkey = codecs.encode(pubkey.compressed_bytes, 'base64').decode()
# Create and sign BitTranfer
bittransfer = json.dumps({
'payer': self.username,
'payer_pubkey': compressed_pubkey,
'payee_address': payee_address,
'payee_username': payee_username,
'amount': price,
'timestamp': time.time(),
'description': response.url
})
if not isinstance(bittransfer, str):
raise TypeError("Serialized bittransfer must be a string")
signature = self.wallet.sign_message(bittransfer)
logger.debug('[BitTransferRequests] Signature: {}'.format(signature))
logger.debug('[BitTransferRequests] BitTransfer: {}'.format(bittransfer))
return {
'Bitcoin-Transfer': bittransfer,
'Authorization': signature
}
def request(method,
url,
is_success=_default_is_success,
timeout=None,
verify=None,
**kwargs):
"""Sends an HTTP request. If the server responds with a 401, ask the
user for their credentials, and try request again (up to 3 times).
:param method: method for the new Request object
:type method: str
:param url: URL for the new Request object
:type url: str
:param is_success: Defines successful status codes for the request
:type is_success: Function from int to bool
:param timeout: request timeout
:type timeout: int
:param verify: whether to verify SSL certs or path to cert(s)
:type verify: bool | str
:param kwargs: Additional arguments to requests.request
(see http://docs.python-requests.org/en/latest/api/#requests.request)
:type kwargs: dict
:rtype: Response
"""
if 'headers' not in kwargs:
kwargs['headers'] = {'Accept': 'application/json'}
verify = _verify_ssl(verify)
# Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad certs
if verify is not None:
silence_requests_warnings()
response = _request(method, url, is_success, timeout,
verify=verify, **kwargs)
if response.status_code == 401:
response = _request_with_auth(response, method, url, is_success,
timeout, verify, **kwargs)
if is_success(response.status_code):
return response
elif response.status_code == 403:
raise DCOSAuthorizationException(response)
else:
raise DCOSHTTPException(response)