We extracted the following 49 code examples from open-source Python projects to illustrate how to use validators.url().
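Before the extracted examples, here is a minimal orientation sketch (an illustration written for this page, not taken from any of the projects below; it assumes the third-party validators package is installed): validators.url() returns True for a syntactically valid absolute URL and a falsy result otherwise, so the examples below typically gate on simple truthiness.

import validators  # third-party package: pip install validators

# A well-formed absolute URL validates to True.
print(validators.url("https://example.com/path?q=1"))  # True

# An invalid value yields a falsy result (a ValidationFailure/ValidationError
# object in recent releases rather than the literal False), so a plain
# truthiness check is the idiomatic pattern used throughout these examples.
if not validators.url("not a url"):
    print("invalid URL")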
def route_micro(micro):
    ''' Micro to real URL redirection handler. '''
    try:
        temp = lookup_micro(micro)
        if urlcheck(temp):
            return redirect(temp)
        elif domaincheck(temp):
            return redirect("http://" + temp)
        elif ipcheck(temp.split(':')[0]) and urlcheck('http://' + temp):
            # checks for plain ip or an ip with something after it
            return redirect("http://" + temp)
        else:
            abort(404)
    except Exception as e:
        # If micro is not registered, handle the exception from trying to look
        # it up and raise a 404 HTTP error.
        sys.stderr.write(str(e))
        abort(404)
def __do_http_request(self, type_, url, data):
    """make http get and post requests"""
    parsed_url = self.__parse_url(url)
    parameter = self.__get_parameter_from_parsed_url(parsed_url)
    hostname = self.__get_host_from_parsed_url(parsed_url)
    url = hostname + parsed_url.path  # url is overwritten
    payload = {parameter: data}
    if type_ == 'GET':
        request = requests.get(url, payload)
    elif type_ == 'POST':
        request = requests.post(url, payload)
    else:
        request = None
    return self.__validate_request_status(request)
def import_extract_main(chars={}, datafile=os.path.join("data", "ccew", "extract_main_charity.csv")):
    with open(datafile, encoding="latin1") as a:
        csvreader = csv.reader(a, doublequote=False, escapechar='\\')
        ccount = 0
        for row in csvreader:
            if len(row) > 1:
                row = clean_row(row)
                if row[1]:
                    chars[row[0]]["company_number"].append({
                        "number": parse_company_number(row[1]),
                        "url": "http://beta.companieshouse.gov.uk/company/" + parse_company_number(row[1]),
                        "source": "ccew"
                    })
                if row[9]:
                    chars[row[0]]["url"] = row[9]
                if row[6]:
                    chars[row[0]]["latest_income"] = int(row[6])
                ccount += 1
                if ccount % 10000 == 0:
                    print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount, end='')
        print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount)
    return chars
def clean_chars(chars={}, pc_es=None, es_pc_index="postcode", es_pc_type="postcode"):
    ccount = 0
    for c in chars:
        if pc_es:
            geo_data = fetch_postcode(chars[c]["geo"]["postcode"], pc_es, es_pc_index, es_pc_type)
            if geo_data:
                chars[c]["geo"]["location"] = geo_data[0]
                chars[c]["geo"]["areas"] = geo_data[1]
        chars[c]["url"] = parse_url(chars[c]["url"])
        chars[c]["domain"] = get_domain(chars[c]["url"])
        chars[c]['org-ids'] = add_org_id_prefix(chars[c])
        chars[c]["alt_names"] = [n["name"] for n in chars[c]["names"] if n["name"] != chars[c]["known_as"]]
        # @TODO capitalisation of names
        ccount += 1
        if ccount % 10000 == 0:
            print('\r', "[Geo] %s charites added location details" % ccount, end='')
    print('\r', "[Geo] %s charites added location details" % ccount)
    return chars
def checkType(self, argument):
    """
        Identify observable type
    """
    if validators.url(argument):
        return "URL"
    elif validators.md5(argument):
        return "MD5"
    elif validators.sha1(argument):
        return "SHA1"
    elif validators.sha256(argument):
        return "SHA256"
    elif validators.sha512(argument):
        return "SHA512"
    elif validators.ipv4(argument):
        return "IPv4"
    elif validators.ipv6(argument):
        return "IPv6"
    elif validators.domain(argument):
        return "domain"
    else:
        mod.display("MAIN", argument, "ERROR", "Unable to retrieve observable type")
        return None
def do_import_bookmarks(filename):
    content = []
    first = _("Oops, import failed")
    second = _("could be corrupted or a invalid HTML bookmark file")
    with open(filename) as f:
        l = f.readlines()
    if not re.findall("<!DOCTYPE NETSCAPE-Bookmark-file-1>", l[0], re.IGNORECASE):
        dialog().error(first, "<span size='small'>\"<b>{}</b>\" {}.</span>".format(filename, second))
        return True
    title = re.findall(r'<a[^>]*>(.*?)</a>', str(l), re.IGNORECASE)
    url = re.findall(r'<a[^>]* href="([^"]*)"', str(l), re.IGNORECASE)
    for c, i in enumerate(title):
        if title[c] and url[c]:
            content.append([title[c]] + [url[c]])
    return content
def do_export_bookmarks(list):
    content = []
    header = "<!DOCTYPE NETSCAPE-Bookmark-file-1><!--This is an automatically generated file.\
 It will be read and overwritten. Do Not Edit! --><META HTTP-EQUIV=\"Content-Type\" CONTENT=\"text/html;\
 charset=UTF-8\"><Title>{}</Title><H1>{}</H1><DL><p>".format(_("Bookmarks"), _("Bookmarks"))
    footer = "</DL><p>"
    content.append(header)
    for i in list:
        timestamp = int(datetime.datetime.strptime(i[0], "%Y-%m-%d %H:%M").timestamp())
        title = i[1]
        url = i[2]
        content.append("<DT><A HREF=\"{}\" ADD_DATE=\"{}\">{}</A>".format(url, timestamp, title))
    content.append(footer)
    content = "".join([s for s in content])
    return content
def on_insert_bookmarks(self, title, url):
    with bookmarks_con:
        cur = bookmarks_con.cursor()
        cur.execute("SELECT * FROM bookmarks;")
        urls = cur.fetchall()
        if len(urls) != 0:
            for i in urls:
                if url == i[1]:
                    return True
        cur.execute("INSERT INTO bookmarks VALUES(?, ?, ?);",
                    (title.replace("\n", "").strip(), url, time.strftime("%Y-%m-%d %H:%M")))
        self.refresh_liststore(1)
        return True
def on_js_switch(self, button, active):
    if not set_enable_javascript:
        return True
    page = self.tabs[self.current_page][0]
    settings = page.webview.get_settings()
    if button.get_active():
        settings.set_property("enable-javascript", True)
        self.js_label.set_markup(self.jse_label_text)
    else:
        settings.set_property("enable-javascript", False)
        self.js_label.set_markup(self.jsd_label_text)
    page.webview.set_settings(settings)
    url = page.webview.get_uri()
    if url and validators.url(url):
        page.webview.reload()
def on_decide_destination(self, download, name):
    url = download.get_request().get_uri()
    if not name:
        name = get_domain(url).replace(".", "_")
    if not "." in name:
        mime = download.get_response().get_mime_type()
        suf = mime.split("/")
        name = "{}.{}".format(name, suf[1])
    for i in self.dlview:
        for a in i:
            if type(a) == Gtk.ModelButton:
                if a.get_name().split("/")[-1] == name:
                    self.downloads_menu.show()
                    return True
    if url:
        pathchooser().save(name, download, url)
def dynamic_title(self, view, title):
    url = view.get_uri()
    if not url and not title:
        title = tab_name
    if not title:
        title = url
    counter = 0
    for tab, widget in self.tabs:
        widget = self.check_tab(widget, 0)
        if tab.webview is view:
            if widget:
                widget.set_text(minify(title, 50))
                widget.set_tooltip_text("")
                if len(title) > 50:
                    widget.set_tooltip_text(title)
        counter += 1
def is_url_safe(url):
    if not url.startswith("https://"):
        return False
    if not validators.url(url, public=True):
        return False
    whitelist_urls = os.environ["WHITELISTED_CALLBACK_URLS"].split(';')
    if url in whitelist_urls:
        return True
    forbidden_urls = os.environ["FORBIDDEN_CALLBACK_URLS"].split(';')
    for furl in forbidden_urls:
        if furl in url:
            return False
    return True
def __init__(self, url, max_worker=10, timeout=3, scan_dict=None, verbose=False, status=None):
    self.site_lang = ''
    self.raw_base_url = url
    self.base_url = url
    self.max_worker = max_worker
    self.timeout = timeout
    self.scan_dict = scan_dict
    self.verbose = verbose
    self.first_item = ''
    self.dict_data = {}
    self.first_queue = []
    self.found_items = {}
    if status is None or len(status) == 0:
        self.status = [200, 301, 302, 304, 401, 403]
    else:
        self.status = [int(t) for t in status]
def on_response(self, url, item, method, response, queue):
    if response.code in self.status:
        if item in self.found_items:
            return
        self.found_items[item] = None
        logger.warning('[Y] %s %s %s' % (response.code, method, url))
        # for script files, also queue HEAD probes for likely backup copies
        if any(map(item.endswith, ['.php', '.asp', '.jsp'])):
            bak_list = self.make_bak_file_list(item)
            bak_list = [(t, 'HEAD') for t in bak_list]
            queue.extendleft(bak_list)
    else:
        if response.code == 405 and method != 'POST':
            queue.appendleft((item, 'POST'))
        if self.verbose:
            logger.info('[N] %s %s %s' % (response.code, method, url))
def prepare_url(self):
    url_parsed = urlparse(self.raw_base_url)
    items = url_parsed.path.split('/')
    if len(items) > 0:
        item = items[-1]
        items = items[:-1]
        new_path = '/'.join(items)
    else:
        item = ''
        new_path = url_parsed.path
    url = urlunparse((url_parsed.scheme, url_parsed.netloc, new_path, '', '', ''))
    if item.endswith('.php'):
        self.site_lang = 'php'
    elif item.endswith('.asp'):
        self.site_lang = 'asp'
    elif item.endswith('.aspx'):
        self.site_lang = 'aspx'
    if self.site_lang != '':
        logger.info('site_lang: %s' % self.site_lang)
    self.base_url = url
    self.first_item = item
    logger.info('base_url: %s' % url)
    logger.info('first_item: %s' % item)
def get_comments_based_on_keyword(search):
    logging.info('SEARCH = {}'.format(search))
    url = 'http://www.amazon.co.jp/s/ref=nb_sb_noss_2?url=search-alias%3Daps&field-keywords=' + \
          search + '&rh=i%3Aaps%2Ck%3A' + search
    soup = get_soup(url)
    items = []
    for a in soup.find_all('a', class_='s-access-detail-page'):
        if a.find('h2') is not None and validators.url(a.get('href')):
            name = str(a.find('h2').string)
            link = a.get('href')
            items.append((link, name))
    logging.info('Found {} items.'.format(len(items)))
    for (link, name) in items:
        logging.debug('link = {}, name = {}'.format(link, name))
        product_id = extract_product_id(link)
        get_comments_with_product_id(product_id)
def get_paginated_list(results, url, start, size, page_size=settings.PAGINATION_SIZE):
    # check if page exists
    count = size
    # make response
    obj = {}
    obj['start'] = start
    obj['page_size'] = page_size
    obj['count'] = count
    # make URLs
    # make previous url
    if start == 1:
        obj['previous'] = ''
    else:
        start_copy = max(1, start - page_size)
        page_size_copy = start - 1
        obj['previous'] = url + '?start=%d' % (start_copy)
    # make next url
    if start + page_size > count:
        obj['next'] = ''
    else:
        start_copy = start + page_size
        obj['next'] = url + '?start=%d' % (start_copy)
    # finally extract result according to bounds
    obj['results'] = results
    return obj
def format_urls_in_text(text):
    new_text = []
    accepted_protocols = ['http://', 'https://', 'ftp://', 'ftps://']
    for word in str(text).split():
        new_word = word
        accepted = [protocol for protocol in accepted_protocols if protocol in new_word]
        if not accepted:
            new_word = 'http://{0}'.format(new_word)
        if validators.url(new_word) == True:
            new_word = '<a href="{0}">{1}</a>'.format(new_word, word)
        else:
            new_word = word
        new_text.append(new_word)
    return ' '.join(new_text)
def loadm3u(url):
    hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
           'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
           'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
           'Accept-Encoding': 'none',
           'Accept-Language': 'en-US,en;q=0.8',
           'Connection': 'keep-alive'}
    req = urllib2.Request(url, headers=hdr)
    response = urllib2.urlopen(req)
    data = response.read()
    if not 'EXTM3U' in data:
        raise Exception(url + " is not a m3u8 file.")
    #return data.encode('utf-8')
    return data
def dictToM3U(cumulustv):
    channels = cumulustv["channels"]
    channelDataMap = [
        ("number", "tvg-id"),
        ("name", "tvg-name"),
        ("logo", "tvg-logo"),
        ("genres", "group-title"),
        ("country", "tvg-country"),
        ("lang", "tvg-language")
    ]
    m3uStr = "#EXTM3U\n"
    for channel in channels:
        m3uStr += "#EXTINF:-1"
        for dataId, extinfId in channelDataMap:
            if channel[dataId] is not None and channel[dataId] != "":
                m3uStr += " " + extinfId + "=\"" + channel[dataId].strip() + "\""
        m3uStr += "," + channel["name"].strip() + "\n"
        m3uStr += channel["url"] + "\n"
    return m3uStr
def get_api_url():
    """
    Get management url from the config file
    """
    config_key = 'api_url'
    try:
        url = CONFIG.get(URL_SECTION, config_key)
        if validators.url(str(url)):
            return url
        else:
            print_config_error_and_exit(URL_SECTION, 'REST API URL(%s)' % config_key)
    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
        return DEFAULT_API_URL
def build_url(nodes):
    """
    Build a url with the given array of nodes for the url
    and return path and url respectively
    Ordering is important
    """
    path = str.join('/', nodes)
    url = str.join('/', [get_api_url(), path])
    return path, url
def parse_result(self):
    """
    Abstract parse_result method. It calls when analyze is finished.
    It uptade malware with indicators.
    """
    if not self._result:
        return
    json_ole = self.json_decode(self._result)
    if not json_ole:
        return
    for item in json_ole:
        if "IOC" in item["type"]:
            score = 7
            if "URL" in item['description'] and validators.url(item['keyword']):
                extract_malware = self.malware.add_extract_malware(
                    self.module_cls_name, item['keyword'], Type.get_label(Type.URL))
                Input.analyse_malware(extract_malware)
        elif "AutoExec" in item["type"]:
            score = 7
        elif "Suspicious" in item["type"]:
            score = 5
        elif "VBA string" in item["type"]:
            score = 3
        elif "Hex String" in item["type"]:
            score = 1
        else:
            score = -1
        indicator = Indicator.factory(module_cls_name=self.module_cls_name,
                                      name="item",
                                      content_type=Type.JSON,
                                      content=json.dumps(item),
                                      score=score)
        self._malware.get_module_status(self.module_cls_name).add_indicator(indicator)
def success():
    try:
        URL = session["url"]
        if URL.find("jobs") is not -1 and URL.find("metadata") is -1:
            data = urllib.urlopen(URL).read()
            data = json.loads(data)
            temp = data["job_ids"]
            if temp:
                info = {}
                for ID in temp:
                    url = URL + "?id=" + ID + "&type=metadata"
                    data_temp = urllib.urlopen(url).read()
                    data_temp = json.loads(data_temp)
                    report_data = get_data(data_temp)[-1]
                    info[ID] = report_data
                return render_template('plot_jobs.html', results=info)
        if validators.url(URL):
            data = urllib.urlopen(URL).read()
        else:
            data = open("./static/testdata/" + URL).read()
        data = json.loads(data)
        response = get_data(data)
        if response[0] == "single":
            metrics, report_data = response[1], response[2]
            results = response[3]
            return render_template('plot_tables.html', metrics=metrics,
                                   report_data=report_data, results=results)
        else:
            return render_template('plot_multi_data.html', results=response[1])
    except Exception as e:
        session['server_error'] = e.message + ' ' + repr(e.args)
        return redirect(url_for('file_not_found'))
def url():
    if request.method == 'POST':
        url = request.form['url']
        session["url"] = url
        return redirect(url_for('success'))
def validate_result(current, default, type):
    """
    Validates the data, whether it needs to be url, twitter, linkedin link etc.
    """
    if current is None:
        current = ""
    if default is None:
        default = ""
    if type == "URL" and validators.url(current, require_tld=True) and not validators.url(default, require_tld=True):
        return current
    if type == "EMAIL" and validators.email(current) and not validators.email(default):
        return current
    return default
def fetch_tsv_data(gid):
    base_url = 'https://docs.google.com/spreadsheets/d/' + SHEET_ID + '/export?format=tsv'
    url = base_url + '&gid=' + gid
    logging.info('GET ' + url)
    res = urllib2.urlopen(url)
    return res.read()
def iterate_bytechunks(hashme, is_string, use_json, hash_many):
    """
    Prep our bytes.
    """
    # URL
    if not is_string and validators.url(hashme):
        if not use_json:
            click.echo("Hashing content of URL " + click.style(hashme, bold=True) + "..", err=not hash_many)
        try:
            response = requests.get(hashme)
        except requests.exceptions.ConnectionError as e:
            raise ValueError("Not a valid URL. :(")
        except Exception as e:
            raise ValueError("Not a valid URL. {}.".format(e))
        if response.status_code != 200:
            click.echo("Response returned %s. :(" % response.status_code, err=True)
        bytechunks = response.iter_content()
    # File
    elif os.path.exists(hashme) and not is_string:
        if os.path.isdir(hashme):
            if not use_json:
                click.echo(click.style("Skipping", fg="yellow") + " directory " + "'" + hashme + "'..", err=True)
            return None
        if not use_json:
            click.echo("Hashing file " + click.style(hashme, bold=True) + "..", err=not hash_many)
        bytechunks = FileIter(open(hashme, mode='rb'))
    # String
    else:
        if not use_json:
            click.echo("Hashing string " + click.style(hashme, bold=True) + "..", err=not hash_many)
        bytechunks = (hashme.encode('utf-8'), )
    return bytechunks
def test_valid_project_url():
    assert validators.url(quantopian_tools.__project_url__)
def post(self):
    url = self.get_body_argument("url")
    if not validators.url(url):
        self.set_status(400, "bad URL")
        return
    with self._connect() as connection:
        try:
            createSource(connection, url)
        except sqlite3.IntegrityError:
            self.set_status(400, "duplicate URL")
            return
    self.set_status(201)
def get(self, url):
    with self._connect() as connection:
        try:
            item = source(connection, url)
            self.write(json_encode(item))
        except IndexError:
            self.set_status(404, "Can't find '%s'" % url)
def delete(self, url):
    with self._connect() as connection:
        count = delete(connection, url)
        self.set_status(204, "Deleted %s with count %d" % (url, count))
def __parse_url(url):
    """return urlparsed url"""
    return urlparse(url, allow_fragments=False)
def f_http_get(self, url, ):
    """send http get request"""
    return self.__do_http_request('GET', url, self.input)
def f_http_post(self, url):
    """send http post request"""
    return self.__do_http_request('POST', url, self.input)
def f_retrieve_image(self, url=None):
    """retrieve an image, image is stored in memory only
    :rtype: StringIO object
    :param url: Image URL
    :return: Object of image
    """
    if self.input:
        if not self.input.startswith('http'):
            self.input = 'http://' + self.input
        output = BytesIO(requests.get(self.input).content)
    else:
        output = BytesIO(requests.get(url).content)
    return output
def strip_link(url):
    if validators.url(url) == True:
        return url.split("/")[3]
    else:
        exit("[!] Not a valid URL")
def main():
    parser = ArgumentParser(description="Transcode text from an imgur image to an mp3")
    parser.add_argument("-l", "--link", dest="link_input",
                        help="The text or image file to transcode", metavar="STRING")
    parser.add_argument("-o", "--output", dest="file_output",
                        help="The file name to output the result into (whether it be an image, or other file type)",
                        metavar="FILE")
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit()
    args = vars(parser.parse_args())
    if args['link_input'] != None:
        output_filename = args['file_output']
        url = args['link_input']
        if output_filename == None:
            # if a filename isn't specified, make it the imgur image code
            output_fileWithExtension = strip_link(url)
            output_file = output_fileWithExtension.split(".", 1)[0]
            urllib.urlretrieve(url, "temp_" + output_fileWithExtension)
            # run pngify.py then delete temp file
            os.system("python pngify.py -i temp_" + output_fileWithExtension + " -o " + output_file + ".mp3; rm temp_" + output_filename)
        else:
            # image name is specified
            o = output_filename.split(".", 1)
            output_file = output_filename.split(".", 1)[0]
            if o[len(o)-1] != "png":
                # if the user didn't input the file extension, add it in automatically
                output_filename = output_filename + ".png"
            urllib.urlretrieve(url, "temp_" + output_filename)  # download file
            # run pngify.py then delete temp file
            os.system("python pngify.py -i temp_" + output_filename + " -o " + output_file + ".mp3; rm temp_" + output_filename)
def parse_url(url):
    if url is None:
        return None
    url = url.strip()
    if validators.url(url):
        return url
    if validators.url("http://%s" % url):
        return "http://%s" % url
    if url in ["n.a", 'non.e', '.0', '-.-', '.none', '.nil', 'N/A', 'TBC',
               'under construction', '.n/a', '0.0', '.P', b'', 'no.website']:
        return None
    for i in ['http;//', 'http//', 'http.//', 'http:\\\\', 'http://http://',
              'www://', 'www.http://']:
        url = url.replace(i, 'http://')
    url = url.replace('http:/www', 'http://www')
    for i in ['www,', ':www', 'www:', 'www/', 'www\\\\', '.www']:
        url = url.replace(i, 'www.')
    url = url.replace(',', '.')
    url = url.replace('..', '.')
    if validators.url(url):
        return url
    if validators.url("http://%s" % url):
        return "http://%s" % url
def get_domain(url=None, email=None):
    if url is None:
        return None
    u = urlparse(url)
    domain = u.netloc
    if domain.startswith('www.'):
        domain = domain[4:]
    return domain
def enumerate_http_resources(package, package_path):
    with (package_path / 'resource.json').open() as json_file:
        resource = json.load(json_file)

    for name, url in resource.get('images', {}).items():
        if name != 'screenshots':
            yield url, pathlib.Path(package, 'images')

    for name, url in resource.get('assets', {}).get('uris', {}).items():
        yield url, pathlib.Path(package, 'uris')

    for k in resource.get('cli', {}).get('binaries', {}):
        url = resource.get('cli', {}).get('binaries', {}).get(k, {}).get('x86-64', {}).get('url', '')
        yield url, pathlib.Path(package, 'cli', k)

    command_path = (package_path / 'command.json')
    if command_path.exists():
        with command_path.open() as json_file:
            commands = json.load(json_file)

        for url in commands.get("pip", []):
            if url.startswith('http'):
                yield url, pathlib.Path(package, 'commands')

    def traverse_yield(d, key='root'):
        if isinstance(d, dict):
            if 'default' in d and str(d['default']).startswith('http'):
                url = d['default']
                if valid_download(url):
                    yield url, pathlib.Path(package, 'config', key)
            else:
                for k, v in d.items():
                    yield from traverse_yield(v, k)

    config_path = (package_path / 'config.json')
    if config_path.exists():
        with config_path.open() as json_file:
            config = json.load(json_file)

        yield from traverse_yield(config)
def add_http_resource(dir_path, url, base_path):
    archive_path = (dir_path / base_path /
                    pathlib.Path(urllib.parse.urlparse(url).path).name)
    print('Adding {} at {}.'.format(url, archive_path))
    os.makedirs(str(archive_path.parent), exist_ok=True)
    urllib.request.urlretrieve(url, str(archive_path))
def valid_download(url):
    return bool(validators.url(url)) and int(httplib2.Http().request(url, 'HEAD')[0]['status']) < 400
def url_exists(path):
    if validators.url(path):
        return True
    return False
def validate_account_signin(account):
    result = {
        'accountAlias': None,
        'accountId': None,
        'signinUri': 'https://' + account + '.signin.aws.amazon.com/',
        'exists': False,
        'error': None
    }
    if re.match(r'\d{12}', account):
        result['accountId'] = account
    else:
        result['accountAlias'] = account
    if not validators.url(result['signinUri']):
        result['error'] = 'Invalid URI'
        return result
    try:
        r = requests.get(result['signinUri'], allow_redirects=False)
        if r.status_code == 302:
            result['exists'] = True
    except requests.exceptions.RequestException as e:
        result['error'] = e
    return result
def run(self):
    while True:
        if self.count >= self.workers_number:
            break
        try:
            test = self.queue_logging.get()
            if test is None:
                self.count += 1
            else:
                if test['success']:
                    log.success('Authentication successful: Username: "{}" Password: "{}" URL: {}'.format(
                        test['username'], test['password'], test['target']['url']))
                else:
                    if self.verbose:
                        log.debug('Authentication failed: Username: "{}" Password: "{}" URL: {}'.format(
                            test['username'], test['password'], test['target']['url']))
                self.progress += 1
                if self.progress % 10 == 0:
                    if self.verbose:
                        log.info('Progress : {}'.format(self.progress))
                    else:
                        log.info('Progress : {}'.format(self.progress), update=True)
        except Exception as e:
            traceback.print_exc()
            log.error('WorkerLogging => {} : {}'.format(type(e), e))
    log.info('Progress : {} (end)'.format(self.progress))
def has_url(_string):
    if validators.url(_string):
        return True
    return False