Python requests 模块:requests.utils 实例源码
我们从 Python 开源项目中提取了以下 40 个代码示例,用于说明如何使用 requests.utils 子模块中的函数。
def loadCartAndCheckout(self):
    """Open the cart page in Chrome with the requests session's cookies applied.

    A Chrome driver is started from ``./chromedriver``, its cookies are
    cleared and ``self.URL_cart`` is loaded once. Every name/value pair from
    ``self.user_session.cookies`` is then added to the browser with path
    ``/`` and the cart page is loaded a second time.

    The driver is neither returned nor quit by this method.
    """
    browser = webdriver.Chrome(executable_path="./chromedriver")
    browser.delete_all_cookies()
    # NOTE(review): presumably this first load puts the browser on the cart's
    # domain so that the cookies below are accepted -- confirm.
    browser.get(self.URL_cart)

    session_cookies = requests.utils.dict_from_cookiejar(self.user_session.cookies)
    for name, value in session_cookies.items():
        browser.add_cookie({'name': name, 'value': value, 'path': '/'})

    # Reload so the page is fetched with the imported cookies in place.
    browser.get(self.URL_cart)
def acquire_authentication_cookie(self, options):
    """Post the token to the endpoint and keep the auth cookies it returns.

    Args:
        options: Mapping whose ``'endpoint'`` entry is the URL to post
            ``self.token`` to.

    Returns:
        True when the response set both the ``FedAuth`` and ``rtFa`` cookies
        (stored on ``self.FedAuth`` and ``self.rtFa``). False otherwise, in
        which case ``self.error`` is set and logged.
    """
    log = self.logger(self.acquire_authentication_cookie.__name__)
    endpoint = options['endpoint']
    http = requests.session()
    log.debug_secrets("session: %s\nsession.post(%s, data=%s)", http, endpoint, self.token)
    form_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    http.post(endpoint, data=self.token, headers=form_headers)
    log.debug_secrets("session.cookies: %s", http.cookies)
    received = requests.utils.dict_from_cookiejar(http.cookies)
    log.debug_secrets("cookies: %s", received)

    # Both cookies are required; a missing one counts as a failed login.
    if 'FedAuth' not in received or 'rtFa' not in received:
        self.error = "An error occurred while retrieving auth cookies"
        log.error(self.error)
        return False

    self.FedAuth = received['FedAuth']
    self.rtFa = received['rtFa']
    return True
def load_session_from_file(self, username: str, filename: Optional[str] = None) -> None:
    """Rebuild a :class:`requests.Session` from a pickled cookie file and store it.

    The new session gets the pickled cookies, the default HTTP headers and an
    ``X-CSRFToken`` header taken from the ``csrftoken`` cookie. It is then
    stored as ``self.session`` and ``username`` is stored as ``self.username``.

    :param username: User the session belongs to; also selects the default
        session path when ``filename`` is None.
    :param filename: Path of the pickled cookie dict, or None for the default.
    :raises FileNotFoundError: If the file does not exist.
    """
    session_path = get_default_session_filename(username) if filename is None else filename
    with open(session_path, 'rb') as sessionfile:
        restored = requests.Session()
        # NOTE(review): unpickling can execute arbitrary code; only load
        # session files that this program wrote itself.
        saved_cookies = pickle.load(sessionfile)
        restored.cookies = requests.utils.cookiejar_from_dict(saved_cookies)
        restored.headers.update(self._default_http_header())
        csrf_token = restored.cookies.get_dict()['csrftoken']
        restored.headers.update({'X-CSRFToken': csrf_token})
        self._log("Loaded session from %s." % session_path)
        self.session = restored
        self.username = username
def authenticated(func):
    """Decorator that logs the user in, if needed, before calling ``func``.

    The wrapper first checks ``self.cookies`` for a ``z_c0`` cookie. If one is
    present it requests a profile page through ``self._execute`` and treats an
    OK response as proof that the stored cookies still work. Otherwise, or if
    that probe fails, it prompts for account and password on stdin and retries
    ``Account().login`` until the reply has ``r == 0``, then adopts the
    cookies of that account object.

    Args:
        func: Method taking ``self`` as its first argument.

    Returns:
        A wrapper with the same call signature that returns whatever ``func``
        returns. The wrapper keeps ``func``'s name and docstring.
    """
    # Imported here so the block does not depend on a file-level import.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        success = False
        # Cookies from an earlier login may have expired, so verify them with
        # a real request instead of trusting their presence.
        if 'z_c0' in requests.utils.dict_from_cookiejar(self.cookies):
            from ..url import URL
            r = self._execute(method="get", url=URL.profile(user_slug="zhijun-liu"))
            success = r.ok
        # Keep prompting until a login attempt succeeds.
        while not success:
            account = input("???Email??????:")
            password = input("?????:")
            obj = Account()
            data = obj.login(account, password)
            if data.get("r") == 0:
                success = True
                self.cookies = obj.cookies
            else:
                print(data.get("msg"))
        # The loop has no ``break``, so the former ``while ... else`` always
        # reached this call; a plain statement says the same thing.
        return func(self, *args, **kwargs)

    return wrapper
def _search(self, limit, format):
    '''
    Fetch one page of web results and advance the paging offset.

    limit  -- number of results still wanted; at most 50 are requested.
    format -- value substituted into the format slot of QUERY_URL.

    Returns a list of WebResult objects and adds the number of results
    consumed to self.current_offset.

    When the response body is not valid JSON: raises PyBingWebException if
    self.safe is false; otherwise prints the error, sleeps 5 seconds and
    returns an empty list.
    '''
    url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit),
                                self.current_offset, format)
    r = requests.get(url, auth=("", self.api_key))
    try:
        json_results = r.json()
    except ValueError:
        if not self.safe:
            raise PyBingWebException("Request returned with code %s, error msg: %s" % (r.status_code, r.text))
        print("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % (
            r.status_code, r.text))
        time.sleep(5)
        # Nothing was decoded, so there is nothing to package. Falling through
        # used to hit the unbound ``json_results`` and raise UnboundLocalError.
        return []
    packaged_results = [WebResult(single_result_json) for single_result_json in json_results['d']['results']]
    self.current_offset += min(50, limit, len(packaged_results))
    return packaged_results
def _search(self, limit, format):
    '''
    Fetch one page of video results and advance the paging offset.

    limit  -- number of results still wanted; at most 50 are requested.
    format -- value substituted into the format slot of QUERY_URL.

    Returns a list of VideoResult objects and adds the number of results
    consumed to self.current_offset.

    When the response body is not valid JSON: raises PyBingVideoException if
    self.safe is false; otherwise prints the error, sleeps 5 seconds and
    returns an empty list.
    '''
    url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit),
                                self.current_offset, format)
    r = requests.get(url, auth=("", self.api_key))
    try:
        json_results = r.json()
    except ValueError:
        if not self.safe:
            raise PyBingVideoException("Request returned with code %s, error msg: %s" % (r.status_code, r.text))
        print("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % (
            r.status_code, r.text))
        time.sleep(5)
        # Nothing was decoded, so there is nothing to package. Falling through
        # used to hit the unbound ``json_results`` and raise UnboundLocalError.
        return []
    packaged_results = [VideoResult(single_result_json) for single_result_json in json_results['d']['results']]
    self.current_offset += min(50, limit, len(packaged_results))
    return packaged_results
def _search(self, limit, format):
    '''
    Fetch one page of news results and advance the paging offset.

    limit  -- number of results still wanted; at most 50 are requested.
    format -- value substituted into the format slot of QUERY_URL.

    Returns a list of NewsResult objects and adds the number of results
    consumed to self.current_offset.

    When the response body is not valid JSON: raises PyBingNewsException if
    self.safe is false; otherwise prints the error, sleeps 5 seconds and
    returns an empty list.
    '''
    url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit),
                                self.current_offset, format)
    r = requests.get(url, auth=("", self.api_key))
    try:
        json_results = r.json()
    except ValueError:
        if not self.safe:
            raise PyBingNewsException("Request returned with code %s, error msg: %s" % (r.status_code, r.text))
        print("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % (
            r.status_code, r.text))
        time.sleep(5)
        # Nothing was decoded, so there is nothing to package. Falling through
        # used to hit the unbound ``json_results`` and raise UnboundLocalError.
        return []
    packaged_results = [NewsResult(single_result_json) for single_result_json in json_results['d']['results']]
    self.current_offset += min(50, limit, len(packaged_results))
    return packaged_results
def __init__(self, app, disable_background_sync):
    """Set up sync state, queue the start-up tasks and optionally start periodic sync.

    Args:
        app: Application object; ``app.config.token`` supplies the bearer token.
        disable_background_sync: When true, only the version and own-user
            tasks are queued and no periodic thread is started.

    NOTE(review): the snippet lost its indentation, so the thread start being
    inside the ``if not disable_background_sync`` branch is reconstructed --
    confirm against the project source.
    """
    release = boartty.version.version_info.release_string()
    self.user_agent = 'Boartty/%s %s' % (release, requests.utils.default_user_agent())
    self.version = (0, 0, 0)
    self.offline = False
    self.app = app
    self.log = logging.getLogger('boartty.sync')
    self.queue = MultiQueue([HIGH_PRIORITY, NORMAL_PRIORITY, LOW_PRIORITY])
    self.result_queue = queue.Queue()
    self.session = requests.Session()
    self.token = 'Bearer %s' % (self.app.config.token)

    # These two tasks are always queued, even with background sync disabled.
    for task_type in (GetVersionTask, SyncOwnUserTask):
        self.submitTask(task_type(HIGH_PRIORITY))

    if disable_background_sync:
        return

    for task_type in (UpdateStoriesTask, SyncProjectListTask, SyncUserListTask):
        self.submitTask(task_type(HIGH_PRIORITY))
    for task_type in (SyncProjectSubscriptionsTask, SyncSubscribedProjectsTask,
                      SyncBoardsTask, SyncWorklistsTask):
        self.submitTask(task_type(NORMAL_PRIORITY))
    # The low-priority branch sync, outdated-change sync and database pruning
    # tasks were commented out in the original and are not queued.

    worker = threading.Thread(target=self.periodicSync)
    worker.daemon = True
    self.periodic_thread = worker
    worker.start()
def copy_session(session: requests.Session) -> requests.Session:
    """Return a new requests.Session with the cookies and headers of ``session``.

    Cookies are round-tripped through a plain dict and headers are copied, so
    the clone has its own cookie jar and header mapping.
    """
    cookie_snapshot = requests.utils.dict_from_cookiejar(session.cookies)
    clone = requests.Session()
    clone.cookies = requests.utils.cookiejar_from_dict(cookie_snapshot)
    clone.headers = session.headers.copy()
    return clone
def save_session_to_file(self, filename: Optional[str] = None) -> None:
    """Saves internally stored :class:`requests.Session` object.

    The cookies of ``self.session`` are pickled as a plain dict. A missing
    parent directory is created and set to mode 0o700. The file itself is
    created with mode 0o600 and also chmod-ed to 0o600.

    :param filename: Target path, or None for the default session path of
        ``self.username``.
    """
    if filename is None:
        filename = get_default_session_filename(self.username)
    dirname = os.path.dirname(filename)
    if dirname != '' and not os.path.exists(dirname):
        os.makedirs(dirname)
        os.chmod(dirname, 0o700)

    def _owner_only_opener(path, flags):
        # Create the file owner-only from the start. Creating it with default
        # permissions and chmod-ing afterwards leaves a moment in which other
        # users can open the cookie file.
        return os.open(path, flags, 0o600)

    with open(filename, 'wb', opener=_owner_only_opener) as sessionfile:
        # Still required for a pre-existing file: the creation mode passed to
        # os.open only applies when the file is newly created.
        os.chmod(filename, 0o600)
        pickle.dump(requests.utils.dict_from_cookiejar(self.session.cookies), sessionfile)
        self._log("Saved session to %s." % filename)
def load_cookies(self, path):
    """Replace the session's cookie jar with one rebuilt from the pickled dict at ``path``."""
    with open(path, 'rb') as cookie_file:
        # NOTE(review): unpickling can execute arbitrary code; only load
        # cookie files that this program wrote itself.
        self.session.cookies = requests.utils.cookiejar_from_dict(
            pickle.load(cookie_file))
def save_cookies(self, path):
    """Pickle the session's cookies, converted to a plain dict, into the file at ``path``."""
    with open(path, 'wb') as cookie_file:
        pickle.dump(
            requests.utils.dict_from_cookiejar(self.session.cookies),
            cookie_file,
        )
def login(ctx, url, username, password):
    """Log in to the server at ``url`` and persist the resulting cookies.

    The credentials are posted to ``/hub/login`` without following redirects.
    A 302 reply is treated as success: its cookies are wrapped in a
    ``Credentials`` object and written to ``ctx.obj.credentials_file``.
    A request exception or any other status code prints a message to stderr
    and exits with status 1.
    """
    endpoint = urljoin(url, "/hub/login")
    form = {"username": username, "password": password}

    # JupyterHub answers a successful login with an immediate redirect, so
    # redirects are disabled and the 302 itself is inspected in order to
    # capture the cookies it sets.
    # NOTE(review): verify=False turns off TLS certificate verification.
    try:
        reply = requests.post(endpoint, form, verify=False,
                              allow_redirects=False)
    except Exception as exc:
        print("Could not perform request. {}".format(exc), file=sys.stderr)
        sys.exit(1)

    if reply.status_code != 302:
        print("Failed to perform login. Server replied with error: {}".format(
            reply.status_code), file=sys.stderr)
        sys.exit(1)

    session_cookies = requests.utils.dict_from_cookiejar(reply.cookies)
    credentials = Credentials(url, username, session_cookies)
    credentials.write(ctx.obj.credentials_file)
# -------------------------------------------------------------------------
def test4():
    """Print what ``get_netrc_auth`` returns for a sample URL."""
    from requests.utils import get_netrc_auth
    url = "http://www.126.com"
    # Function-call form: the bare ``print x`` statement is a SyntaxError on
    # Python 3, while a single-argument ``print(x)`` works on 2 and 3 alike.
    print(get_netrc_auth(url))
def _search(self, limit, format):
    '''
    Fetch one page of image results and advance the paging offset.

    limit  -- number of results still wanted; at most 50 are requested.
    format -- value substituted into the format slot of QUERY_URL.

    self.image_filters is quoted and substituted into QUERY_URL as well.
    Image filters:
    Array of strings that filter the response the API sends based on size, aspect, color, style, face or
    any combination thereof. Valid values are: Size:Small, Size:Medium, Size:Large, Size:Width:[Width],
    Size:Height:[Height], Aspect:Square, Aspect:Wide, Aspect:Tall, Color:Color, Color:Monochrome, Style:Photo,
    Style:Graphics, Face:Face, Face:Portrait, Face:Other.

    Returns a list of ImageResult objects and adds the number of results
    consumed to self.current_offset.

    When the response body is not valid JSON: raises PyBingImageException if
    self.safe is false; otherwise prints the error, sleeps 5 seconds and
    returns an empty list.
    '''
    url = self.QUERY_URL.format(requests.utils.quote("'{}'".format(self.query)), min(50, limit),
                                self.current_offset, format,
                                requests.utils.quote("'{}'".format(self.image_filters)))
    r = requests.get(url, auth=("", self.api_key))
    try:
        json_results = r.json()
    except ValueError:
        if not self.safe:
            raise PyBingImageException("Request returned with code %s, error msg: %s" % (r.status_code, r.text))
        print("[ERROR] Request returned with code %s, error msg: %s. \nContinuing in 5 seconds." % (
            r.status_code, r.text))
        time.sleep(5)
        # Nothing was decoded, so there is nothing to package. Falling through
        # used to hit the unbound ``json_results`` and raise UnboundLocalError.
        return []
    packaged_results = [ImageResult(single_result_json) for single_result_json in json_results['d']['results']]
    self.current_offset += min(50, limit, len(packaged_results))
    return packaged_results
def load_cookies(self, path):
    """Replace the session's cookie jar with one rebuilt from the pickled dict at ``path``."""
    with open(path, 'rb') as cookie_file:
        # NOTE(review): unpickling can execute arbitrary code; only load
        # cookie files that this program wrote itself.
        self.session.cookies = requests.utils.cookiejar_from_dict(
            pickle.load(cookie_file))
def save_cookies(self, path):
    """Pickle the session's cookies, converted to a plain dict, into the file at ``path``."""
    with open(path, 'wb') as cookie_file:
        pickle.dump(
            requests.utils.dict_from_cookiejar(self.session.cookies),
            cookie_file,
        )
# ??????????
def load_cookies(self, path):
    """Replace the session's cookie jar with one rebuilt from the pickled dict at ``path``."""
    with open(path, 'rb') as cookie_file:
        # NOTE(review): unpickling can execute arbitrary code; only load
        # cookie files that this program wrote itself.
        self.session.cookies = requests.utils.cookiejar_from_dict(
            pickle.load(cookie_file))
def save_cookies(self, path):
    """Pickle the session's cookies, converted to a plain dict, into the file at ``path``."""
    with open(path, 'wb') as cookie_file:
        pickle.dump(
            requests.utils.dict_from_cookiejar(self.session.cookies),
            cookie_file,
        )
#??????
def load_cookies(self, path):
    """Replace the session's cookie jar with one rebuilt from the pickled dict at ``path``."""
    with open(path, 'rb') as cookie_file:
        # NOTE(review): unpickling can execute arbitrary code; only load
        # cookie files that this program wrote itself.
        self.session.cookies = requests.utils.cookiejar_from_dict(
            pickle.load(cookie_file))
def load_cookies(self, path):
    """Replace the session's cookie jar with one rebuilt from the pickled dict at ``path``."""
    with open(path, 'rb') as cookie_file:
        # NOTE(review): unpickling can execute arbitrary code; only load
        # cookie files that this program wrote itself.
        self.session.cookies = requests.utils.cookiejar_from_dict(
            pickle.load(cookie_file))
def save_cookies(self, path):
    """Pickle the session's cookies, converted to a plain dict, into the file at ``path``."""
    with open(path, 'wb') as cookie_file:
        pickle.dump(
            requests.utils.dict_from_cookiejar(self.session.cookies),
            cookie_file,
        )
# ??????????
def login():
    """Restore cookies from the cookie file, or log in and create that file.

    If ``cookie_file_name`` exists, its JSON content is merged into the
    cookies of the module-level session ``s`` and the login page is fetched
    once. Otherwise the user id and password are read from ``config``, posted
    to the ajax login endpoint, and the cookies of that response are written
    to ``cookie_file_name`` as JSON.

    NOTE(review): both branches write cookie values to the log and to stdout.
    """
    if os.path.exists(cookie_file_name):
        # The context manager closes the handle; it used to be left open.
        with open(cookie_file_name, 'r') as cf:
            cookies = json.load(cf)
        s.cookies.update(cookies)
        logging.info("Load cookies from cookie file: " + str(cookies))
        r = s.get(website + "/user/login", headers=headers)
        print("Old cookies:" + str(r.headers))
    else:
        user = config.get('user', 'id')
        password = config.get('user', 'password')
        logging.info("Login as " + user)
        url = website + '/User/Login/ajaxLogin'
        # Percent-encode the credentials: a raw '&', '=', '+' or '%' in either
        # value would otherwise corrupt the form-encoded body.
        quote = requests.utils.quote
        payload = 'account=%s&password=%s&from=loginpage&remember=0&url_back=' % (
            quote(user, safe=''), quote(password, safe=''))
        r = s.post(url, headers=headers, data=payload)
        cookies = requests.utils.dict_from_cookiejar(r.cookies)
        logging.info("Login cookie " + str(cookies))
        print("New Cookies:" + str(cookies))
        with open(cookie_file_name, 'w') as cf:
            json.dump(cookies, cf)
def test_html_charset(self):
    """An HTML5 ``<meta charset>`` attribute yields exactly one encoding, UTF-8."""
    found = requests.utils.get_encodings_from_content('<meta charset="UTF-8">')
    assert len(found) == 1
    assert found[0] == 'UTF-8'
def test_html4_pragma(self):
    """An HTML4 ``http-equiv`` pragma directive yields exactly one encoding, UTF-8."""
    markup = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">'
    found = requests.utils.get_encodings_from_content(markup)
    assert len(found) == 1
    assert found[0] == 'UTF-8'
def test_xhtml_pragma(self):
    """A self-closed XHTML 1.x pragma (text/html MIME type) yields exactly one encoding, UTF-8."""
    markup = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />'
    found = requests.utils.get_encodings_from_content(markup)
    assert len(found) == 1
    assert found[0] == 'UTF-8'
def test_xml(self):
    """An XML declaration (XHTML 1.x served as XML) yields exactly one encoding, UTF-8."""
    declaration = '<?xml version="1.0" encoding="UTF-8"?>'
    found = requests.utils.get_encodings_from_content(declaration)
    assert len(found) == 1
    assert found[0] == 'UTF-8'
def test_precedence(self):
    """With all three declarations present, the order is HTML5, then HTML4, then XML."""
    document = "\n".join([
        '<?xml version="1.0" encoding="XML"?>',
        '<meta charset="HTML5">',
        '<meta http-equiv="Content-type" content="text/html;charset=HTML4" />',
    ])
    expected = ['HTML5', 'HTML4', 'XML']
    assert requests.utils.get_encodings_from_content(document) == expected
def test_super_len_correctly_calculates_len_of_partially_read_file(self):
    """Ensure that partially consumed file-like objects are handled.

    ``super_len`` must report 0 for a buffer that has just been written to.
    """
    from requests.utils import super_len
    buf = StringIO.StringIO()
    buf.write('foobarbogus')
    # NOTE(review): presumably 0 because the stream position is at the end
    # after the write -- confirm against super_len.
    remaining = super_len(buf)
    assert remaining == 0
def test_get_environ_proxies_ip_ranges(self):
    """Ensures that IP addresses are correctly matched with ranges
    in the no_proxy variable.

    Addresses inside 192.168.0.0/24 and the listed host 172.16.1.1 must give
    an empty proxy dict, with or without a port; 192.168.1.1 must not.
    """
    from requests.utils import get_environ_proxies
    previous = os.environ.get('no_proxy')
    os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
    try:
        assert get_environ_proxies('http://192.168.0.1:5000/') == {}
        assert get_environ_proxies('http://192.168.0.1/') == {}
        assert get_environ_proxies('http://172.16.1.1/') == {}
        assert get_environ_proxies('http://172.16.1.1:5000/') == {}
        assert get_environ_proxies('http://192.168.1.1:5000/') != {}
        assert get_environ_proxies('http://192.168.1.1/') != {}
    finally:
        # Put the environment back so the override cannot leak into tests
        # that run afterwards.
        if previous is None:
            os.environ.pop('no_proxy', None)
        else:
            os.environ['no_proxy'] = previous
def test_get_environ_proxies(self):
    """Ensures that a host name listed in the no_proxy variable gets an
    empty proxy dict while an unlisted host does not."""
    from requests.utils import get_environ_proxies
    previous = os.environ.get('no_proxy')
    os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1"
    try:
        assert get_environ_proxies(
            'http://localhost.localdomain:5000/v1.0/') == {}
        assert get_environ_proxies('http://www.requests.com/') != {}
    finally:
        # Put the environment back so the override cannot leak into tests
        # that run afterwards.
        if previous is None:
            os.environ.pop('no_proxy', None)
        else:
            os.environ['no_proxy'] = previous
def test_guess_filename_when_int(self):
    """``guess_filename`` returns None for an int argument."""
    from requests.utils import guess_filename
    guessed = guess_filename(1)
    assert guessed is None
def test_guess_filename_with_file_like_obj(self):
    """A bytes ``name`` attribute is returned by ``guess_filename`` as the same bytes value."""
    from requests import compat
    from requests.utils import guess_filename

    class Fake(object):
        name = b'value'

    result = guess_filename(Fake())
    assert b'value' == result
    assert isinstance(result, compat.bytes)
def test_guess_filename_with_unicode_name(self):
    """A text ``name`` attribute is returned by ``guess_filename`` as the same text value."""
    from requests import compat
    from requests.utils import guess_filename

    text_name = b'value'.decode('utf-8')

    class Fake(object):
        name = text_name

    result = guess_filename(Fake())
    assert text_name == result
    assert isinstance(result, compat.str)
def test_is_ipv4_address(self):
    """A dotted quad is accepted; five octets and a host name are rejected."""
    from requests.utils import is_ipv4_address
    assert is_ipv4_address('8.8.8.8')
    for rejected in ('8.8.8.8.8', 'localhost.localdomain'):
        assert not is_ipv4_address(rejected)
def test_is_valid_cidr(self):
    """A bare address is not valid CIDR; an address with a /24 suffix is."""
    from requests.utils import is_valid_cidr
    bare_address = '8.8.8.8'
    network = '192.168.1.0/24'
    assert not is_valid_cidr(bare_address)
    assert is_valid_cidr(network)
def test_dotted_netmask(self):
    """Prefix lengths 8, 24 and 25 map to their dotted-quad netmasks."""
    from requests.utils import dotted_netmask
    cases = (
        (8, '255.0.0.0'),
        (24, '255.255.255.0'),
        (25, '255.255.255.128'),
    )
    for prefix_length, netmask in cases:
        assert dotted_netmask(prefix_length) == netmask
def test_get_auth_from_url(self):
    """Ensures that a username and password in a well-encoded URI as per
    RFC 3986 are correctly extracted."""
    from requests.compat import quote
    from requests.utils import get_auth_from_url

    percent_encoding_test_chars = "%!*'();:@&=+$,/?#[] "
    url_address = "request.com/url.html#test"
    # The same reserved-character string serves as both username and password.
    encoded_user = quote(percent_encoding_test_chars, '')
    encoded_password = quote(percent_encoding_test_chars, '')
    url = "http://" + encoded_user + ':' + encoded_password + '@' + url_address

    credentials = get_auth_from_url(url)
    (username, password) = credentials
    assert username == percent_encoding_test_chars
    assert password == percent_encoding_test_chars
def test_requote_uri_with_unquoted_percents(self):
    """Ensure that unquoted percent signs in redirects are handled.

    A stray ``%`` that does not start a valid escape must come back as ``%25``.
    See: https://github.com/kennethreitz/requests/issues/2356
    """
    from requests.utils import requote_uri
    malformed = 'http://example.com/fiz?buz=%ppicture'
    expected = 'http://example.com/fiz?buz=%25ppicture'
    assert expected == requote_uri(malformed)