Python requests 模块,options() 实例源码
我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用requests.options()。
def authorization(self,controller):
headers = {'token': self.token}
p = requests.options(self.base+"%s/" %(controller), headers=headers)
auth_json = json.loads(p.text)
if p.status_code != 200:
logging.error("phpipam.authorization: Failure %s" %(p.status_code))
logging.error(auth_json)
self.error = p.status_code
self.error_message=auth_json['message']
return self.error,self.error_message
if not auth_json['success']:
logging.error("phpipam.authorization: FAILURE: %s" %(auth_json['code']))
self.error=auth_json['code']
return self.error
logging.info("phpipam.authorization: success %s" %(auth_json['success']))
return auth_json['data']['methods']
def show_options(self, url, view):
"""Send options request to `url` and display results in pop-up.
"""
res = options(url, timeout=3)
if not res.ok:
return
names = ['Allow', 'Access-Control-Allow-Methods', 'Access-Control-Max-Age']
headers = [res.headers.get(name, None) for name in names]
items = '\n'.join('<li>{}: {}</li>'.format(n, h) for n, h in zip(names, headers) if h)
content = '<h2>OPTIONS: {}</h2>\n<ul>{}</ul>'.format(url, items)
try:
json_dict = res.json()
except:
pass
else:
content = '{}\n<pre><code>{}</pre></code>'.format(
content, json.dumps(json_dict, sort_keys=True, indent=2, separators=(',', ': '))
)
view.show_popup(content, max_width=700, max_height=500)
def test_can_support_custom_cors(smoke_test_app):
response = requests.get(smoke_test_app.url + '/custom_cors')
response.raise_for_status()
expected_allow_origin = 'https://foo.example.com'
assert response.headers[
'Access-Control-Allow-Origin'] == expected_allow_origin
# Should also have injected an OPTIONs request.
response = requests.options(smoke_test_app.url + '/custom_cors')
response.raise_for_status()
headers = response.headers
assert headers['Access-Control-Allow-Origin'] == expected_allow_origin
assert headers['Access-Control-Allow-Headers'] == (
'Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token,'
'X-Api-Key,X-Special-Header')
_assert_contains_access_control_allow_methods(
headers, ['GET', 'POST', 'PUT', 'OPTIONS'])
assert headers['Access-Control-Max-Age'] == '600'
assert headers['Access-Control-Expose-Headers'] == 'X-Special-Header'
assert headers['Access-Control-Allow-Credentials'] == 'true'
def send_http_request_used_exec(self, url, method, request_body="", header="", cookie=None):
if method not in ["get", "post", "put", "delete", "head", "options"]:
raise Exception("Not supported method: %s" % method)
_cookie_obj = cookie
_response = None
if header is not "":
_request_api_string = "_response = requests.%s(%s, data=%s, header=%s, _cookie_obj)" % (method, url,
request_body,
header)
else:
_request_api_string = "_response = requests.%s(%s, data=%s, _cookie_obj)" % (method, url, request_body)
exec _request_api_string
return _response
def test_cors_preflight_allowed(self):
"""Test cross origin resource sharing preflight (OPTIONS) request."""
headers = {
const.HTTP_HEADER_ORIGIN: HTTP_BASE_URL,
'Access-Control-Request-Method': 'GET',
'Access-Control-Request-Headers': 'x-ha-access'
}
req = requests.options(_url(const.URL_API), headers=headers)
allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS
assert req.status_code == 200
assert req.headers.get(allow_origin) == HTTP_BASE_URL
assert req.headers.get(allow_headers) == \
const.HTTP_HEADER_HA_AUTH.upper()
def exploit(request, response, method, key, is_array=False):
if config.dbconn().fetch_rows('result', condition="exploit='%s' and result != 'continue' and `host`='%s'" % (os.path.basename(__file__)[:-3], request['host']), order="id asc", limit="1", fetchone=True): return
allow = requests.options(request['uri']).headers.get('Allow', '')
if allow.find('PUT') != -1 or allow.find('PATCH') != -1:
return {'result': 'vul', 'info': "Server support put/patch method", 'hash': None, 'level': "middle"}
else:
return {'result': 'safe', 'info': "Server does not support put/patch method", 'hash': None, 'level': "middle"}
def test_should_respond_to_any_options(self):
server.on('OPTIONS').status(200)
res = requests.options('http://localhost:8080/some/url', headers={'foo': 'bar'})
self.assertEqual(res.status_code, 200)
def test_should_always_respond_to_matching_queries(self):
server.always('OPTIONS').status(200)
res = requests.options('http://localhost:8080/some/url', headers={'foo': 'bar'})
self.assertEqual(res.status_code, 200)
res = requests.options('http://localhost:8080/some/url', headers={'foo': 'bar'})
self.assertEqual(res.status_code, 200)
def test_should_reset_always_rules(self):
server.always('OPTIONS').status(200)
server.reset()
res = requests.options('http://localhost:8080/some/url', headers={'foo': 'bar'})
self.assertEqual(res.status_code, 500)
def __accept_post_options(self, inbox, **kwargs):
r = requests.options(inbox, **kwargs)
if r.status_code == requests.codes.ok and 'accept-post' in r.headers:
if self.JSON_LD in r.headers['accept-post']:
return self.JSON_LD
for content_type in r.headers['accept-post'].split(','):
return self.content_type_to_mime_type(content_type)
def test_can_accept_options_request(config, sample_app, local_server_factory):
local_server, port = local_server_factory(sample_app, config)
response = local_server.make_call(requests.options, '/test-cors', port)
assert response.headers['Content-Length'] == '0'
assert response.headers['Access-Control-Allow-Methods'] == 'POST,OPTIONS'
assert response.text == ''
def test_can_accept_multiple_options_request(config, sample_app,
local_server_factory):
local_server, port = local_server_factory(sample_app, config)
response = local_server.make_call(requests.options, '/test-cors', port)
assert response.headers['Content-Length'] == '0'
assert response.headers['Access-Control-Allow-Methods'] == 'POST,OPTIONS'
assert response.text == ''
response = local_server.make_call(requests.options, '/test-cors', port)
assert response.headers['Content-Length'] == '0'
assert response.headers['Access-Control-Allow-Methods'] == 'POST,OPTIONS'
assert response.text == ''
def test_can_support_cors(smoke_test_app):
response = requests.get(smoke_test_app.url + '/cors')
response.raise_for_status()
assert response.headers['Access-Control-Allow-Origin'] == '*'
# Should also have injected an OPTIONs request.
response = requests.options(smoke_test_app.url + '/cors')
response.raise_for_status()
headers = response.headers
assert headers['Access-Control-Allow-Origin'] == '*'
assert headers['Access-Control-Allow-Headers'] == (
'Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token,'
'X-Api-Key')
_assert_contains_access_control_allow_methods(
headers, ['GET', 'POST', 'PUT', 'OPTIONS'])
def is_cross_origin_accessible(path, origin='http://example.org'):
"""
Returns True if the path is accessible at the given origin
(default: example.org).
Raises AssertionError otherwise.
"""
response = requests.options(path, headers={'Origin': origin})
assert response.status_code == 200
assert is_allowed_origin(origin, response)
return True
def do_options(self):
logger.debug(self.neuron_name + " do_options method called")
r = requests.options(url=self.url, **self.parameters)
self.post_processing_request(r)
def test_OPTIONS_CORS_headers_valid_origin(self):
# before sending a POST, the browser will send an OPTION request as a preflight to see the CORS headers.
# the backend will only return the required CORS headers, if the Origin is set to a allowed domain.
post_payload = create_post_payload()
valid_origin = 'http://testdomain.com'
preflight_response = requests.options(url=COMMENT_SIDECAR_URL, json=post_payload, headers={'Origin': valid_origin})
assert_cors_headers_exists(preflight_response, valid_origin)
assert_that(preflight_response.text).is_empty()
assert_that(get_comments().json())\
.described_as("No comment should have been created after an OPTIONS request")\
.is_empty()
def test_OPTIONS_CORS_headers_invalid_origin(self):
post_payload = create_post_payload()
valid_origin = 'http://invalid.com'
preflight_response = requests.options(url=COMMENT_SIDECAR_URL, json=post_payload, headers={'Origin': valid_origin})
assert_cors_headers_doesnt_exists(preflight_response)
assert_that(preflight_response.text).is_empty()
assert_that(get_comments().json()) \
.described_as("No comment should have been created after an OPTIONS request") \
.is_empty()
def get(env) -> "(url -- response_object)":
"Returns a request object that represents a OPTIONS request to the given url"
url = env.stack.pop().val
req = wrap_req(requests.options(url))
env.stack.push(req)
def get_site_method(self):
try:
req_headers = dict(requests.options('http://'+self.url).headers)
if req_headers.has_key('Allow'):return {"Method":req_headers["Allow"]}
else:return {"Method":"NO Information"}
except:return {"Method":"NO Information"}
def requests_method(self):
# ????????????????????
httpmthdcmbbx = ui.httpmthdcmbbx
httpmthd_1 = httpmthdcmbbx.currentText()
httpmthdlnedt = ui.httpmthdlnedt
httpmthd_2 = httpmthdlnedt.text()
dlwthrspnscmbbx = ui.dlwthrspnscmbbx
httpmthd_3 = dlwthrspnscmbbx.currentText()
global r
# ?????
if httpmthd_1 == "get":
r = requests.get(self.url, httpmthd_2)
elif httpmthd_1 == "post":
r = requests.post(self.url, httpmthd_2)
elif httpmthd_1 == "put":
r = requests.put(self.url, httpmthd_2)
elif httpmthd_1 == "delete":
r = requests.delete(self.url, httpmthd_2)
elif httpmthd_1 == "head":
r = requests.head(self.url)
else:
r = requests.options(self.url, httpmthd_2)
# ???????
if httpmthd_3 == "????????":
print(r.text)
elif httpmthd_3 == "?????????":
print(r.content)
elif httpmthd_3 == "??json????":
print(r.json)
else:
pass
messages.append(r.text)
ui.showmssg.setText('\n'.join(messages))
print(httpmthd_1)
print(httpmthd_2)
def test_preflight(testserver):
r = requests.options(testserver)
assert r.status_code == 200
assert r.headers.get('Access-Control-Allow-Origin') == '*'
def options(self, uri, **kwargs): # pragma: no cover
if 'timeout' not in kwargs:
kwargs['timeout'] = self._timeout()
try:
url = self._base_url() + uri
self._log("OPTIONS", url, **kwargs)
return requests.options(url, **kwargs)
except rexc.Timeout as e: # pragma: no cover
raise TimeoutException(-1, e)
except rexc.ConnectionError as e: # pragma: no cover
raise ApiUnavailableException(-1, e)
def do_OPTIONS(self):
self.data_bytes = None
self.method = requests.options
self.forward('OPTIONS')
def do_OPTIONS(self):
self.method = requests.options
self.forward('OPTIONS')
def test_options_request(self):
# OPTIONS?????????
r = requests.options('http://127.0.0.1:8080')
expected = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'POST'
}
for name, value in expected.items():
self.assertTrue(name in r.headers)
self.assertEqual(value, r.headers[name])
def options(self, headers=None):
"""
Method for making a HTTP OPTIONS request on resource's endpoint.
"""
r = requests.options(self.endpoint, headers=headers, auth=self.auth)
return r
def assembleRequest(data, method, URL, safe, headerDict, proxyDict, verbose):
# connect to the API endpoint, and send the data that was given
# note the response variable will hold all of the information received back from the server
try:
if method == "GET":
response = requests.get(URL, data, headers=headerDict,proxies=proxyDict, verify=safe)
elif method == "POST":
response = requests.post(URL, data, headers=headerDict, verify=safe, proxies=proxyDict)
elif method == "PUT":
response = requests.put(URL, data, headers=headerDict, verify=safe, proxies=proxyDict)
elif method == "DELETE":
response = requests.delete(URL, headers=headerDict, verify=safe, proxies=proxyDict)
elif method == "OPTIONS":
response = requests.options(URL, verify=safe, proxies=proxyDict)
elif method == "HEAD":
response = requests.head(URL, verify=safe, proxies=proxyDict)
else:
print "method not currently supported"
print "current methods: GET, POST, PUT, DELETE, OPTIONS, HEAD"
sys.exit()
print ""
# print response code (i.e. 404, 500, 200)
print response.status_code
if verbose:
# response headers are saved as a dictionary. Loop through the dictionary and print information.
for i in response.headers:
print i + ":", response.headers[i]
print ""
# output response body
print response.content
else:
pass
print ""
print ""
except requests.exceptions.Timeout:
print "Connection timeout - make sure a WAF or firewall rule is not blocking traffic."
except requests.exceptions.TooManyRedirects:
print "Too many redirects - URL may be bad"
except requests.exceptions.SSLError:
print "could not verify HTTPS certificate."
print 'pURL tries to verify HTTPS certificates. If using a proxy try "--unsafe"'
except requests.exceptions.RequestException as e:
print e
except Exception, e:
print "catastrophic failure see below for details"
print e
sys.exit(1)
def send_http_request(self, url, method, request_body="", header="", cookie=None):
try:
if method.lower() == "get":
if header is not "":
_response = requests.get(url, data=request_body, headers=eval(header), cookies=cookie)
return _response
else:
_response = requests.get(url, data=request_body, cookies=cookie)
return _response
elif method.lower() == "post":
if header is not "":
_response = requests.post(url, data=request_body, headers=eval(header), cookies=cookie)
return _response
else:
_response = requests.post(url, data=request_body, cookies=cookie)
return _response
elif method.lower() == "put":
if header is not "":
_response = requests.put(url, headers=eval(header), data=request_body, cookies=cookie)
return _response
else:
_response = requests.put(url, data=request_body, cookies=cookie)
return _response
elif method.lower() == "delete":
if header is not "":
_response = requests.delete(url, headers=eval(header), data=request_body, cookies=cookie)
return _response
else:
_response = requests.delete(url, data=request_body, cookies=cookie)
return _response
elif method.lower() == "head":
if header is not "":
_response = requests.head(url, headers=eval(header), data=request_body, cookies=cookie)
return _response
else:
_response = requests.head(url, data=request_body, cookies=cookie)
return _response
elif method.lower() == "options":
if header is not "":
_response = requests.options(url, headers=eval(header), data=request_body, cookies=cookie)
return _response
else:
_response = requests.options(url, data=request_body, cookies=cookie)
return _response
else:
raise Exception("Not supported method: %s" % method)
except Exception, ex:
raise HTTPRequestException(ex.message)