Python requests 模块,ReadTimeout() 实例源码
我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests.ReadTimeout()。
def update_status():
    '''
    Ask the update endpoint which version is the latest one.

    -> True  : Running version equals the latest version; no update needed
    -> False : Check for update failed
    -> str   : Latest version to update to
    '''
    try:
        r = requests.get(constants.UPDATES, timeout=5)
    except (requests.ConnectionError, requests.ConnectTimeout,
            requests.ReadTimeout):
        return False

    try:
        latest_version = r.json()['latest_version']
    except (ValueError, KeyError, TypeError):
        # Body was not JSON, was not a mapping, or lacked the expected key:
        # report it as a failed check instead of crashing the caller.
        return False

    if constants.VERSION == latest_version:
        return True
    return latest_version
def get_params(self, container_id: str) -> Optional[Dict[str, Any]]:
    """Return the inspection data of a container.

    When parameter caching is enabled and the id is already cached, the
    cached entry is returned without contacting the client.

    Returns ``None`` when the client reports the container as not found and
    an empty dict when the inspection fails for any other reason.
    """
    if self._config.cache_params and container_id in self._params_cache:
        logger.debug("Returning cached params for container {0}".format(container_id))
        return self._params_cache[container_id]

    logger.debug("[{0}] Starting to fetch params for {1}".format(threading.current_thread().name, container_id))
    try:
        inspected = self._client.inspect_container(container_id)
    except NotFound as missing:
        logger.warning("Container {0} not found - {1}.".format(container_id, missing))
        return None
    except (ReadTimeout, ProtocolError, JSONDecodeError) as comm_error:
        logger.error("Communication error when fetching params for container {0}: {1}"
                     .format(container_id, comm_error))
        return {}
    except Exception as unexpected:
        logger.error("Unexpected error when fetching params for container {0}: {1}"
                     .format(container_id, unexpected))
        return {}
    logger.debug("[{0}] Params fetched for {1}".format(threading.current_thread().name, container_id))

    if self._config.cache_params:
        logger.debug("[{0}] Storing params of {1} in cache".format(threading.current_thread().name, container_id))
        self._params_cache[container_id] = inspected
    return inspected
async def _run_once(self):
    """Run one pass over the inbox, the OC streams and the network modlogs.

    Each stage is gated by its ``self.settings.DO_*`` flag. HTTPException,
    requests.ReadTimeout and requests.ConnectionError are logged instead of
    being raised, so a failed pass ends early without propagating.
    """
    try:
        if self.settings.DO_INBOX or self.settings.DO_FALSEPOS:
            await self._process_messages()
            # The sleep must be awaited; a bare asyncio.sleep() call only
            # creates a coroutine object and never pauses.
            await asyncio.sleep(5)
        # NOTE(review): the loops read the module-level `settings` while the
        # flags come from `self.settings` -- confirm this is intentional.
        for multi in settings.MULTIREDDITS:
            if self.settings.DO_OC:
                await asyncio.sleep(5)
                await self._process_oc_stream(multi)
        for multi in settings.MULTIREDDITS + [settings.PARENT_SUB]:
            if self.settings.DO_MODLOG:
                await asyncio.sleep(5)
                await self._process_network_modlog(multi)
    except (HTTPException, requests.ReadTimeout,
            requests.ConnectionError) as ex:
        LOG.error('%s: %s', type(ex), ex)
    else:
        LOG.debug('All tasks processed.')
# ======================================================
def main(url, timeout=30, redirect_unknown=True, debug=False):
    """Actual monitoring execution.

    Validates *url*, sends a HEAD request with the given *timeout* and reports
    the outcome through ``nagios.plugin_exit``:

    - invalid URL, connection error, or (when *redirect_unknown* is true) a
      ``found`` redirect status -> UNKNOWN
    - connect/read timeout or any other non-OK status -> CRITICAL
    - OK status -> OK

    When *debug* is true the module logger is switched to DEBUG level.
    """
    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)
    if debug:  # pragma: no cover
        logger.setLevel(logging.DEBUG)
        logger.info('debug logging enabled')

    # Check if URL is valid
    logger.debug('perform URL validation check')
    if not valid_http_url(url):
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'provided URL is not valid')

    # Send a HEAD request
    logger.debug('send HEAD request')
    try:
        response = requests.head(url, timeout=timeout)
    except requests.ConnectTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL, 'connection timeout')
    except requests.ReadTimeout:
        # Single literal: the former two adjacent literals were concatenated
        # without a space ("beforetimeout").
        nagios.plugin_exit(nagios.Codes.CRITICAL,
                           'no response received before timeout')
    except requests.ConnectionError:
        nagios.plugin_exit(nagios.Codes.UNKNOWN, 'connection error')
    else:
        logger.debug('response received')
        if response.status_code == requests.codes.ok:
            # Response is OK
            nagios.plugin_exit(nagios.Codes.OK,
                               'status code is %d' % response.status_code)
        elif redirect_unknown and response.status_code == requests.codes.found:
            # Redirect considered as UNKNOWN
            nagios.plugin_exit(nagios.Codes.UNKNOWN,
                               'redirection with code %d' %
                               response.status_code)
        else:
            # Other code, considered not working
            nagios.plugin_exit(nagios.Codes.CRITICAL,
                               'status code is %d' % response.status_code)
def __request_data(self):
    """Refresh the on-disk cache from ``self.data_url`` when it is stale.

    Returns True when the cache is already valid or was rewritten
    successfully; returns False on connection problems, timeouts, a non-200
    status, or an I/O error while writing the cache file. Failures are
    reported on stderr.
    """
    if self.__is_cache_valid():
        return True

    # requesting data
    try:
        # setting 5 sec timeouts for connect and read
        r = requests.get(self.data_url, verify=False, allow_redirects=True, timeout=(5, 5))
    except requests.ConnectTimeout as e:
        # Must precede ConnectionError: ConnectTimeout is a subclass of it,
        # so in the opposite order this handler could never run.
        print("Timed out connection to ", self.data_url, " error is ", e, file=sys.stderr)
        return False
    except requests.ReadTimeout as e:
        print("Timed out while reading data from ", self.data_url, " error is ", e, file=sys.stderr)
        return False
    except requests.ConnectionError as e:
        print("Unable to connect to ", self.data_url, " error is ", e, file=sys.stderr)
        return False

    if r.status_code != 200:
        return False

    # got HTTP/200 for request - storing it in cache.
    # Serialize before opening the file so a body that fails to decode does
    # not leave a truncated cache file behind.
    payload = json.dumps(r.json())
    try:
        with open(self.temp_file_name, mode="w") as cache_file:
            cache_file.write(payload)
    except IOError as e:
        print("IO error while trying to store cache into file ", self.temp_file_name, " error is ",
              e, file=sys.stderr)
        return False
    return True
def _download(self, request, spider):
    """Perform *request* through ``requests`` and enqueue the result.

    The wrapped response is put on ``self._responses_queue`` together with
    *spider*. Timeouts and connection errors trigger a re-crawl when
    ``self.retry_on_download_timeout`` is set; any other exception is logged.
    """
    try:
        self._process_request(request, spider)
        if request is None:
            return

        verb = request.method.upper()
        reply = None
        options = {
            'timeout': self.download_timeout,
            'cookies': request.cookies,
            'headers': request.headers,
        }
        # The same proxy is used for both schemes.
        options['proxies'] = {'http': request.proxy, 'https': request.proxy}

        self.logger.debug('[{}]<{} {}>'.format(spider.name, verb, request.url))
        if verb == 'GET':
            reply = requests.get(request.url, **options)
        elif verb == 'POST':
            reply = requests.post(request.url, request.data, **options)

        # For any other verb `reply` is still None here; the resulting
        # AttributeError is handled by the generic handler below.
        wrapped = Response(reply.url, reply.status_code, reply.content, request,
                           reply.cookies)
        self._responses_queue.put((wrapped, spider))
    except (requests.ReadTimeout, requests.ConnectTimeout, requests.ConnectionError):
        if self.retry_on_download_timeout:
            self.logger.debug('Read timed out, retry request {}'.format(request))
            self.crawl(request, spider)
    except Exception as err:
        self.logger.error(err, exc_info=True)
def check_container(self, container_id: str, check_source: CheckSource, remove_from_cache: bool=False) \
        -> Optional[Container]:
    """Collect params and metrics for one container and wrap them in a Container.

    Params and metrics are each replaced by an empty dict when the matching
    ``disable_*`` config flag is set. Returns ``None`` when the container is
    not found, when fetching fails, or when either piece of data is ``None``.
    """
    try:
        if remove_from_cache:
            self.remove_from_cache(container_id)

        params = {} if self._config.disable_params else self.get_params(container_id)

        if self._config.disable_metrics:
            metrics = {}
        else:
            logger.debug("[{0}] Starting to fetch metrics for {1}".format(threading.current_thread().name,
                                                                          container_id))
            metrics = self._client.stats(container=container_id, decode=True, stream=False)
        logger.debug("[{0}] Fetched data for container {1}".format(threading.current_thread().name, container_id))
    except NotFound as missing:
        logger.warning("Container {0} not found - {1}.".format(container_id, missing))
        return None
    except (ReadTimeout, ProtocolError, JSONDecodeError) as comm_error:
        logger.error("Communication error when fetching info about container {0}: {1}"
                     .format(container_id, comm_error))
        return None
    except Exception as unexpected:
        logger.error("Unexpected error when fetching info about container {0}: {1}"
                     .format(container_id, unexpected))
        return None

    if params is not None and metrics is not None:
        return Container(container_id, params, metrics, 0, check_source)

    logger.warning("Params or metrics were not fetched for container {}. Not returning container."
                   .format(container_id))
    return None
def check_containers(self, check_source: CheckSource) -> Iterable[Container]:
    """Yield a Container for every container the docker client lists.

    Only one check may run at a time: if ``_check_in_progress`` is already
    set the call logs a warning and yields nothing. Containers for which
    ``check_container`` returns ``None`` are skipped. Run timestamps and
    ``last_periodic_run_ok`` are updated along the way.

    The in-progress flag is cleared in a ``finally`` block so that it is also
    reset when the consumer stops iterating early or an unexpected error
    escapes; otherwise every later check would be skipped.
    """
    with self._padlock:
        if self._check_in_progress:
            logger.warning("[{0}] Previous check did not yet complete, consider increasing CHECK_INTERVAL_S"
                           .format(threading.current_thread().name))
            return
        self._check_in_progress = True

    try:
        logger.debug("Periodic check start: connecting to get the list of containers")
        self.last_check_containers_run_start_timestamp = datetime.datetime.utcnow()
        try:
            containers = self._client.containers(quiet=True)
            logger.debug("[{0}] Fetched containers list from docker daemon".format(threading.current_thread().name))
        except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
            logger.error("Timeout while trying to get list of containers from docker: {0}".format(e))
            with self._padlock:
                self.last_periodic_run_ok = False
            return
        except Exception as e:
            logger.error("Unexpected error while trying to get list of containers from docker: {0}".format(e))
            with self._padlock:
                self.last_periodic_run_ok = False
            return

        ids = [container['Id'] for container in containers]
        for container_id in ids:
            container = self.check_container(container_id, check_source)
            if container is None:
                continue
            yield container
        logger.debug("Containers checked")

        if self._config.cache_params:
            logger.debug("Purging cache")
            self.purge_cache(ids)

        self.last_periodic_run_ok = True
        self.last_check_containers_run_end_timestamp = datetime.datetime.utcnow()
        self.last_check_containers_run_time = self.last_check_containers_run_end_timestamp \
            - self.last_check_containers_run_start_timestamp
        logger.debug("Periodic check done")
    finally:
        with self._padlock:
            self._check_in_progress = False
def get_events_observable(self) -> Iterable[Any]:
    """Subscribe to the docker client's event stream, retrying until it works.

    Every failed subscription attempt is logged and followed by a 5 second
    pause before the next attempt. The function returns only once
    ``self._client.events`` has succeeded.
    """
    while True:
        try:
            # Return straight from the success path. The former flag was set
            # after the try/except regardless of outcome, so a failure ended
            # the loop and returned None instead of retrying.
            return self._client.events(decode=True)
        except (ReadTimeout, ProtocolError, JSONDecodeError) as e:
            logger.error("Communication error when subscribing for container events, retrying in 5s: {0}".format(e))
        except Exception as e:
            logger.error("Unexpected error when subscribing for container events, retrying in 5s: {0}".format(e))
        time.sleep(5)
def kill_container(self, container: Container) -> None:
    """Ask the docker client to stop *container*.

    Errors raised while stopping are logged and not propagated.
    """
    try:
        target_id = container.params['Id']
        self._client.stop(target_id)
    except (ReadTimeout, ProtocolError) as comm_error:
        logger.error("Communication error when stopping container {0}: {1}".format(container.cid, comm_error))
    except Exception as unexpected:
        logger.error("Unexpected error when stopping container {0}: {1}".format(container.cid, unexpected))
def run(self):
    """Process submissions from the 'all' stream forever.

    Whenever the stream fails with a handled network error, or simply ends,
    the outcome is logged and a new stream is opened after a pause of
    RETRY_MINUTES minutes.
    """
    while True:
        submissions = submission_stream(self.r, 'all', verbosity=0)
        try:
            for submission in submissions:
                self._do_post(submission)
        except (HTTPException, requests.ReadTimeout,
                requests.ConnectionError) as exc:
            failure = '{}: {}'.format(type(exc), exc)
            LOG.error(failure)
        else:
            LOG.error('Stream ended.')

        pause_notice = 'Sleeping for {} minutes.'.format(RETRY_MINUTES)
        LOG.info(pause_notice)
        sleep(60 * RETRY_MINUTES)
def _get_remote_svg_tile(hass, host, port, prefix, name, width_tiles, c1, c2):
    """Get remote SVG file.

    Downloads the tile through a hass job, substitutes a radial-gradient
    background built from the colour sequences *c1* and *c2* into the SVG
    text, and returns the result as bytes. Returns None (after logging) when
    the request fails or the response is not OK.
    """
    url_tile = URL_TILE_MASK.format(host, port, prefix, name, width_tiles)
    response, status, succeeded = None, -1, False
    try:
        fetch = partial(requests.get, url_tile, timeout=15)
        response = yield from hass.async_add_job(fetch)
        status = response.status_code
        succeeded = response.ok
    except (requests.ReadTimeout, requests.ConnectionError):
        # Leave the defaults in place; the failure is reported below.
        pass

    if not succeeded:
        LOGGER.info('TILE REQUEST ERROR [code:{}]: {} - {}'
                    .format(status, response, url_tile))
        return None

    inner_rgba = ', '.join(map(str, c1))
    outer_rgba = ', '.join(map(str, c2))
    gradient_css = ('background-image: radial-gradient('
                    'farthest-corner at 70% 70%, rgba({}), rgba({}));'
                    .format(inner_rgba, outer_rgba))
    replacement = '{}{}'.format(MASK_SVG_STYLE, gradient_css)
    svg_text = RG_TILE_BACKGROUND.sub(replacement, response.content.decode(), count=1)
    return svg_text.encode()
async def execute(self, method, *args):
    """Send one XML-RPC call to Dedimania and return the first response entry.

    The call is serialized with ``dumps``, gzip-compressed and sent through
    ``self.__request`` in the loop's default executor.

    Returns ``data[0]`` of the decoded response. Returns ``None`` when a
    session-related fault could not be recovered by re-authenticating.

    Raises DedimaniaTransportException on network failures, on an invalid
    response that is still invalid after the retries, and on any fault that
    is not session related.
    """
    payload = dumps(args, methodname=method, allow_none=True)
    body = gzip.compress(payload.encode('utf8'))

    try:
        res = await self.loop.run_in_executor(None, self.__request, body)
        data, _ = loads(res.text, use_datetime=True)
        if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0:
            if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]:
                raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString'])
            self.retries = 0
            return data[0]
        raise DedimaniaTransportException('Invalid response from dedimania!')
    except (ConnectionError, ReadTimeout, ConnectionRefusedError, ConnectTimeout) as e:
        # Raised from inside a handler, so this propagates to the caller and
        # is not picked up by the retry handler below.
        raise DedimaniaTransportException(e) from e
    except DedimaniaTransportException:
        # Try to setup new session.
        self.retries += 1
        if self.retries > 5:
            raise DedimaniaTransportException('Dedimania didn\'t gave the right answer after few retries!')
        self.client = requests.session()
        try:
            await self.authenticate()
            return await self.execute(method, *args)
        except Exception as e:
            logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
            handle_exception(e, __name__, 'execute')
            raise DedimaniaTransportException('Could not retrieve data from dedimania!') from e
    except DedimaniaFault as e:
        if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString):
            try:
                self.retries += 1
                if self.retries > 5:
                    raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!')
                await self.authenticate()
                return await self.execute(method, *args)
            except Exception:
                # Best effort: give up silently. Narrowed from a bare except
                # so task cancellation and interpreter exit are not swallowed.
                # NOTE(review): the max-retries exception raised just above is
                # swallowed here too, so the caller receives None.
                return
        logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
        handle_exception(e, __name__, 'execute', extra_data={
            'dedimania_retries': self.retries,
        })
        raise DedimaniaTransportException('Could not retrieve data from dedimania!') from e
def oembed_json(url, cache_failures=True):
    """oembed_json(url, *, cache_failures=True)

    Asks Noembed_ for the embedding HTML code for arbitrary URLs. Sites
    supported include Youtube, Vimeo, Twitter and many others.

    Successful embeds are always cached for 30 days.

    Failures are cached if ``cache_failures`` is ``True`` (the default). The
    durations are as follows:

    - Connection errors are cached 60 seconds with the hope that the connection
      failure is only transient.
    - HTTP errors codes and responses in an unexpected format (no JSON) are
      cached for 24 hours.

    The return value is always a dictionary, but it may be empty.
    """
    # Thundering herd problem etc...
    key = 'oembed-url-%s-data' % md5(url.encode('utf-8')).hexdigest()
    data = cache.get(key)
    if data is not None:
        return data

    try:
        response = requests.get(
            'https://noembed.com/embed',
            params={
                'url': url,
                'nowrap': 'on',
                'maxwidth': 1200,
                'maxheight': 800,
            },
            timeout=2,
        )
        # Without this call requests never raises HTTPError, so an error
        # response carrying a JSON body would be cached as a success.
        response.raise_for_status()
        data = response.json()
    except (requests.ConnectionError, requests.ReadTimeout):
        # Connection failed? Hopefully temporary, try again soon.
        timeout = 60
    except (ValueError, requests.HTTPError):
        # Oof... HTTP error code, or no JSON? Try again tomorrow,
        # and we should really log this.
        timeout = 86400
    else:
        # Perfect, cache for 30 days
        cache.set(key, data, timeout=30 * 86400)
        return data

    if cache_failures:
        cache.set(key, {}, timeout=timeout)
    return {}
def get_data(self):
self.get_url_and_html()
for html in self.list_html:
try:
soup = BeautifulSoup(str(html), 'lxml')
# soup??????????????
if soup.find("tr").findAll("td"):
ip_and_port = (
soup.find("tr").findAll("td")[1].get_text() +
":" + soup.find("tr").findAll("td")[2].get_text()
)
proxies = {
"http": ip_and_port,
"https": ip_and_port
}
# ??ip?????????2?
response = requests.get(
"http://1212.ip138.com/ic.asp",
headers=headers,
proxies=proxies,
timeout=2
)
if response.status_code == 200:
self.ip_and_port = ip_and_port
print "ip???????" + self.ip_and_port
print "??????:{},?????:{},?????:{},?????:{},?????:{},?????:{}".format(
str(soup.find("tr").findAll("td")[3].get_text()).replace("\n", ""),
soup.find("tr").findAll("td")[4].get_text(),
soup.find("tr").findAll("td")[5].get_text(),
soup.find("tr").findAll("td")[6].find({"div", "title"}).attrs["title"],
soup.find("tr").findAll("td")[7].find({"div", "title"}).attrs["title"],
soup.find("tr").findAll("td")[8].get_text(),
soup.find("tr").findAll("td")[9].get_text()
)
break
else:
print "http????200"
raise requests.ConnectionError
except requests.ReadTimeout:
print "?ip??????????????ip"
except requests.ConnectionError:
print "?ip????"
except Exception as e:
print "??????????:%(errorName)s\n?????:\n%(detailInfo)s" % {
"errorName": e, "detailInfo": traceback.format_exc()}
def checkin(cred, code):
    '''
    Log in with cred (student uid, password) and submit a check-in code.

    -> 0: Successful check-in
    -> 1: No internet connection
    -> 2: Invalid credentials
    -> 3: Not connected to SunwayEdu Wi-Fi
    -> 4: Invalid code
    -> 5: Wrong class
    -> 6: Already checked-in
    '''
    # Start a session
    session = requests.Session()

    # Login to iZone
    payload = {
        'form_action': 'submitted',
        'student_uid': cred[0],
        'password': cred[1],
    }
    try:
        r = session.post(constants.LOGIN, data=payload)
    except requests.ConnectionError:
        return 1
    # A login response that was not redirected is treated as bad credentials.
    if not r.history:
        return 2

    # Check for SunwayEdu Wi-Fi
    try:
        requests.get(constants.WIFI, timeout=2)
    except requests.ConnectTimeout:
        return 3
    except (requests.ReadTimeout, requests.ConnectionError):
        # ReadTimeout is not a ConnectionError subclass and was previously
        # unhandled here; it is mapped to 1 like in the check-in step below.
        return 1

    # Check-in with code
    try:
        r = session.post(constants.CHECKIN, data={'checkin_code': code},
                         timeout=2)
    except (requests.ReadTimeout, requests.ConnectionError):
        return 1

    if 'Checkin code not valid.' in r.text or \
            'The specified URL cannot be found.' in r.text:
        return 4
    if 'You cannot check in to a class you are not a part of.' in r.text:
        return 5
    if 'You have already checked in' in r.text:
        return 6
    return 0