Python requests 模块,Response() 实例源码
我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用requests.Response()。
def get_response_and_time(self, key, default=(None, None)):
    """Look up the cached ``(response, timestamp)`` pair for `key`.

    `key` is first tried directly against ``self.responses``; when absent it
    is translated through ``self.keys_map``.

    :param key: key of the resource (or an alias present in ``keys_map``)
    :param default: value returned when nothing is cached for `key`
    :returns: tuple ``(restored response, timestamp)`` or `default`

    .. note:: The stored response is passed through :meth:`restore_response`
              before being returned.
    """
    try:
        lookup_key = key if key in self.responses else self.keys_map[key]
        stored_response, stored_at = self.responses[lookup_key]
    except KeyError:
        # Unknown key/alias, or the alias points at a missing entry.
        return default
    restored = self.restore_response(stored_response)
    return restored, stored_at
def put(self, url, data, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """
    Issue an HTTP PUT by delegating to :meth:`request`.

    :param str url: URL of the HTTP request.
    :param bytes | None data: Binary data to send in the request body.
    :param dict | None headers: Additional headers; omitted from the call when None.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) Forwarded to :meth:`request`; documented
        as auto-recovering from expired token errors or Internet failures.
    :rtype: requests.Response
    """
    # ``self.proxies`` is always forwarded; headers only when supplied.
    request_kwargs = {'proxies': self.proxies, 'data': data}
    if headers is not None:
        request_kwargs['headers'] = headers
    return self.request(
        'put', url, params=request_kwargs,
        ok_status_code=ok_status_code, auto_renew=auto_renew)
def save_response(self, key, response):
    """Store `response` in ``self.responses`` under `key`.

    Before storing, the ``X-Auth-Token`` header is removed from the headers
    of the request attached to the response, so the token is not kept in
    the cache.

    :param key: key for this response
    :param response: response to save (its ``request.headers`` is mutated)

    .. note:: NOTE(review): an earlier docstring mentioned reducing the
              response with ``reduce_response``; this implementation stores
              the object as-is.
    """
    request_headers = response.request.headers
    if 'X-Auth-Token' in request_headers:
        # Strip the auth token from the cached request headers.
        del request_headers['X-Auth-Token']
    self.responses[key] = response
def get_response(self, key, default=None):
    """Return the cached entry for `key`, or `default` when nothing is stored.

    `key` is first tried directly against ``self.responses``; when absent it
    is translated through ``self.keys_map``.

    :param key: key of the resource (or an alias present in ``keys_map``)
    :param default: value returned when `key` cannot be resolved
    :returns: the stored entry exactly as it was saved (no restoring and no
              timestamp unpacking is done here)
    """
    try:
        lookup_key = key if key in self.responses else self.keys_map[key]
        cached = self.responses[lookup_key]
    except KeyError:
        return default
    return cached
def restore_response(self, response, seen=None):
    """Rebuild a ``requests.Response`` from a stored response object.

    Every attribute named in ``self._response_attrs`` is copied onto a fresh
    ``requests.Response`` (missing attributes become ``None``), and the
    ``history`` chain is rebuilt recursively.

    :param response: stored response object to restore
    :param seen: memo mapping ``id(stored)`` to its restored response, so an
                 object reachable more than once is rebuilt only once
    :returns: the restored ``requests.Response``
    """
    seen = {} if seen is None else seen
    marker = id(response)
    if marker in seen:
        return seen[marker]

    rebuilt = requests.Response()
    for attr_name in self._response_attrs:
        setattr(rebuilt, attr_name, getattr(response, attr_name, None))
    # assumes ``raw`` is among ``_response_attrs`` so ``rebuilt.raw`` is a
    # real object here -- TODO confirm
    rebuilt.raw._cached_content_ = rebuilt.content
    # Register before recursing so cycles through ``history`` terminate.
    seen[marker] = rebuilt
    rebuilt.history = tuple(
        self.restore_response(previous, seen) for previous in response.history)
    return rebuilt
def test_request_api_server_auth_recover(self, options, config, request,
                                         text):
    """``_request_api_server`` ends up sending the freshly issued token.

    The patched ``request`` yields one 401 response followed by 200
    responses, and the patched ``text`` carries a token payload; the final
    call must use that token in the ``X-AUTH-TOKEN`` header.
    """
    driver = self._get_driver(options, config)
    driver._apiinsecure = False
    driver._use_api_certs = False

    token = "xyztoken"
    text.return_value = json.dumps({'access': {'token': {'id': token}}})

    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    authorized = requests.Response()
    authorized.status_code = requests.codes.ok
    # First call is rejected, subsequent calls succeed.
    request.side_effect = [unauthorized, authorized, authorized]

    url = "/URL"
    driver._request_api_server(url)

    request.assert_called_with(
        '/URL', data=None, headers={'X-AUTH-TOKEN': token})
def test_request_backend(self, options, config):
    """``_request_backend`` returns the relayed status code and a message dict."""
    driver = self._get_driver(options, config)

    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    # Replace the relay so no real request is sent.
    driver._relay_request = mock.MagicMock(return_value=ok_response)

    code, message = driver._request_backend(
        mock.MagicMock(), {}, 'object', 'TEST')

    self.assertEqual(requests.codes.ok, code)
    self.assertEqual({'message': None}, message)
    driver._relay_request.assert_called()
async def test_asend_returns_appropriate_sorna_response(mocker, req_params,
                                                        mock_sorna_aresp):
    """``Request.asend`` wraps the aiohttp reply in a ``Response`` for every method.

    For each entry in ``Request._allowed_methods`` the matching
    ``aiohttp.ClientSession`` method is patched with a coroutine mock that
    returns the fixture response, and the wrapper's fields are compared
    with the fixture configuration.

    NOTE(review): this must be a coroutine function because it awaits;
    running it needs an asyncio-aware pytest plugin/marker, which is not
    visible in this excerpt.
    """
    req = Request(**req_params)
    for method in Request._allowed_methods:
        req.method = method
        mock_reqfunc = mocker.patch.object(
            aiohttp.ClientSession, method.lower(),
            new_callable=asynctest.CoroutineMock
        )
        # The fixture provides (mocked aiohttp response, expected values).
        mock_reqfunc.return_value, conf = mock_sorna_aresp

        resp = await req.asend()

        assert isinstance(resp, Response)
        assert resp.status == conf['status']
        assert resp.reason == conf['reason']
        assert resp.content_type == conf['content_type']
        body = await conf['read']()
        assert resp.content_length == len(body)
        assert resp.text() == body.decode()
        assert resp.json() == json.loads(body.decode())
def _parse_response_content_for_predictions(self, question_number, response):
    """
    Parse the JSON docs from the HTTP response into a list of predictions
    with scores (and confidences if they exist).

    :param str question_number: used as qid
    :param requests.Response response: response whose JSON body holds
        ``response.numFound`` and ``response.docs``
    :return: one ``Prediction`` per returned doc; empty when ``numFound`` is 0
    :rtype: list
    """
    payload = response.json()['response']
    if payload['numFound'] <= 0:
        # ``Logger.warn`` is deprecated and removed in Python 3.13.
        self.logger.warning('Empty response: %s', vars(response))
        return []
    return [
        Prediction(qid=question_number, aid=doc[self.DOC_ID_FIELD_NAME],
                   rank_score=doc['score'],
                   # The confidence is optional in the returned docs.
                   conf_score=doc.get("ranker.confidence"))
        for doc in payload['docs']
    ]
def _parse_response_content_for_predictions(self, question_number, response):
    """
    Parse the JSON docs from the HTTP response into a list of predictions
    with scores (no confidence is available in this response format).

    :param str question_number: used as qid
    :param requests.Response response: response whose JSON body holds
        ``matching_results`` and ``results``
    :return: one ``Prediction`` per result; empty when ``matching_results`` is 0
    :rtype: list
    """
    payload = response.json()
    if payload["matching_results"] <= 0:
        # ``Logger.warn`` is deprecated and removed in Python 3.13.
        self.logger.warning('Empty response: %s', vars(response))
        return []
    return [
        Prediction(qid=question_number, aid=doc[DOC_ID_FIELD_NAME],
                   rank_score=doc['score'], conf_score=None)
        for doc in payload['results']
    ]
def get(self, url, params=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """
    Issue an HTTP GET by delegating to :meth:`request`.

    :param str url: URL of the HTTP request.
    :param dict[str, T] | None params: (Optional) Dictionary to construct query string.
    :param dict | None headers: (Optional) Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) Forwarded to :meth:`request`; documented
        as auto-recovering from expired token errors or Internet failures.
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies}
    # Only forward the optional pieces the caller actually supplied.
    for name, value in (('params', params), ('headers', headers)):
        if value is not None:
            request_kwargs[name] = value
    return self.request(
        'get', url, request_kwargs,
        ok_status_code=ok_status_code, auto_renew=auto_renew)
def post(self, url, data=None, json=None, headers=None, ok_status_code=requests.codes.ok, auto_renew=True):
    """
    Issue an HTTP POST by delegating to :meth:`request`.

    When `json` is given it is sent instead of `data`; otherwise `data` is
    forwarded (even when it is None).

    :param str url: URL of the HTTP request.
    :param dict | None data: (Optional) Data in POST body of the request.
    :param dict | None json: (Optional) Dictionary sent as JSON content in the POST body.
    :param dict | None headers: (Optional) Additional headers for the HTTP request.
    :param int ok_status_code: (Optional) Expected status code for the HTTP response.
    :param True | False auto_renew: (Optional) Forwarded to :meth:`request`; documented
        as auto-recovering from expired token errors or Internet failures.
    :rtype: requests.Response
    """
    request_kwargs = {'proxies': self.proxies}
    if json is None:
        request_kwargs['data'] = data
    else:
        request_kwargs['json'] = json
    if headers is not None:
        request_kwargs['headers'] = headers
    return self.request(
        'post', url, request_kwargs,
        ok_status_code=ok_status_code, auto_renew=auto_renew)
def mixin_request_id(self, resp):
    """Record the request id carried by `resp` on ``self.request_id``.

    :param resp: either a ``Response`` object, from whose headers the id is
        extracted, or a plain value (string or None) stored unchanged
    """
    if not isinstance(resp, Response):
        # Already a request id (or None): keep it as given.
        self.request_id = resp
        return

    headers = resp.headers
    # Take the first of the known request-id headers that has a value.
    self.request_id = (headers.get('openstack-request-id') or
                       headers.get('x-openstack-request-id') or
                       headers.get('x-compute-request-id'))
def __init__(self, data):
    """Build a fake response from a status code or a description dict.

    :param data: either a bare status code, or a dict with optional keys
        ``status_code`` (default 200), ``text`` (str, or dict/list that is
        JSON-encoded) and ``headers``
    """
    super(TestResponse, self).__init__()
    self._content_consumed = True

    if not isinstance(data, dict):
        self.status_code = data
        return

    self.status_code = data.get('status_code', 200)
    # Fake the text attribute to streamline Response creation
    body = data.get('text', "")
    if isinstance(body, (dict, list)):
        body = json.dumps(body)
        fallback_headers = {"Content-Type": "application/json"}
    else:
        fallback_headers = {}
    if six.PY3 and isinstance(body, six.string_types):
        # On Python 3 a text body is stored as UTF-8 bytes.
        body = body.encode('utf-8', 'strict')
    self._content = body
    self.headers = data.get('headers') or fallback_headers
def test_request_retry(self, mock_request):
    """Tests that two connection errors will be handled"""

    class FlakyRequest(object):
        """Side effect that fails twice, then returns a 204 response."""
        attempts = 0

        def connection_error(self, *args, **kwargs):
            self.attempts += 1
            if self.attempts >= 3:
                ok = requests.Response()
                ok.status_code = 204
                return ok
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyRequest().connection_error

    client = InfluxDBClient(database='db')
    client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Tests that three connection errors will not be handled"""

    class FlakyRequest(object):
        """Side effect that fails three times, then returns a 200 response."""
        attempts = 0

        def connection_error(self, *args, **kwargs):
            self.attempts += 1
            if self.attempts >= 4:
                ok = requests.Response()
                ok.status_code = 200
                return ok
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FlakyRequest().connection_error

    client = InfluxDBClient(database='db')
    with self.assertRaises(requests.exceptions.ConnectionError):
        client.write_points(self.dummy_points)
def test_request_retry_raises(self, mock_request):
    """Tests that three connection errors will not be handled"""

    class FailingThenOk(object):
        """Raises ``ConnectionError`` on the first three calls, then answers 200."""
        calls = 0

        def connection_error(self, *args, **kwargs):
            self.calls += 1
            if self.calls >= 4:
                reply = requests.Response()
                reply.status_code = 200
                return reply
            raise requests.exceptions.ConnectionError

    mock_request.side_effect = FailingThenOk().connection_error

    influx = InfluxDBClient(database='db')
    with self.assertRaises(requests.exceptions.ConnectionError):
        influx.write_points(self.dummy_points)
def test_add_channel_failed_create_channel(mock_staff_client, mocker):
    """If client.channels.create fails an exception should be raised"""
    server_error = Response()
    server_error.status_code = statuses.HTTP_500_INTERNAL_SERVER_ERROR
    create_mock = mock_staff_client.channels.create
    # Make ``raise_for_status`` on the create() result fail with the 500.
    create_mock.return_value.raise_for_status.side_effect = HTTPError(response=server_error)

    with pytest.raises(ChannelCreationException) as ex:
        api.add_channel(
            Search.from_dict({}),
            "title",
            "name",
            "public_description",
            "channel_type",
            123,
            456,
        )

    assert ex.value.args[0] == "Error creating channel name"
    create_mock.return_value.raise_for_status.assert_called_with()
    assert create_mock.call_count == 1
    # Nothing may have been persisted.
    assert PercolateQuery.objects.count() == 0
    assert Channel.objects.count() == 0
def test_send_batch_400_no_raise(self, mock_post):
    """
    Test that if raise_for_status is False we don't raise an exception for a 400 response
    """
    bad_request = Mock(
        spec=Response,
        status_code=HTTP_400_BAD_REQUEST,
        json=mocked_json()
    )
    mock_post.return_value = bad_request

    chunk_size = 10
    recipient_tuples = [
        ("{0}@example.com".format(letter), None)
        for letter in string.ascii_letters
    ]
    assert len(recipient_tuples) == 52

    with override_settings(MAILGUN_RECIPIENT_OVERRIDE=None):
        resp_list = MailgunClient.send_batch(
            'email subject', 'email body', recipient_tuples,
            chunk_size=chunk_size, raise_for_status=False
        )

    # 52 recipients in chunks of 10 -> 6 requests.
    assert len(resp_list) == 6
    for resp in resp_list:
        assert resp.status_code == HTTP_400_BAD_REQUEST
    assert mock_post.call_count == 6
    assert mock_post.return_value.raise_for_status.called is False
def test_financial_aid_email_error(self, raise_for_status, mock_post):
    """
    Test that send_financial_aid_email handles errors correctly
    """
    bad_request = Mock(
        spec=Response,
        status_code=HTTP_400_BAD_REQUEST,
        json=mocked_json(),
    )
    mock_post.return_value = bad_request

    result = MailgunClient.send_financial_aid_email(
        self.staff_user_profile.user,
        self.financial_aid,
        'email subject',
        'email body',
        raise_for_status=raise_for_status,
    )

    # ``raise_for_status`` is called exactly when the flag asks for it.
    assert result.raise_for_status.called is raise_for_status
    assert result.status_code == HTTP_400_BAD_REQUEST
    assert result.json() == {}
def __init__(self, data):
    """Create a canned response from a status code or a description dict.

    :param data: a bare status code, or a dict that may define
        ``status_code`` (default 200), ``text`` (str, or dict/list that is
        JSON-encoded) and ``headers``
    """
    super(TestResponse, self).__init__()
    self._content_consumed = True

    if not isinstance(data, dict):
        self.status_code = data
        return

    self.status_code = data.get('status_code', 200)
    # Fake the text attribute to streamline Response creation
    payload = data.get('text', "")
    json_like = isinstance(payload, (dict, list))
    if json_like:
        payload = json.dumps(payload)
    implied_headers = {"Content-Type": "application/json"} if json_like else {}
    if six.PY3 and isinstance(payload, six.string_types):
        # On Python 3 a text body is stored as UTF-8 bytes.
        payload = payload.encode('utf-8', 'strict')
    self._content = payload
    self.headers = data.get('headers') or implied_headers
def post_settings(greeting_text):
    """ Sets the Greeting Text and then the START Button.

    Two POST requests are sent to the thread settings URL: first the
    greeting, then the call-to-action whose payload is 'USER_START'.

    :param str greeting_text: Desired Greeting Text (160 chars).
    :return: the requests ``Response`` of the second (START Button)
        request; the response of the greeting request is discarded.
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN

    # Set the greeting text
    greeting_payload = {
        'setting_type': 'greeting',
        'greeting': {'text': greeting_text},
    }
    requests.post(url, headers=HEADER, data=json.dumps(greeting_payload))

    # Set the start button
    button_payload = {
        'setting_type': 'call_to_actions',
        'thread_state': 'new_thread',
        'call_to_actions': [{'payload': 'USER_START'}],
    }
    return requests.post(url, headers=HEADER, data=json.dumps(button_payload))
def post_start_button(payload='START'):
    """ Sets the Thread Settings Get Started button
    (/docs/messenger-platform/thread-settings/get-started-button).

    :param str payload: Desired postback payload (Default START).
    :return: the requests ``Response`` of the POST call.
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    button_payload = {
        "setting_type": "call_to_actions",
        "thread_state": "new_thread",
        "call_to_actions": [{"payload": payload}],
    }
    return requests.post(url, headers=HEADER, data=json.dumps(button_payload))
def post_domain_whitelisting(whitelisted_domains, domain_action_type='add'):
    """ Sets the whitelisted domains for the Messenger Extension
    (/docs/messenger-platform/thread-settings/domain-whitelisting).

    :param list whitelisted_domains: Domains to be whitelisted.
    :param str domain_action_type: Action to run `add/remove` (Default add).
    :return: the requests ``Response`` of the POST call.
    """
    url = TD_STS_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'setting_type': 'domain_whitelisting',
        'whitelisted_domains': whitelisted_domains,
        'domain_action_type': domain_action_type,
    })
    return requests.post(url, headers=HEADER, data=body)
def typing(fbid, sender_action):
    """ Displays/Hides the typing gif/mark seen on facebook chat
    (/docs/messenger-platform/send-api-reference/sender-actions)

    :param str fbid: User id to display action.
    :param str sender_action: `typing_off/typing_on/mark_seen`.
    :return: the requests ``Response`` of the POST call.
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'recipient': {'id': fbid},
        'sender_action': sender_action,
    })
    return requests.post(url, headers=HEADER, data=body)
# Send API Content Type
def post_text_message(fbid, message):
    """ Sends a common text message
    (/docs/messenger-platform/send-api-reference/text-message)

    :param str fbid: User id to send the text.
    :param str message: Text to be displayed for the user.
        NOTE(review): earlier docs said 230 chars while the inline comment
        says 320 -- confirm the actual platform limit.
    :return: the requests ``Response`` of the POST call.
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'recipient': {'id': fbid},
        'message': {'text': message},  # Limit 320 chars
    })
    return requests.post(url, headers=HEADER, data=body)
def post_attachment(fbid, media_url, file_type):
    """ Sends a media attachment
    (/docs/messenger-platform/send-api-reference/contenttypes)

    :param str fbid: User id to send the attachment to.
    :param str media_url: Url of a hosted media.
    :param str file_type: image/audio/video/file.
    :return: the requests ``Response`` of the POST call.
    """
    url = MSG_URL + PAGE_ACCESS_TOKEN
    body = json.dumps({
        'recipient': {'id': fbid},
        'message': {
            "attachment": {"type": file_type, "payload": {"url": media_url}},
        },
    })
    return requests.post(url, headers=HEADER, data=body)
def post_start_button(payload='START'):
    """ Sets the **Get Started button**.

    :usage:

        >>> payload = 'GET_STARTED'
        >>> response = fbbotw.post_start_button(payload=payload)

    :param str payload: Desired postback payload (Default 'START').
    :return: the requests ``Response`` of the POST call.
    :facebook docs: messenger-platform/messenger-profile/get-started-button
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'get_started': {'payload': payload}})
    return requests.post(url, headers=HEADER, data=body)
def post_domain_whitelist(whitelisted_domains):
    """ Sets the whitelisted domains for the Messenger Extension

    :usage:

        >>> # Define the array of domains to be whitelisted (Max 10)
        >>> whitelisted_domains = [
        ...     "https://myfirst_domain.com",
        ...     "https://another_domain.com"
        ... ]
        >>> fbbotw.post_domain_whitelist(
        ...     whitelisted_domains=whitelisted_domains
        ... )

    :param list whitelisted_domains: Domains to be whitelisted (Max 10).
    :return: the requests ``Response`` of the POST call.
    :facebook docs: messenger-platform/messenger-profile/domain-whitelisting
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'whitelisted_domains': whitelisted_domains})
    return requests.post(url, headers=HEADER, data=body)
def post_account_linking_url(account_linking_url):
    """ Sets the **linking_url** to connect the user with your business login

    :usage:

        >>> my_callback_linking_url = "https://my_business_callback.com"
        >>> response = fbbotw.post_account_linking_url(
        ...     account_linking_url=my_callback_linking_url
        ... )

    :param str account_linking_url: URL to the account linking OAuth flow.
    :return: the requests ``Response`` of the POST call.
    :facebook docs: messenger-platform/messenger-profile/account-linking-url
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    body = json.dumps({'account_linking_url': account_linking_url})
    return requests.post(url, headers=HEADER, data=body)
def response(cls, r):
    """Unwrap the payload of the API response *r*.

    The JSON body is expected to be an envelope with ``meta`` and
    ``response`` keys; only the ``response`` part is returned.

    :param r: the HTTP response from an API call
    :type r: :class:`requests.Response`
    :returns: API response data (the envelope's ``response`` value)
    :raises errors.ApiError: when the body is not valid JSON (raised with
        *r*) or when ``meta`` carries a truthy ``errors`` entry (raised
        with ``meta``)
    """
    try:
        envelope = r.json()
    except ValueError:
        raise errors.ApiError(r)

    meta = envelope['meta']
    if meta.get("errors"):
        raise errors.ApiError(meta)
    return envelope["response"]
def post(self, endpoint, query=None, body=None, data=None):
    """
    Send a POST request to armada. If both body and data are specified,
    body will be used.

    :param string endpoint: URL string following hostname and API prefix
    :param dict query: dict of k, v parameters to add to the query string
    :param string body: string to use as the request body.
    :param data: Something json.dumps(s) can serialize.
    :return: A requests.Response object
    """
    api_url = '{}{}'.format(self.base_url, endpoint)
    self.logger.debug("Sending POST with armada_client session")

    if body is None:
        self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
        payload_kwargs = {'json': data}
    else:
        # An explicit body takes precedence over ``data``.
        self.logger.debug("Sending POST with explicit body: \n%s" % body)
        payload_kwargs = {'data': body}

    return self._session.post(
        api_url, params=query, timeout=3600, **payload_kwargs)
def post(self, endpoint, query=None, body=None, data=None):
    """
    Send a POST request to Drydock. If both body and data are specified,
    body will be used.

    :param string endpoint: The URL string following the hostname and API prefix
    :param dict query: A dict of k, v parameters to add to the query string
    :param string body: A string to use as the request body. Will be treated as raw
    :param data: Something json.dumps(s) can serialize. Result will be used as the request body
    :return: A requests.Response object
    """
    self.logger.debug("Sending POST with drydock_client session")
    target_url = self.base_url + endpoint

    if body is None:
        self.logger.debug("Sending POST with JSON body: \n%s" % str(data))
        payload_kwargs = {'json': data}
    else:
        # An explicit body takes precedence over ``data``.
        self.logger.debug("Sending POST with explicit body: \n%s" % body)
        payload_kwargs = {'data': body}

    return self.__session.post(
        target_url, params=query, timeout=10, **payload_kwargs)
def test_store_companies_house_profile_in_session_saves_in_session(
    mock_retrieve_profile, client
):
    """The retrieved profile is stored in the session and marks it modified."""
    profile = {
        'date_of_creation': '2000-10-10',
        'company_name': 'Example corp',
        'company_status': 'active',
        'company_number': '01234567',
        'registered_office_address': {'foo': 'bar'}
    }
    ok_response = Response()
    ok_response.status_code = http.client.OK
    # Bypass body parsing: the helper only needs ``json()``.
    ok_response.json = lambda: profile
    mock_retrieve_profile.return_value = ok_response
    session = client.session

    helpers.store_companies_house_profile_in_session(session, '01234567')

    mock_retrieve_profile.assert_called_once_with(number='01234567')
    assert session[helpers.COMPANIES_HOUSE_PROFILE_SESSION_KEY] == profile
    assert session.modified is True
async def telljoke(self, ctx: commands.Context) -> None:
    """
    Responds with a random joke from theoatmeal.com

    Declared ``async`` because it awaits the bot's ``send_typing`` and
    ``say`` coroutines; ``await`` is a syntax error in a plain ``def``.

    :param ctx: invocation context; its ``message.channel`` receives the
        typing indicator.
    """
    # TODO: get more joke formats (or a better source)
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)
    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://theoatmeal.com/djtaf/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)
    # Since we only have one request we can just fetch the first one.
    page = res[0]
    # page.content is the html in bytes
    soup = BeautifulSoup(page.content.decode(page.encoding), 'html.parser')
    joke_ul = soup.find(id='joke_0')
    # The joke is split into a set-up (h2.part1) and a punch line (#part2_0).
    joke = joke_ul.find('h2', {'class': 'part1'}).text.lstrip() + '\n' + joke_ul.find(id='part2_0').text
    await self.bot.say(joke)
async def randomfact(self, ctx: commands.Context) -> None:
    """
    Responds with a random fact scraped from unkno.com

    Declared ``async`` because it awaits the bot's ``send_typing`` and
    ``say`` coroutines; ``await`` is a syntax error in a plain ``def``.

    :param ctx: invocation context; its ``message.channel`` receives the
        typing indicator.
    """
    # Send typing as the request can take some time.
    await self.bot.send_typing(ctx.message.channel)
    # Build a new request object
    req: grequests.AsyncRequest = grequests.get('http://www.unkno.com/', timeout=1)
    # Send new request
    res: List[requests.Response] = grequests.map([req], exception_handler=self.request_exception_handler)
    # Since we only have one request we can just fetch the first one.
    page = res[0]
    # page.content is the html in bytes
    soup = BeautifulSoup(page.content.decode(page.encoding), 'html.parser')
    await self.bot.say(soup.find(id='content').text)
def test_push_pact(mock_put):
    """``push_pact`` PUTs the pact file's JSON to the expected broker URL."""
    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created

    with open(PACT_FILE_PATH) as pact_handle:
        expected_pact = json.load(pact_handle)

    client = BrokerClient(broker_url=settings.PACT_BROKER_URL)
    client.push_pact(
        provider=PROVIDER,
        consumer=CONSUMER,
        pact_file=PACT_FILE_PATH,
        consumer_version=CONSUMER_VERSION
    )[0]

    mock_put.assert_called_once_with(
        EXPECTED_PUSH_PACT_URL,
        json=expected_pact,
        auth=None
    )
def test_tag_consumer(mock_put):
    """``tag_consumer`` PUTs to the expected tag URL with a JSON content type."""
    created = Response()
    created.status_code = CREATED
    mock_put.return_value = created

    client = BrokerClient(broker_url=settings.PACT_BROKER_URL)
    client.tag_consumer(
        provider=PROVIDER,
        consumer=CONSUMER,
        consumer_version=CONSUMER_VERSION,
        tag=TAG
    )[0]

    mock_put.assert_called_once_with(
        EXPECTED_TAG_CONSUMER_URL,
        headers={'Content-Type': 'application/json'},
        auth=None
    )
def send(self, req, **kwargs):
    """Open ``req.url`` with ``urlopen`` and wrap the result in a Response.

    :param req: request object; only its ``url`` attribute is read
    :param kwargs: accepted and ignored
    :returns: ``requests.Response`` whose ``raw`` is the opened stream and
        whose status is 200 on success; on failure the status is 403
        (EACCES), 404 (ENOENT) or 400 (anything else)
    """
    # errno values that get a dedicated status; the rest map to 400.
    errno_to_status = {
        errno.EACCES: requests.codes.forbidden,
        errno.ENOENT: requests.codes.not_found,
    }
    result = requests.Response()
    try:
        result.raw = urlopen(req.url)
        length = result.raw.headers.get('content-length')
        if length is not None:
            result.headers['Content-Length'] = length
    except IOError as exc:
        result.status_code = errno_to_status.get(
            exc.errno, requests.codes.bad_request)
    except OSError:
        result.status_code = requests.codes.bad_request
    else:
        result.status_code = requests.codes.ok
    return result
def test_it(self):
    """The rate-limit summary and the env-var hint are printed to stderr."""
    import sys

    import mock
    import requests

    from ci_diff_helper import _github

    remaining = '17'
    rate_limit = '60'
    rate_reset = '1475953149'

    response = requests.Response()
    response.headers[_github._RATE_REMAINING_HEADER] = remaining
    response.headers[_github._RATE_LIMIT_HEADER] = rate_limit
    response.headers[_github._RATE_RESET_HEADER] = rate_reset

    expected_msg = _github._RATE_LIMIT_TEMPLATE.format(
        remaining, rate_limit, rate_reset)

    # Capture printing instead of writing to the real stderr.
    with mock.patch('six.print_') as mocked:
        self._call_function_under_test(response)

        self.assertEqual(mocked.call_count, 2)
        mocked.assert_any_call(expected_msg, file=sys.stderr)
        mocked.assert_any_call(_github._GH_ENV_VAR_MSG,
                               file=sys.stderr)
def test_match_not_found(self):
    """Check if the server send an empty match if not found."""
    # Nothing cached, and the API lookup fails with a "404" LoLException.
    self.service.cache_manager.find_match.return_value = None
    not_found = LoLException("404", requests.Response())
    self.service.riot_api_handler.get_match.side_effect = not_found

    match_request = service_pb2.MatchRequest(id=4242,
                                             region=constants_pb2.EUW)
    response = self.stub.Match(match_request)

    self.assertEqual(response, match_pb2.MatchReference())
def test_request_api_server_authorized(self, options, config, request):
    """A 200 reply is returned as-is and the call is made with ``verify=False``."""
    driver = self._get_driver(options, config)
    driver._apiinsecure = True

    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    request.return_value = ok_response

    url = "/URL"
    result = driver._request_api_server(url)

    self.assertEqual(ok_response, result)
    request.assert_called_with(url, data=None, headers=mock.ANY,
                               verify=False)
def test_request_api_server_auth_failed(self, options, config, request):
    """A reply that is always 401 makes ``_request_api_server`` raise RuntimeError."""
    driver = self._get_driver(options, config)
    driver._apiinsecure = False
    driver._use_api_certs = False

    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    request.return_value = unauthorized

    url = "/URL"
    self.assertRaises(RuntimeError, driver._request_api_server, url)
def test_request_api_server_authn(self, options, config, request):
    """``_request_api_server_authn`` returns the reply and calls with ``verify=False``."""
    driver = self._get_driver(options, config)
    driver._apiinsecure = True

    ok_response = requests.Response()
    ok_response.status_code = requests.codes.ok
    request.return_value = ok_response

    url = "/URL"
    result = driver._request_api_server_authn(url)

    self.assertEqual(ok_response, result)
    request.assert_called_with(url, data=None, headers=mock.ANY,
                               verify=False)
def test_vnc_api_is_authenticated_unauthorized(self, config, request):
    """A 401 reply makes ``vnc_api_is_authenticated`` return True."""
    config.APISERVER = mock.MagicMock(
        use_ssl=False,
        api_server_ip="localhost",
        api_server_port="8082",
    )

    unauthorized = requests.Response()
    unauthorized.status_code = requests.codes.unauthorized
    request.return_value = unauthorized

    result = utils.vnc_api_is_authenticated()

    request.assert_called_with(mock.ANY)
    self.assertEqual(True, result)
def test_vnc_api_is_authenticated_invalid(self, config, request):
    """A server-error reply makes ``vnc_api_is_authenticated`` raise HTTPError."""
    config.APISERVER = mock.MagicMock(
        use_ssl=True,
        api_server_ip="localhost",
        api_server_port="8082",
    )

    server_error = requests.Response()
    server_error.status_code = requests.codes.server_error
    request.return_value = server_error

    self.assertRaises(requests.exceptions.HTTPError,
                      utils.vnc_api_is_authenticated)
    request.assert_called_with(mock.ANY)
def process(self, http_response):
    # type: (requests.Response) -> PrestoStatus
    """Turn an HTTP reply into a ``PrestoStatus`` and update session state.

    :param http_response: reply to decode; its encoding is forced to UTF-8
        before the JSON body is parsed
    :returns: ``PrestoStatus`` built from the JSON body (``id``, ``stats``,
        ``infoUri``, optional ``nextUri``/``data``/``columns``)
    :raises: whatever ``raise_response_error`` raises for a non-OK reply,
        and the exception built by ``_process_error`` when the body
        contains an ``error`` entry

    Side effects: session properties named by the clear-session header are
    removed, those in the set-session header are stored, and
    ``self._next_uri`` is updated.
    """
    if not http_response.ok:
        self.raise_response_error(http_response)

    http_response.encoding = 'utf-8'
    response = http_response.json()
    logger.debug('HTTP {}: {}'.format(http_response.status_code, response))
    if 'error' in response:
        raise self._process_error(response['error'])

    # Session updates travel in the HTTP headers, not in the JSON body, so
    # they must be read from ``http_response`` (``response`` is a dict and
    # has no ``headers`` attribute).
    headers = http_response.headers
    if constants.HEADER_CLEAR_SESSION in headers:
        for prop in get_header_values(
            headers,
            constants.HEADER_CLEAR_SESSION,
        ):
            self._client_session.properties.pop(prop, None)

    if constants.HEADER_SET_SESSION in headers:
        for key, value in get_session_property_values(
            headers,
            constants.HEADER_SET_SESSION,
        ):
            self._client_session.properties[key] = value

    self._next_uri = response.get('nextUri')

    return PrestoStatus(
        id=response['id'],
        stats=response['stats'],
        info_uri=response['infoUri'],
        next_uri=self._next_uri,
        rows=response.get('data', []),
        columns=response.get('columns'),
    )