Python requests module: Session() example source code
We have extracted the following 50 code examples from open-source Python projects to illustrate how to use requests.Session().
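Before the individual examples, here is a minimal sketch of the pattern they all share: create one Session, configure it once, and reuse it so cookies and pooled connections carry over between requests. The URL and header values below are placeholders, not taken from any of the projects quoted.

import requests

session = requests.Session()
# Headers set on the session are sent with every subsequent request.
session.headers.update({'User-Agent': 'example-client/1.0'})

# Cookies returned by one response are replayed automatically on the next.
response = session.get('https://httpbin.org/cookies/set?demo=1', timeout=10)
print(response.status_code, session.cookies.get_dict())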
def get_cookie(account, password):
    s = requests.Session()
    payload = {
        'login_uid1': account,
        'login_pwd1': password,
        'agreeRule': "1",
        'loginsubmit': "??",
        # 'redirect_to': "http://www.creprice.cn",
        # 'testcookie': "1"
    }
    response = s.post(login_url, data=payload, allow_redirects=False)
    cookies = response.cookies.get_dict()
    logger.warning("get cookie success!!!(account is:%s)" % account)
    return json.dumps(cookies)
    # Store the cookies in Redis
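The trailing comment points at persisting the returned cookies in Redis. A minimal sketch of that step, assuming the get_cookie() helper above (with its login_url and logger globals in scope), a local Redis instance, and the redis-py client; the key name and credentials are illustrative only.

import json
import redis

account, password = 'user@example.com', 'secret'  # placeholder credentials
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# get_cookie() returns the cookie dict serialized as JSON, so it can be
# stored under a per-account key and restored into a later Session.
r.set('cookies:%s' % account, get_cookie(account, password))
restored = json.loads(r.get('cookies:%s' % account))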
def upload_prediction(self, file_path):
    filename, signedRequest, headers, status_code = self.authorize(file_path)
    if status_code != 200:
        return status_code
    dataset_id, comp_id, status_code = self.get_current_competition()
    if status_code != 200:
        return status_code
    with open(file_path, 'rb') as fp:
        r = requests.Request('PUT', signedRequest, data=fp.read())
        prepped = r.prepare()
        s = requests.Session()
        resp = s.send(prepped)
        if resp.status_code != 200:
            return resp.status_code
    r = requests.post(self._submissions_url,
                      data={'competition_id': comp_id, 'dataset_id': dataset_id, 'filename': filename},
                      headers=headers)
    return r.status_code
def prepareLogin(self):
    self.clientid = 53999199
    self.session = requests.Session()
    self.session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:27.0) Gecko/20100101 Firefox/27.0',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
    })
    self.urlGet(
        'https://ui.ptlogin2.qq.com/cgi-bin/login?daid=164&target=self&style=16&mibao_css=m_webqq&' +
        'appid=501004106&enable_qlogin=0&no_verifyimg=1&s_url=http%3A%2F%2Fw.qq.com%2Fproxy.html&' +
        'f_url=loginerroralert&strong_login=1&login_state=10&t=20131024001'
    )
    self.session.cookies.update(dict(
        RK='OfeLBai4FB', ptcz='ad3bf14f9da2738e09e498bfeb93dd9da7540dea2b7a71acfb97ed4d3da4e277',
        pgv_pvi='911366144', pgv_info='ssid pgv_pvid=1051433466',
        qrsig='hJ9GvNx*oIvLjP5I5dQ19KPa3zwxNI62eALLO*g2JLbKPYsZIRsnbJIxNe74NzQQ'
    ))
    self.getAuthStatus()
    self.session.cookies.pop('qrsig')
def hltb(bot, trigger):
    if not trigger.group(2):
        return bot.say("Enter a game name to search.")
    game = trigger.group(2)
    url = "http://howlongtobeat.com/search_main.php?page=1"
    payload = {"queryString": game, "t": "games", "sorthead": "popular", "sortd": "Normal Order",
               "length_type": "main", "detail": "0"}
    test = {'Content-type': 'application/x-www-form-urlencoded',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.97 Safari/537.36',
            'origin': 'https://howlongtobeat.com',
            'referer': 'https://howlongtobeat.com'}
    session = requests.Session()
    # The first POST primes the session; the second returns the results.
    session.post(url, headers=test, data=payload)
    r = session.post(url, headers=test, data=payload)
    if len(r.content) < 250:
        return bot.say("No results.")
    bs = BeautifulSoup(r.content)
    first = bs.findAll("div", {"class": "search_list_details"})[0]
    name = first.a.text
    time = first.findAll('div')[3].text
    bot.say('{} - {}'.format(name, time))
def AkamaiEdgeGridConfig_Setup(config_file, section):
    config_file = os.path.expanduser(config_file)
    if debug: print "DEBUG: config_file", config_file
    # Currently unused.
    required_options = ['client_token', 'client_secret', 'host', 'access_token']
    EdgeGridConfig = {}
    if os.path.isfile(config_file):
        config = ConfigParser.ConfigParser()
        config.readfp(open(config_file))
        for key, value in config.items(section):
            # ConfigParser lowercases keys automatically
            EdgeGridConfig[key] = value
    else:
        print "Missing configuration file. Run python gen_creds.py to get your credentials file set up once you've provisioned credentials in LUNA."
        exit()
    EdgeGridConfig['host'] = '%s://%s' % ('https', EdgeGridConfig['host'])
    if debug: print EdgeGridConfig
    return EdgeGridConfig

# Set up an EdgeGrid session using the EdgeGridConfig loaded previously.
def __init__(self, **kwargs):
    """
    Initialize the class, get the necessary parameters
    """
    self.user_agent = 'python-cachetclient'
    try:
        self.endpoint = kwargs['endpoint']
    except KeyError:
        raise KeyError('Cachet API endpoint is required')
    self.api_token = kwargs.get('api_token', None)
    self.timeout = kwargs.get('timeout', None)
    self.verify = kwargs.get('verify', None)
    self.pagination = kwargs.get('pagination', False)
    self.http = requests.Session()
def setupSession():
    session = requests.Session()
    session.headers.update({'User-Agent': "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:34.0) Gecko/20100101 Firefox/34.0",
                            "Accept-Encoding": "gzip, deflate, sdch"})
    return session
def __init__(self, name: str = None, description: str = None, version: str = None):
    self.app_id = {'X-TBA-App-Id': ""}
    self.session = requests.Session()
    self.session = CacheControl(self.session, heuristic=LastModified())
    self.session.headers.update(self.app_id)
    if name is not None:
        self.set_api_key(name, description, version)
def __init__(self):
    Analyzer.__init__(self)
    self.service = self.get_param('config.service', None, 'EmergingThreats service is missing')
    self.apikey = self.get_param('config.key', None, 'EmergingThreats apikey is missing')
    self.session = requests.Session()
    self.session.headers.update({"Authorization": self.apikey})
def __init__(self, key, client_id, client_version='0.1'):
    self.api_key = key
    self.session = requests.Session()
    self.url = 'https://safebrowsing.googleapis.com/v4/threatMatches:find?key={}'.format(key)
    self.client_id = client_id
    self.client_version = client_version
def run(path, quiet=False):
    """
    Downloads all available hash files to a given path.

    :param path: Path to download directory
    :param quiet: If set to True, no progressbar is displayed
    """
    if os.path.isdir(path):
        session = requests.Session()
        session.headers = {'User-agent': 'Mozilla/5.0 Chrome/57.0.2987.110'}
        # Find the highest three-digit file index listed on the hashes page.
        listing = session.get('https://virusshare.com/hashes.4n6').text
        max_num = max(int(re.sub(r'[\<\>]', '', match))
                      for match in re.findall(r'\>[1-9][0-9]{2}\<', listing))
        if not quiet:
            p = progressbar.ProgressBar(max_value=max_num)
        for i in range(max_num):
            filename = str(i).zfill(3) + '.md5'
            if os.path.exists(os.path.join(path, filename)):
                continue
            if not quiet:
                p.update(i)
            url = URL + filename
            head = session.head(url)
            if head.status_code == 200:
                body = session.get(url, stream=True)
                with io.open(os.path.join(path, filename), mode='wb') as afile:
                    for chunk in body.iter_content(chunk_size=1024):
                        afile.write(chunk)
                body.close()
    else:
        print('Given path is not a directory.')
        sys.exit(1)
def session(self, email, password):
    session = requests.Session()
    session.headers.update({
        'content-type': 'application/x-www-form-urlencoded'
    })
    response = session.post(
        ZoomClient.SIGNIN_URL, data={'email': email, 'password': password}
    )
    return session, response
def check(auth_ref, args):
    # We call get_keystone_client here as there is some logic within to get a
    # new token if the previous one is bad.
    keystone = get_keystone_client(auth_ref)
    auth_token = keystone.auth_token
    registry_endpoint = 'http://{ip}:9191'.format(ip=args.ip)
    s = requests.Session()
    s.headers.update(
        {'Content-type': 'application/json',
         'x-auth-token': auth_token})
    try:
        # /images returns a list of public, non-deleted images
        r = s.get('%s/images' % registry_endpoint, verify=False, timeout=10)
        is_up = r.ok
    except (exc.ConnectionError, exc.HTTPError, exc.Timeout):
        is_up = False
    except Exception as e:
        status_err(str(e))
    metric_values = dict()
    status_ok()
    metric_bool('glance_registry_local_status', is_up)
    # only want to send other metrics if the API is up
    if is_up:
        milliseconds = r.elapsed.total_seconds() * 1000
        metric('glance_registry_local_response_time', 'double',
               '%.3f' % milliseconds, 'ms')
        metric_values['glance_registry_local_response_time'] = ('%.3f' % milliseconds)
        metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
def check(args):
    metadata_endpoint = ('http://{ip}:8775'.format(ip=args.ip))
    is_up = True
    s = requests.Session()
    try:
        # looks like we can only get / (ec2 versions) without specifying
        # an instance ID and other headers
        versions = s.get('%s/' % metadata_endpoint,
                         verify=False,
                         timeout=10)
        milliseconds = versions.elapsed.total_seconds() * 1000
        if not versions.ok or '1.0' not in versions.content.splitlines():
            is_up = False
    except (exc.ConnectionError, exc.HTTPError, exc.Timeout) as e:
        is_up = False
    except Exception as e:
        status_err(str(e))
    metric_values = dict()
    status_ok()
    metric_bool('nova_api_metadata_local_status', is_up)
    # only want to send other metrics if the API is up
    if is_up:
        metric('nova_api_metadata_local_response_time',
               'double',
               '%.3f' % milliseconds,
               'ms')
        metric_values['nova_api_metadata_local_response_time'] = ('%.3f' % milliseconds)
        metric_influx(INFLUX_MEASUREMENT_NAME, metric_values)
def upload(path, imagestore_string='fabric:ImageStore', show_progress=False):  # pylint: disable=too-many-locals,missing-docstring
    from sfctl.config import (client_endpoint, no_verify_setting, ca_cert_info,
                              cert_info)
    import requests

    abspath = validate_app_path(path)
    basename = os.path.basename(abspath)
    endpoint = client_endpoint()
    cert = cert_info()
    ca_cert = True
    if no_verify_setting():
        ca_cert = False
    elif ca_cert_info():
        ca_cert = ca_cert_info()
    if all([no_verify_setting(), ca_cert_info()]):
        raise CLIError('Cannot specify both CA cert info and no verify')
    # Upload either to a folder, or to the native image store only
    if 'file:' in imagestore_string:
        dest_path = path_from_imagestore_string(imagestore_string)
        upload_to_fileshare(abspath, os.path.join(dest_path, basename),
                            show_progress)
    elif imagestore_string == 'fabric:ImageStore':
        with requests.Session() as sesh:
            sesh.verify = ca_cert
            sesh.cert = cert
            upload_to_native_imagestore(sesh, endpoint, abspath, basename,
                                        show_progress)
    else:
        raise CLIError('Unsupported image store connection string')
def __init__(self, service, role_source, configfile=DEFAULT_CONFIGFILE):
    self.service = service
    self.role_source = role_source
    self.api_endpoint = 'http://127.0.0.1:8500/v1'
    self.api_session = requests.Session()
    self.hostname = gethostname()
    self.short_hostname = self.hostname.split('.')[0]
    self.update_service = False
    self.valid_states = ['master', 'slave', 'fail']
    self.configfile = configfile
    self.leader_uri = self.api_endpoint + '/kv/session/' + self.service + '/leader'
def current_leader_session_id(self):
    check_current_leader = self.api_session.get(self.leader_uri)
    if check_current_leader.status_code == 200:
        return check_current_leader.json()[0].get('Session')
def __init__(self):
    self.URL_vendor = 'http://shop.bdgastore.com/'
    self.URL_product = 'http://shop.bdgastore.com/collections/footwear/products/y-3-pureboost-zg'
    self.URL_addToCart = 'http://shop.bdgastore.com/cart/add.js'
    self.URL_cart = 'http://shop.bdgastore.com/cart'
    self.user_size = '8'
    self.user_session = requests.Session()
def get_data(username, no):
    if no == 0:
        z = 'followers'
    else:
        z = 'following'
    # Fetch the followers or following list page by page, stopping when there
    # are no further pages or max_number profiles have been collected.
    s = requests.Session()
    final = []
    x = 1
    pages = [""]
    data = []
    while pages != [] and x <= max_number / no_per_page:
        # page x of the followers (no=0) or following (no=1) tab
        r = s.get('https://github.com/' + username + '?page=' + str(x) + '&tab=' + z)
        soup = BeautifulSoup(r.text)
        data = data + soup.find_all("div", {"class": "d-table col-12 width-full py-4 border-bottom border-gray-light"})
        pages = soup.find_all("div", {"class": "pagination"})
        x += 1
    # extract username, company, and area for each profile
    for i in data:
        username = i.find_all("a")[0]['href']
        try:
            company = i.find_all("span", {"class": "mr-3"})[0].text.strip()
        except:
            company = "xxxxx"
        try:
            area = i.find_all("p", {"class": "text-gray text-small mb-0"})[0].text.strip()
        except:
            area = "xxxxx"
        soup2 = BeautifulSoup(str(i))
        name = soup2.find_all("span", {"class": "f4 link-gray-dark"})[0].text
        final.append([username, company, area, name])
    return final
def scrape_org(org, main_list, organisation):
    s = requests.Session()
    r = s.get('https://github.com/orgs/' + org + '/people')
    soup = BeautifulSoup(r.text)
    data = soup.find_all("li", {"class": "table-list-item member-list-item js-bulk-actions-item "})
    for i in data:
        soup2 = BeautifulSoup(str(i))
        data2 = soup2.find_all("div", {"class": "table-list-cell py-3 pl-3 v-align-middle member-avatar-cell css-truncate pr-0"})
        username = data2[0].find_all("a")[0]['href']
        data3 = soup2.find_all("div", {"class": "table-list-cell py-3 v-align-middle member-info css-truncate pl-3"})
        name = data3[0].find_all("a")[0].text.strip()
        main_list.append([username, name])
def update_org_list(main_list, organisation):
    s = requests.Session()
    for i in main_list:
        r = s.get('https://github.com/' + i[0])
        soup = BeautifulSoup(r.text)
        data = soup.find_all("li", {"aria-label": "Organization"})
        try:
            if data[0].text not in organisation:
                organisation.append(data[0].text)
        except:
            continue
    return organisation
def scrape_org_general(org, main_list, organisation):
    org = org.replace(" ", "+")
    s = requests.Session()
    count = 1
    k = "https://github.com/search?p=" + str(count) + "&q=" + org + "+type%3Auser&type=Users&utf8=%E2%9C%93"
    r = s.get(k)
    soup = BeautifulSoup(r.text, "lxml")
    data = soup.find_all("div", {"class": "user-list-info ml-2"})
    while data != []:
        for i in data:
            username = i.find_all("a")[0]['href']
            name = i.find_all("span", {"class": "f4 ml-1"})[0].text.strip()
            main_list.append([username, name])
        count += 1
        k = "https://github.com/search?p=" + str(count) + "&q=" + org + "+type%3Auser&type=Users&utf8=%E2%9C%93"
        r = s.get(k)
        soup = BeautifulSoup(r.text, "lxml")
        data = soup.find_all("div", {"class": "user-list-info ml-2"})

# scraping the GitHub pages
def setUp(self, conf='/test.cfg'):
    settings = Settings()
    settings.setFile(base_path + conf)
    Env.set('settings', settings)
    Env.set('http_opener', requests.Session())
    Env.set('cache', NoCache())
    YGG.log.logger.setLevel('DEBUG')
    YGG.log.logger.addHandler(handler)
    return YGG()
def get_av_magnet(avcode):
    Referer = {
        "Referer": "123"
    }
    s = requests.Session()
    gid, uc = download_image(avcode)
    params = {
        'gid': gid,
        'uc': uc,
        'lang': 'zh'
    }
    r2 = s.get("http://www.javbus.com/ajax/uncledatoolsbyajax.php", params=params, proxies=proxy, headers=Referer)
    soup = BeautifulSoup(r2.content.decode('utf-8', 'ignore'), 'html.parser')
    trs = soup.findAll('tr', attrs={"height": "35px"})
    print '[*] get magnet link'
    for tr in trs:
        trsoup = BeautifulSoup(str(tr).decode('utf-8', 'ignore'), 'html.parser')
        td2 = trsoup.findAll('td', attrs={"style": "text-align:center;white-space:nowrap"})
        a = td2[0].find('a')
        magnet = a.get("href")  # unicode object
        size = a.text.strip()
        print '[*] ' + magnet, size
    os.chdir("../..")
def _connect(self):
    self._session = requests.Session()
    adaptator = requests.adapters.HTTPAdapter()
    adaptator.max_retries = HttpRetry(
        read=self.READ_MAX_RETRIES,
        connect=self.CONN_MAX_RETRIES,
        backoff_factor=self.BACKOFF_FACTOR)
    self._session.mount(str(self.url), adaptator)
    self.__conn = self._session.get(
        self.url,
        stream=True,
        timeout=(self.CONN_TIMEOUT, self.READ_TIMEOUT))
def makeRequestSession(self):
    host = self.requestHandler.headers.getheader('host', None)
    path = self.requestHandler.path
    self.url = self.uri + "://" + host + path
    session = requests.Session()
    for header in self.requestHandler.headers.keys():
        if header != 'content-length':
            session.headers.update({header: self.requestHandler.headers.getheader(header)})
    if self.proxies:
        session.proxies = self.proxies
    return session
def setUp(self):
    self.tls_adapter = CbAPISessionAdapter(force_tls_1_2=True)
    self.session = requests.Session()
    self.session.mount("https://", self.tls_adapter)
def _url_to_key(self, url):
    session = requests.Session()
    return self.create_key(session.prepare_request(requests.Request('GET', url)))
def uninstall_cache():
    """ Restores ``requests.Session`` and disables cache
    """
    _patch_session_factory(OriginalSession)
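This helper follows the requests-cache style of monkey-patching: install swaps requests.Session for a caching subclass, and uninstall restores the original class. A minimal usage sketch with the requests-cache library's public helpers, assuming that library is installed (the cache name is illustrative):

import requests
import requests_cache

requests_cache.install_cache('demo_cache')  # requests.Session now caches responses
requests.get('https://httpbin.org/get')     # fetched over the network and cached
requests.get('https://httpbin.org/get')     # served from the local cache
requests_cache.uninstall_cache()            # restores the original requests.Session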
def __init__(self, server, ssl_verify=True, token=None, ignore_system_proxy=False,
             use_https_proxy=None, ssl_verify_hostname=True, use_http_proxy=None):
    """ Requires:
        server - URL to the Carbon Black server. Usually the same as
                 the web GUI.
        ssl_verify - verify server SSL certificate
        token - this is for CLI API interface
    """
    # We will uncomment this once cbapi 1.0.0 is released
    # warn("CbApi is deprecated and will be removed as of cbapi 2.0.0.", DeprecationWarning)
    if not server.startswith("http"):
        raise TypeError("Server must be URL: e.g, http://cb.example.com")
    if token is None:
        raise TypeError("Missing required authentication token.")
    self.server = server.rstrip("/")
    self.ssl_verify = ssl_verify
    self.token = token
    self.token_header = {'X-Auth-Token': self.token}
    self.session = requests.Session()
    if not ssl_verify_hostname:
        self.session.mount("https://", HostNameIgnoringAdapter())
    self.proxies = {}
    if ignore_system_proxy:  # see https://github.com/kennethreitz/requests/issues/879
        self.proxies = {
            'no': 'pass'
        }
    else:
        if use_http_proxy:
            self.proxies['http'] = use_http_proxy
        if use_https_proxy:
            self.proxies['https'] = use_https_proxy
def send(self, *a, **kw):
    # Undo the percent-encoding of '<', ' ', and '>' before the request is sent.
    a[0].url = a[0].url.replace(urllib.quote("<"), "<")
    a[0].url = a[0].url.replace(urllib.quote(" "), " ")
    a[0].url = a[0].url.replace(urllib.quote(">"), ">")
    return requests.Session.send(self, *a, **kw)
def index():
    try:
        api_key = os.environ['DASHBOARD_DUTY_KEY']
        service_key = os.environ['DASHBOARD_DUTY_SERVICE']
    except KeyError:
        logging.error('Missing Environment Variable(s)')
        exit(1)
    session = requests.Session()
    d_session = dashboard_duty.Core(session, api_key, service_key)
    service = d_session.service()
    incidents = d_session.incident(service['id'])
    if service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'schedule_reference':
        service_id = service['escalation_policy']['escalation_rules'][0]['targets'][0]['id']
        oncall = d_session.oncall_schedule_policy(service_id)
    elif service['escalation_policy']['escalation_rules'][0]['targets'][0]['type'] == 'user_reference':
        username = service['escalation_policy']['escalation_rules'][0]['targets'][0]['summary']
        oncall = d_session.oncall_user_policy(username)
    else:
        logging.error('Unable to handle oncall policy for %s' % service_key)
        exit(1)
    return render_template('index.html', service=service, incidents=incidents, oncall=oncall)
def list_DL(List):
    List = List[1:]
    List2 = []
    ID_list = []
    List_dl = []
    k = 0
    j = 0
    p = 0
    while p * 2 < len(List):
        List2.append(List[p * 2])
        p += 1
    for i in List2:
        # --------------------
        s1 = requests.Session()
        s1.get(url0)
        url_list = s1.get(List2[j], headers=headers)
        # --------------------
        # a1 = requests.get(List2[j])
        # html = a1.text
        html = url_list.text
        a2 = html.find('<a href="javascript:void(0)" link="')
        a2_len = len('<a href="javascript:void(0)" link="')
        if a2 < 0:
            a2 = html.find("RJ.currentMP3Url = 'mp3/")
            a2_len = len("RJ.currentMP3Url = 'mp3/")
        a3 = html.find('" target="_blank" class="mp3_download_link">')
        if a3 < 0:
            a3 = html.find("RJ.currentMP3 = '")
        a4 = html[a2 + a2_len:a3]
        if a4.find("'") > 0:
            a4 = a4[:a4.find("'")]
            ID_list.append(a4 + '.mp3')
            while k < len(ID_list):
                List_dl.append(check_host(l_mp3[:4], ID_list[k])[0])
                k += 1
        else:
            List_dl.append(a4)
        j += 1
    return List_dl
def __init__(self, service_urls=None, user_agent=DEFAULT_USER_AGENT):
    self.session = requests.Session()
    self.session.headers.update({
        'User-Agent': user_agent,
    })
    self.service_urls = service_urls or ['translate.google.com']
    self.token_acquirer = TokenAcquirer(session=self.session, host=self.service_urls[0])

    # Use the HTTP/2 adapter if hyper is installed
    try:  # pragma: nocover
        from hyper.contrib import HTTP20Adapter
        self.session.mount(urls.BASE, HTTP20Adapter())
    except ImportError:  # pragma: nocover
        pass
def __init__(self, tkk='0', session=None, host='translate.google.com'):
    self.session = session or requests.Session()
    self.tkk = tkk
    self.host = host if 'http' in host else 'https://' + host
def AkamaiEdgeGridSession_Setup(AkamaiEdgeGridConfig):
    session = requests.Session()
    session.auth = EdgeGridAuth(
        client_token=AkamaiEdgeGridConfig['client_token'],
        client_secret=AkamaiEdgeGridConfig['client_secret'],
        access_token=AkamaiEdgeGridConfig['access_token']
    )
    return session

# Actually call Akamai and return the result.
def __init__(self, host="127.0.0.1", port=46657):
    # Tendermint endpoint
    self.uri = "http://{}:{}".format(host, port)
    # Keep a session
    self.session = requests.Session()
    # Request counter for json-rpc
    self.request_counter = itertools.count()
    # request headers
    self.headers = {
        'user-agent': AGENT,
        'Content-Type': 'application/json'
    }
def __init__(self, api_key=None, endpoint=None):
    if api_key is None or endpoint is None:
        try:
            from pymatgen import SETTINGS
        except ImportError:
            warnings.warn('MPResterBase: not using pymatgen SETTINGS!')
            SETTINGS = {}
    if api_key is not None:
        self.api_key = api_key
    else:
        self.api_key = SETTINGS.get("PMG_MAPI_KEY", "")
    if endpoint is not None:
        self.preamble = endpoint
    else:
        self.preamble = SETTINGS.get(
            "PMG_MAPI_ENDPOINT", "https://www.materialsproject.org/rest/v2"
        )
    if not self.api_key:
        raise ValueError('API key not set. Run `pmg config --add PMG_MAPI_KEY <USER_API_KEY>`.')
    self.session = requests.Session()
    self.session.headers = {"x-api-key": self.api_key}
def __init__(self, broker="http://127.0.0.1:19820/api", encoding="utf-8", enc_key=None, enc_iv=None):
    super().__init__()
    self._endpoint = broker
    self._encoding = "utf-8"
    if enc_key is None or enc_iv is None:
        self._transport_enc = False
        self._transport_enc_key = None
        self._transport_enc_iv = None
        self._cipher = None
    else:
        self._transport_enc = True
        self._transport_enc_key = enc_key
        self._transport_enc_iv = enc_iv
        backend = default_backend()
        self._cipher = Cipher(algorithms.AES(enc_key), modes.CBC(enc_iv), backend=backend)
    self._session = requests.Session()
    self._event_dict = {'logon': self.on_login, 'logoff': self.on_logout, 'ping': self.on_ping,
                        'query_data': self.on_query_data, 'send_order': self.on_insert_order,
                        'cancel_order': self.on_cancel_order_event, 'get_quote': self.on_get_quote}
    self.client_id = ''
    self.account_id = ''
def test_hearthhead(self):
    with requests.Session() as s:
        self.assertEqual(scrape.getHearthHeadId('Quick Shot', 'Spell', s),
                         'quick-shot')
        self.assertEqual(scrape.getHearthHeadId('Undercity Valiant', 'Minion', s),
                         'undercity-valiant')
        self.assertEqual(scrape.getHearthHeadId('Gorehowl', 'Weapon', s),
                         'gorehowl')
        self.assertEqual(scrape.getHearthHeadId('V-07-TR-0N', 'Minion', s),
                         'v-07-tr-0n')
        self.assertEqual(scrape.getHearthHeadId("Al'Akir the Windlord", 'Minion', s),
                         'alakir-the-windlord')
def test_Hearthpwn(self):
    with requests.Session() as s:
        self.assertEqual(scrape.getHearthpwnIdAndUrl('Quick Shot',
                         'Blackrock Mountain', 'Spell', False, s),
                         (14459, 'https://media-hearth.cursecdn.com/avatars/328/302/14459.png'))
        self.assertEqual(scrape.getHearthpwnIdAndUrl('Upgrade!',
                         'Classic', 'Spell', False, s),
                         (638, 'https://media-hearth.cursecdn.com/avatars/330/899/638.png'))
def loadTokens(tokens={}, wantedTokens={}):
    resultCards = {}
    with requests.Session() as session:
        for name, ids in wantedTokens.items():
            card = None
            if 'id' in ids:
                card = tokens[ids['id']]
                if name != card['name']:
                    log.warning('loadTokens() names do not match: %s - %s', name, tokens[ids['id']]['name'])
            if 'id' not in ids:
                for token in tokens.values():
                    if name == token['name']:
                        if card:
                            log.warning('loadTokens() found token again: %s', name)
                        card = token
            if not card:
                log.warning('loadTokens() could not find: %s', name)
                exit()
            r = session.get('http://www.hearthpwn.com/cards/{}'.format(ids['hpwn']))
            r.raise_for_status()
            image = fromstring(r.text).xpath('//img[@class="hscard-static"]')[0].get('src')
            if not image:
                image = 'https://media-hearth.cursecdn.com/avatars/148/738/687.png'
            card['cdn'] = image.replace('http://', 'https://').lower()
            card['hpwn'] = ids['hpwn']
            card['head'] = getHearthHeadId(card['name'], "ignored", "ignored")
            # since jade golem: overwrite scraped stats with prepared ones
            card['atk'] = ids.get('atk', card['atk'])
            card['cost'] = ids.get('cost', card['cost'])
            card['hp'] = ids.get('hp', card['hp'])
            resultCards[card['name']] = card
            print('.', end='')
    return resultCards
def download_file_from_google_drive(id, destination):
    URL = "https://docs.google.com/uc?export=download"
    session = requests.Session()
    response = session.get(URL, params={'id': id}, stream=True)
    token = get_confirm_token(response)
    if token:
        params = {'id': id, 'confirm': token}
        response = session.get(URL, params=params, stream=True)
    save_response_content(response, destination)
def __init__(self, config):
    self.dbFilename = "downloads.db"
    self.showsFilename = "SHOWS"
    self.sourceIP = ""
    self.transmissionHostRemote = ""
    self.userpass = ""
    self.downloadQuality = [720, 1080]
    self.speakDownload = True
    # We don't retain 'proxs' or 'requestsFromSource' in this
    # instance since they get stored/retained inside TorrentApiController
    proxs = {}
    requestsFromSource = requests.Session()
    self.establishConfiguration(config, requestsFromSource, proxs)
    self.torrentController = TorrentApiController(requestsFromSource, proxs)
    self.establishDatabase()
def _reset_session(self):
    ''' Get a new request session and try logging into the current
        CVP node. If the login succeeds, None will be returned and
        self.session will be valid. If the login fails, an exception
        error will be returned and self.session will be set to None.
    '''
    self.session = requests.Session()
    error = None
    try:
        self._login()
    except (ConnectionError, CvpApiError, CvpRequestError,
            CvpSessionLogOutError, HTTPError, ReadTimeout, Timeout,
            TooManyRedirects) as error:
        self.log.error(error)
        # Any error that occurs during login is a good reason not to use
        # this CVP node.
        self.session = None
    return error
def getSession(self):
    if self._session is None:
        self._session = requests.Session()
    return self._session
def __init__(self, user_mode):
    self.user_mode = user_mode
    self.interface = None
    self.quota = None
    self.is_online = None
    self.new_api = None
    self.json_decoder = json.JSONDecoder()
    self.session = requests.Session()
    self.csrf_token = None
    self.resolver = dns.resolver.Resolver()
    self.resolver.nameservers = ['172.16.0.1']
    self.api_host_ip = self.resolve_url(self.api_host)
    self.api_host_new_ip = self.resolve_url(self.api_host_new)
def gethtml(link):
    user_agent = "Mozilla/5.0 (Windows NT 6.1; rv:38.0) Gecko/20100101 Firefox/38.0"
    headers = {'user-agent': user_agent}
    s = requests.Session()
    try:
        res = s.get(link, headers=headers).text
    except requests.exceptions.InvalidSchema:
        req = urllib2.Request(link, None, headers)
        r = urllib2.urlopen(req)
        res = r.read().decode('utf8')
    return res
def main():
    # Start a session so we can have persistent cookies
    # Session() >> http://docs.python-requests.org/en/latest/api/#request-sessions
    session = requests.Session()
    # This is the form data that the page sends when logging in
    login_data = {
        'username': RegisterNumber,
        'password': DateofBirth,
        'submit': 'id',
    }
    print login_data
    # Authenticate
    r = session.post(URL, data=login_data)
    # Try accessing a page that requires you to be logged in
    r = session.get('http://sdpdr.nic.in/annauniv/result')
    web = QWebView()
    web.load(QUrl("http://sdpdr.nic.in/annauniv/result"))
    # web.show()
    printer = QPrinter()
    printer.setPageSize(QPrinter.A4)
    printer.setOutputFormat(QPrinter.PdfFormat)
    printer.setOutputFileName("result.pdf")
    # conversion of the page to PDF format
def __init__(self, word, dirpath=None, processNum=30):
    # if " " in word:
    #     raise AttributeError("This Script Only Support Single Keyword!")
    self.word = word
    self.char_table = {ord(key): ord(value)
                       for key, value in BaiduImgDownloader.char_table.items()}
    if not dirpath:
        dirpath = os.path.join(sys.path[0], 'results')
    self.dirpath = dirpath
    self.jsonUrlFile = os.path.join(sys.path[0], 'jsonUrl.txt')
    self.logFile = os.path.join(sys.path[0], 'logInfo.txt')
    self.errorFile = os.path.join(sys.path[0], 'errorUrl.txt')
    if os.path.exists(self.errorFile):
        os.remove(self.errorFile)
    if not os.path.exists(self.dirpath):
        os.mkdir(self.dirpath)
    self.pool = Pool(30)
    self.session = requests.Session()
    self.session.headers = BaiduImgDownloader.headers
    self.queue = Queue()
    self.messageQueue = Queue()
    self.index = 0
    self.promptNum = 10
    self.lock = threading.Lock()
    self.delay = 1.5
    self.QUIT = "QUIT"
    self.printPrefix = "**"
    self.processNum = processNum