Python requests module: head() example source code
The following 50 code examples, extracted from open-source Python projects, illustrate how to use requests.head().
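Before the project examples, here is a minimal sketch of a typical requests.head() call (the URL and timeout value are illustrative, not taken from any project below). A HEAD request returns only the response headers, no body, which is why the snippets on this page use it to check that a URL exists, read Content-Length or Content-Type, or resolve redirects cheaply.

import requests

# Minimal sketch: HEAD fetches headers only, no response body.
# https://example.com/archive.zip is a placeholder URL.
response = requests.head(
    "https://example.com/archive.zip",
    allow_redirects=True,  # requests.head() does not follow redirects by default
    timeout=5,             # avoid hanging on unresponsive hosts
)
response.raise_for_status()                    # raise for 4xx/5xx status codes
print(response.status_code)
print(response.headers.get("Content-Type"))
print(response.headers.get("Content-Length"))  # may be absent, e.g. for chunked responses
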
def qiniu_upload_img(img_url):
"""?????????
Args:
img_url (string): ????
"""
response = requests.get(img_url)
image = response.content
md5 = calc_md5(image)
qiniu_url = 'http://{}/{}'.format(QINIU_HOSTNAME, md5)
if requests.head(qiniu_url).ok:
return qiniu_url
q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
token = q.upload_token(QINIU_BUCKET, md5, 10)
put_data(token, md5, image, mime_type='image/jpeg')
return qiniu_url
def register(self):
u"Register resource into CKAN"
import ckanapi
ckansite = ckanapi.RemoteCKAN(self.ckan_url, apikey=self.apikey)
# resource url responds?
resource = requests.head(self.url)
self.size = int(resource.headers["content-length"])
# resource exists?
resources = ckansite.action.resource_search(query=u"url:%s" % self.url)
if resources[u"count"] == 0:
ckansite.action.resource_create(
package_id = self.package_id,
url = self.url,
name = self.name,
description = self.description,
format = self.format,
mimetype = self.mimetype,
size = self.size,
)
def _is_update_needed(self):
"""
Determines if an update for the database is necessary
:return: Whether or not the database should be updated
"""
# Retrieve the headers of the response to compare the MD5 checksums of the file
headers_response = requests.head(self.MAXMIND_FREE_DB_URL)
logger.debug(headers_response.headers)
headers_response.raise_for_status()
response_checksum = headers_response.headers.get('X-Database-MD5')
logger.info("Database MD5 received in response headers: " + str(response_checksum))
# Compare the current file checksum to the one received from MaxMind
if response_checksum is not None and response_checksum == self._database_checksum:
return False
self._response_checksum = response_checksum
return True
def head_account_metadata(self):
# get account's metadata
url = self.url
# request api
response = requests.head(url, headers=self.base_headers)
# formatting result
result = dict()
for key in response.headers:
if key == 'X-Account-Container-Count':
result['container_count'] = \
response.headers['X-Account-Container-Count']
elif key == 'X-Account-Object-Count':
result['object_count'] = \
response.headers['X-Account-Object-Count']
elif key == 'X-Account-Bytes-Used':
result['used_bytes'] = replace_bytes_to_readable(
response.headers['X-Account-Bytes-Used']
)
else:
result[key] = response.headers[key]
return result
def head_container_metadata(self, container_name):
"""
get container's metadata
:param container_name: target container name
:return: container's metadata dict
"""
# check container metadata
url = self.url + '/' + container_name
# request api
response = requests.head(url, headers=self.base_headers)
# formatting result
result = dict()
for key in response.headers:
if key == 'X-Container-Object-Count':
result['object_count'] = \
response.headers['X-Container-Object-Count']
elif key == 'X-Container-Bytes-Used':
result['used_bytes'] = replace_bytes_to_readable(
response.headers['X-Container-Bytes-Used']
)
else:
result[key] = response.headers[key]
return result
def get_layer_size(self, layer_hash):
"""
Attempt to return the size of the given layer
"""
url = "{base_url}/v2/{name}/blobs/{layer_hash}".format(
base_url=self.base_url,
name=self.repository_name,
layer_hash=layer_hash)
headers = {}
if self.token is not None:
headers["Authorization"] = "Bearer %s" % self.token
r = requests.head(url, headers=headers, allow_redirects=True, timeout=(3.05,5))
r.raise_for_status()
if "content-length" in r.headers:
self.layer_sizes[layer_hash] = int(r.headers["content-length"])
else:
self.layer_sizes[layer_hash] = None
return self.layer_sizes[layer_hash]
def getDLsize(self):
debug_log("getDLsize called")
it = QTreeWidgetItemIterator(self.tw)
while it.value():
item = it.value()
url_test = item.data(0, dataURL)
if url_test is not None:
try:
r = requests.head(url_test)
r.raise_for_status()
try:
size = (int(r.headers['Content-Length']) / 1024) / 1024
except (KeyError, ValueError):
size = 0
if size > 0:
item.setText(2, "{} MiB".format(round(size, 2)))
except requests.exceptions.HTTPError:
debug_log("Error {} getting DL size: {}".format(r.status_code, r.headers))
item.setText(2, str(r.status_code))
except requests.exceptions.RequestException as e:
item.setText(2, self.tr("Error"))
debug_log(e, logging.ERROR)
it += 1
def head(self):
try:
response = requests.head(self.url, timeout=self.request_timeout)
self.res = response # assign the response object from requests to a property on the instance of HTTP class
return response.headers
except requests.exceptions.ConnectionError as ce:
return False
except Exception as e:
if DEBUG_FLAG:
sys.stderr.write("Naked Framework Error: Unable to perform a HEAD request with the head() method (Naked.toolshed.network.py).")
raise e
#------------------------------------------------------------------------------
# [ post method ] (string)
# HTTP POST request for text
# returns text from the URL as a string
#------------------------------------------------------------------------------
def _get_wp_api_url(self, url):
"""
Private function for finding the WP-API URL.
Arguments
---------
url : str
WordPress instance URL.
"""
resp = requests.head(url)
# Search the Links for rel="https://api.w.org/".
wp_api_rel = resp.links.get('https://api.w.org/')
if wp_api_rel:
return wp_api_rel['url']
else:
# TODO: Raise a better exception when the rel doesn't exist.
raise Exception
def already_cached(self, asset_url):
'''Checks if an item is already cached. This is indicated in the
headers of the file being checked.'''
try:
req = requests.head(asset_url, headers={'user-agent': self.user_agent}) # NOQA
if req.headers.get('Content-Type') is not None:
# Item is not in cache
self.log.debug('Not in cache: %s' % asset_url)
return False
else:
# Item is already cached
self.log.info('Already in cache: %s' % asset_url)
return True
except:
# In case there is an error, we should return false anyway as there
# is no harm in re-downloading the item if it's already downloaded.
return False
def get_normal_title(self,url):
return_message = ""
head = requests.head(url)
max_size = 5e6
if 'content-length' in head.headers and int(head.headers['content-length']) > max_size:
return_message = "File too big for link preview\r\n"
else:
with eventlet.Timeout(60, False):
response = requests.get(url,timeout=30)
if response.status_code == 200:
if 'text/html' in response.headers['content-type']:
soup = BeautifulSoup(response.content,"lxml")
if soup.title is not None:
return_message += soup.title.string + "\r\n"
else:
return_message += response.headers['content-type'] + " Size: " + response.headers['content-length'] + "\r\n"
return return_message
def find_decl_doc(self, name):
import requests
from requests.exceptions import InvalidSchema
url = METATAB_ASSETS_URL + name + '.csv'
try:
# See if it exists online in the official repo
r = requests.head(url, allow_redirects=False)
if r.status_code == requests.codes.ok:
return url
except InvalidSchema:
pass # It's probably FTP
# Not found online in the official repo
raise IncludeError(name)
def get_minutes_url(config, metadata):
start_date = parse_timestamp_naively(metadata['start']).astimezone(get_tz(config))
meeting_type = None
for key, value in config.get('minutes_abbrs', {}).items():
if key in metadata[config['minutes_abbrs_for']]:
meeting_type = value
break
minutes_url = metadata.get('minutes_url')
if minutes_url:
requests.head(minutes_url).raise_for_status()
return minutes_url
elif config['id'] == 'vancouver':
if not meeting_type:
return 'N/A'
if metadata['title'] == 'Inaugural Council Meeting':
meeting_type = 'inau'
mins = 'http://council.vancouver.ca/{dt:%Y%m%d}/{type}{dt:%Y%m%d}ag.htm'.format(
type=meeting_type, dt=start_date)
requests.head(mins).raise_for_status()
return mins
return 'N/A'
def check_pnda_mirror():
def raise_error(reason):
CONSOLE.info('PNDA mirror...... ERROR')
CONSOLE.error(reason)
CONSOLE.error(traceback.format_exc())
sys.exit(1)
try:
mirror = PNDA_ENV['mirrors']['PNDA_MIRROR']
response = requests.head(mirror)
# expect 200 (open mirror) 403 (no listing allowed)
# or any redirect (in case of proxy/redirect)
if response.status_code not in [200, 403, 301, 302, 303, 307, 308]:
raise_error("PNDA mirror configured and present "
"but responded with unexpected status code (%s). " % response.status_code)
CONSOLE.info('PNDA mirror...... OK')
except KeyError:
raise_error('PNDA mirror was not defined in pnda_env.yaml')
except:
raise_error("Failed to connect to PNDA mirror. Verify connection "
"to %s, check mirror in pnda_env.yaml and try again." % mirror)
def _len(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML\
, like Gecko) Chrome/50.0.2661.102 Safari/537.36',
'Range': 'bytes=0-0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
}
with requests.get(url, headers=headers) as r:
length = r.headers['Content-Range']
if length.find('0-0/') == -1:
length = None
else:
length = length.split('0-0/')[-1]
length = int(length) if length else 0
if not length:
del headers['Range']
with requests.head(url, headers=headers) as r:
length = r.headers['Content-Length']
length = int(length) if length else 0
return length
def _scan_target_normal(self,target):
try:
r=requests.head(target)
# it may be a directory
if self._check_eponymous_dir(r,target):
return target+'/;'
if self._check_exist_code(r.status_code):
# check content
r=requests.get(target)
if self._check_exist_code(r.status_code):
for cur in self._not_exist_flag:
if r.text.find(cur)!=-1:
return ''
return target+';'
return ''
except Exception as e:
return ''
def validate_media_url(self):
"""
Validates the media URL
Raises:
3PlayMediaUrlError: on invalid media url or content type
"""
if not self.media_url:
raise ThreePlayMediaUrlError('Invalid media URL "{media_url}".'.format(media_url=self.media_url))
response = requests.head(url=self.media_url)
if not response.ok:
raise ThreePlayMediaUrlError('The URL "{media_url}" is not Accessible.'.format(media_url=self.media_url))
elif response.headers['Content-Type'] != self.allowed_content_type:
raise ThreePlayMediaUrlError(
'Media content-type should be "{allowed_type}". URL was "{media_url}", content-type was "{type}"'.format( # pylint: disable=line-too-long
allowed_type=self.allowed_content_type,
media_url=self.media_url,
type=response.headers['Content-Type'],
)
)
def _check_website(url, code):
try:
response = requests.head(url, headers={'user_agent': DEFAULT_HEADERS})
website_status_code = response.status_code
if code:
return website_status_code
else:
if website_status_code == 200:
return True
else:
return False
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
if code:
return -1
else:
return False
def sync(self, attempt_perceptual_hash=False):
"""Attempt to sync the image with the remote location. Optionally does a number of
other actions:
* If the image exists and we have not constructed a fingerprint for it,
do so at this time and populate the image model's perceptual_hash field
(if attempt_perceptual_hash is True; defaults to False because it's slow)
* If the image does not exist, mark it as removed_from_source
* If the image is removed_from_source, delete it from the search engine
* In all cases, update the last_synced_with_source timestamp on the image"""
req = requests.head(self.url)
if req.status_code == 200:
if not self.perceptual_hash and attempt_perceptual_hash:
self.perceptual_hash = self.generate_hash()
else:
self.removed_from_source = True
self.last_synced_with_source = timezone.now()
self.save()
def do_http_request(self, method, endpoint, data=None, files=None, timeout=100, only_headers=False, custom_header=None):
if only_headers is True:
return requests.head(endpoint)
if method == 'post':
action = requests.post
else:
action = requests.get
if custom_header:
headers = {'user-agent': custom_header}
else:
headers = {'user-agent': str(Etiquette())}
if method == 'post':
result = action(endpoint, data=data, files=files, timeout=timeout, headers=headers)
else:
result = action(endpoint, params=data, timeout=timeout, headers=headers)
if self.throttle is True:
self._update_rate_limits(result.headers)
sleep(self.throttling_time)
return result
def get_url_file_size(url, proxies=None):
"""???????????????KB
:param proxies:
:param url:
:return:
"""
r = requests.head(url=url, proxies=proxies, timeout=3.0)
while r.is_redirect: # follow redirects manually
# print 'got'
# print r.headers['Location']
r = requests.head(url=r.headers['Location'], proxies=proxies, timeout=3.0)
# print r.headers
# print r.headers['Content-Length']
return int(r.headers['Content-Length']) / 1024
def load(self, url):
"""Load script from URL."""
# Check for a redirect to get a final base_url where to start
resp = requests.head(url, allow_redirects=True)
if url != resp.url:
# Followed a redirect
url = resp.url
fname = get_response_fname(resp)
_, ext = os.path.splitext(fname)
if ext == ".py":
py_file = self.fetch_file(url, url)
return py_file
else:
self.crawl(url, url)
def fuzzworth_contentl(self, head, strict = False):
'''Check the content-length before moving on. If strict is set to True content-length
must be validated before moving on. If strict is set to False (default) the URL will
be fuzzed if no content-length is found or no head is returned. The idea behind this
method is to ensure that huge files are not downloaded, slowing down fuzzing.'''
#no param no fuzzing
if head and "content-length" in head:
content_length = int(head["content-length"])
if content_length < self.pagesize_limit:
return True
else:
return False
else:
if strict:
return False
else:
return True
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match = False):
'''Check the content-length header strictly. If a content-length header is found
and its value is below the limit set in the fuzzer config, return True. If either
condition is not satisfied, fall back to the content-type: if it is one of the
allowed types, return True. If the content-type cannot be read either, return False.'''
#if content-length header is found return True
if head and "content-length" in head:
return self.fuzzworth_contentl(head, strict = True)
#if content-length header not found, check content-type, if it is of allowed type
#return True, otherwise false
if head and "content-type" in head:
return self.fuzzworth_content_type(head, allowed_types_lst, full_req_match = False, strict = True)
return False
def exce(indexOfResult,indexOfChar,queryASCII):
# content-start
url = "http://127.0.0.1/Less-9/?id="
tempurl = url + getPayload(indexOfResult,indexOfChar,queryASCII)
before_time = time.time()
requests.head(tempurl)
after_time = time.time()
# content-end
use_time = after_time - before_time
# judge-start
# Time-based check: if the injected sleep() ran in the database, the response
# takes noticeably longer than the baseline, so compare the elapsed time to error_time.
if abs(use_time) > error_time:
return True
else:
return False
# judge-end
def validate_href(image_href):
"""Validate HTTP image reference.
:param image_href: Image reference.
:raises: exception.ImageRefValidationFailed if HEAD request failed or
returned response code not equal to 200.
:returns: Response to HEAD request.
"""
try:
response = requests.head(image_href)
if response.status_code != http_client.OK:
raise exception.ImageRefValidationFailed(
image_href=image_href,
reason=("Got HTTP code %s instead of 200 in response to "
"HEAD request." % response.status_code))
except requests.RequestException as e:
raise exception.ImageRefValidationFailed(image_href=image_href,
reason=e)
return response
def get_remote_source_length():
url = os.environ.get('SP_REMOTE_SOURCE')
try:
response = requests.head(url, allow_redirects=True, timeout=10)
except requests.exceptions.RequestException as e:
puts(colored.red('[HEAD] %s' % url))
puts(colored.red('Failed to get remote installation size: %s' % e))
sys.exit(1)
size = response.headers.get('content-length')
if not size:
size = response.headers.get('Content-Length')
if not size or not size.isdigit():
puts(colored.red('Could not fetch the remote Content-Length.'))
sys.exit(1)
try:
size = int(size)
except ValueError:
pass
return size
def _get_imageId(self, ImageName, tag="latest"):
""" ??????tag?imageId/digest """
ReqUrl = self._baseUrl + "/repositories/{}/tags/{}".format(ImageName, tag) if self.version == 1 else self._baseUrl + "/{}/manifests/{}".format(ImageName, tag)
logger.info("_get_imageId for url {}".format(ReqUrl))
try:
if self.version == 1:
r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
else:
r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify, allow_redirects=True, headers={"Content-Type": "application/vnd.docker.distribution.manifest.v2+json"})
except Exception,e:
logger.error(e, exc_info=True)
return False
else:
if self.version == 1:
return r.json()
else:
return r.headers.get("Docker-Content-Digest", "")
return ""
def check_download_session(url, download_dir, cookies):
r = requests.head(url, cookies=cookies)
if r.status_code != 200 or 'Content-Disposition' not in r.headers:
return False
m = re.search('filename="(.+)"', r.headers['Content-Disposition'])
if not m:
return False
f = m.group(1)
filename = os.path.join(download_dir, f)
if os.path.isfile(filename):
return True
# get it
print "Fetching %s" % f
r2 = requests.get(url, cookies=cookies)
if r2.status_code != 200:
return False
with open(filename, 'wb') as f:
f.write(r2.content)
return True
def _scan(self):
while True:
if self.queue.empty():
break
try:
sub = self.queue.get_nowait()
self.queue.task_done()
domain = self.target + sub
r = requests.head(domain, headers=header, timeout=5, stream=True)
code = r.status_code
if code in self.status_code:
logger.info('status code {} -> {}'.format(code, domain))
except Exception, e:
pass
self.thread_count -= 1
def _from_image_tag_getId(self, ImageName, tag, url, version=1):
""" ??????tag?imageId/digest """
if url:
ReqUrl = url.strip("/") + "/v1/repositories/{}/tags/{}".format(ImageName, tag) if version == 1 else url.strip("/") + "/v2/{}/manifests/{}".format(ImageName, tag)
logger.info("_from_image_tag_getId for url {}".format(ReqUrl))
try:
if version == 1:
r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
else:
r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify, allow_redirects=True, headers={"Content-Type": "application/vnd.docker.distribution.manifest.v2+json"})
except Exception,e:
logger.error(e, exc_info=True)
else:
if version == 1:
return r.json()
else:
return r.headers.get("Docker-Content-Digest", "")
return ""
def setURL(self,update=None):
#if no valid path, do nothing:
if self.path == None or self.grid == None or self.var == None or\
self.ext == None or self.year == None:
self.url = None
else:
self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
self.ext + self.year + ".nc"
#Check if the file exists
fexist = requests.head(self.url+'.html')
if (fexist.status_code >= 400):
self.yearList = self.yearList[1:]
self.year = self.yearList[self.currentYearIndex+1]
self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
self.ext + self.year + ".nc"
if update == True:
self.NCARfile()
def run(self):
while True:
piece = self.queue.get()
words = piece.split("\n")
for word in words:
url = self.target + word + self.suffix
#print "[*] Trying: " + url
try:
r = requests.head(url)
if r.status_code == 200:
print "[+] 200 - " + url
except:
print "[*] Request Error: " + url
self.queue.task_done()
# http://stackoverflow.com/questions/519633/lazy-method-for-reading-big-file-in-python
def send_http_request_used_exec(self, url, method, request_body="", header="", cookie=None):
if method not in ["get", "post", "put", "delete", "head", "options"]:
raise Exception("Not supported method: %s" % method)
_cookie_obj = cookie
_response = None
if header is not "":
_request_api_string = "_response = requests.%s(%s, data=%s, header=%s, _cookie_obj)" % (method, url,
request_body,
header)
else:
_request_api_string = "_response = requests.%s(%s, data=%s, _cookie_obj)" % (method, url, request_body)
exec _request_api_string
return _response
def _is_var_ready(self, cycle, var):
"""
Checks if the variable var is ready for the given forecast hour by comparing its
filetime to the timestamp given by the forecast hour. If the filetime is newer
(later) then the variable is ready.
:param cycle: which cycle are we working with (UTC)
:param var: the variable identifier
:return: true if the variable is ready
"""
# find last-modified time of file in UTC timezone
url = self._remote_var_url(cycle.hour, var)
r = requests.head(url)
if r.status_code != 200:
raise ValueError('Cannot find variable %s for hour %d at url %s' % (var, cycle.hour, url))
last_modif = self._parse_header_timestamp(r.headers['Last-Modified'])
return last_modif > cycle
def setup_args():
'''
setup command line arguments
'''
parser = argparse.ArgumentParser(description="Beatle request utility",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--debug', action='store_true', default=False,
help="enable debugging log")
parser.add_argument('--shard', action='store',
help='shard name or list shards by "list" value')
parser.add_argument('request', action='store',
help='request: post/get/head/delete/list')
parser.add_argument('filename', action='store',
help='file name')
return parser
def sitesManager( media_url ):
#picks which class will handle the media identification and extraction for website_name
#first resolve url shortener
shorteners=['bit.ly','goo.gl','tinyurl.com']
if any(shortener in media_url for shortener in shorteners):
#v=sitesBase.requests_get('https://unshorten.me/s/'+ urllib.quote_plus( media_url ) )
v = requests.head( media_url, timeout=REQUEST_TIMEOUT, allow_redirects=True )
log(' short url(%s)=%s'%(media_url,repr(v.url)))
media_url=v.url
for subcls in sitesBase.__subclasses__():
regex=subcls.regex
if regex:
match=re.compile( regex , re.I).findall( media_url )
#log("testing:{0}[{1}] {2}".format(media_url,regex, repr(match)) )
if match :
return subcls( media_url )
def check(url, timeout):
try:
if timeout <= 0:
timeout = 20
response = requests.head(url,timeout = timeout)
code = response.status_code
screenLock.acquire()
if code == 200:
ColorPrinter.print_green_text("[ " + str(code) + " ]")
print "Checking : " + url
if "404" in response.text:
ColorPrinter.print_blue_text(url + "\tMaybe every page same!")
elif code == 404 or code == 405:
pass
# ColorPrinter.print_red_text("[ " + str(code) + " ]")
# print "Checking : " + url
else:
ColorPrinter.print_blue_text("[ " + str(code) + " ]")
print "Checking : " + url
except Exception as e:
screenLock.acquire()
print e
finally:
screenLock.release()
def run(self):
headers = {
"User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
}
payloads = ["/excel/sso_user_export.php",
"/excel/user_export.php",
"/excel/server_export.php"]
try:
for payload in payloads:
vulnurl = self.url + payload
req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
if r"application/vnd.ms-excel" in req.headers["Content-Type"]:
cprint("[+]???????????????...(??)\tpayload: "+vulnurl, "yellow")
except:
cprint("[-] "+__file__+"====>????", "cyan")
def run(self):
headers = {
"User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
}
payloads = ["/data/dkcm_ssdfhwejkfs.mdb",
"/_data/___dkcms_30_free.mdb",
"/_data/I^(()UU()H.mdb"]
for payload in payloads:
vulnurl = self.url + payload
try:
req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
if req.headers["Content-Type"] == "application/x-msaccess":
cprint("[+]??dkcms???????...(??)\tpayload: "+vulnurl, "red")
except:
cprint("[-] "+__file__+"====>????", "cyan")
def pull_request_to_github(main_repo_user, repo_name, environment):
#Creating strings, mainly URLs, that will be needed for the
#pull request process.
#This is the API URL to make a pull request
pull_request_url = ('https://api.github.com/repos/{}/{}/pulls'.format(
main_repo_user, repo_name))
#This has the username and password from the environment file.
#It is used to log in for API calls.
auth_string = ('{}:{}'.format(environment['github_username'],
environment['github_password']))
#This is the data that will be posted for the pull request.
#It tells the API what the pull request will be like.
pull_request_data = ('{{"title": "{}", "head": "{}:master",'
' "base": "master"}}'.format(
environment['github_pull_request_title'],
environment['github_username']))
pull_request_command = ['curl', '--user', auth_string, pull_request_url,
'-d', pull_request_data]
#Make the pull request to the main repository
subprocess.check_output(pull_request_command)
return pull_request_command
def check_valid_url(repo_zip_url):
#Check that it's a URL
if(repo_zip_url[:7] != 'http://' and repo_zip_url[:8] != 'https://'):
return False
#Get just the head.
request = requests.head(repo_zip_url)
#It should be either success (200's) or redirect(300's).
#Otherwise, inform the user of failure.
if (request.status_code >= 400 or request.status_code < 200):
print('Could not reach URL provided.\n')
print('Provided url was {} and resulted in status code {}'.format(
repo_zip_url,str(request.status_code)))
return False
else:
return True
#This sends a notification email based on information provided
#in the environment file
def check_project_exist(self, project_name):
result = False
path = '%s://%s/api/projects?project_name=%s' % (
self.protocol, self.host, project_name)
response = requests.head(path,
cookies={'beegosessionID': self.session_id})
if response.status_code == 200:
result = True
logging.debug(
"Successfully check project exist, result: {}".format(result))
elif response.status_code == 404:
result = False
logging.debug(
"Successfully check project exist, result: {}".format(result))
else:
logging.error("Fail to check project exist")
return result
# POST /projects
def _is_commit_ref_available(self, package, namespace):
"""
Check if commit ref is available on git repository
"""
if not package or not namespace:
self.error("Missing parameter to check if commit is available")
repository = "https://src.fedoraproject.org/cgit/%s/%s.git" % (namespace, package.name)
if package.repository:
repository = package.repository
ref = "HEAD"
if package.ref:
ref = package.ref
patch_path = "/patch/?id=%s" % ref
url = repository + patch_path
resp = requests.head(url)
if resp.status_code < 200 or resp.status_code >= 300:
self.error("Could not find ref '%s' on '%s'. returned exit status %d; output:\n%s" %
(ref, package.name, resp.status_code, resp.text))
self.log.info("Found ref: %s for %s" % (ref, package.name))
def url_resp(url):
"""Receives a url returns True in case of 200 or 301 response codes"""
res = None
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) \AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
res = requests.head(url, timeout=3, headers=headers)
except:
pass
if res:
code = res.status_code
print('*' + str(code) + '*')
if code == 200 or code == 301:
print(url)
print('#ok#')
return True
else:
print('#bad#')
def back_online_notifier(the_url):
print("found " + the_url + " back online.")
try:
old_status_file = pickle.load(open("status.p", "rb"))
except Exception as ex:
print("The status.p file was not found. it will be recreated." + str(ex))
return
if (the_url in old_status_file) and (old_status_file[the_url]['status'] == "down"):
it_was_down_time = old_status_file[the_url]['time']
current_time = datetime.datetime.now().replace(microsecond=0)
send_message(CHANNEL_ID, ":herb: " + the_url + " is back online. It was down for " + str(current_time - it_was_down_time))
else:
print("skipping notifying that the url is online")
# --- getting only the head ---
def total_blocks_and_bytes(self):
total_blocks, total_bytes = 0, 0
for u in self.input_urls:
head_response_headers = requests.head(u).headers
if 'Content-Length' not in head_response_headers:
m = "The url: '{}' doesn't support the 'Content-Length' field.".format(u)
raise ContentLengthNotSupportedException(m)
else:
remote_size = int(head_response_headers['Content-Length'])
total_bytes += remote_size
num_blocks, last_block_size = divmod(remote_size, self.blocksize)
total_blocks += num_blocks
if last_block_size:
total_blocks += 1
return total_blocks, total_bytes