我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用django.core.validators.URLValidator()。
def get_http_headers(url):
    """Fetch the HTTP headers for ``url`` via the phishing-service backend.

    :param str url: The (possibly obfuscated) URL to probe
    :raises BadRequest: when the url is missing or not a valid URL
    :raises InternalServerError: when the backend or its response schema fails
    :return: the backend's header response
    """
    if not url:
        raise BadRequest('Missing url')

    url = _get_deobfuscate_item(url)

    # Only well-formed URLs in a supported scheme are accepted.
    validator = URLValidator(schemes=('http', 'https', 'ftp', 'ftps', 'rtsp', 'rtmp'))
    try:
        validator(url)
    except ValidationError:
        raise BadRequest('Not a valid URL')

    try:
        service = ImplementationFactory.instance.get_singleton_of('PhishingServiceBase')
        response = service.get_http_headers(url)
        # Ensure the adapter returned data in the documented shape.
        schema.valid_adapter_response('PhishingServiceBase', 'get_http_headers', response)
        return response
    except (PhishingServiceException, schema.InvalidFormatError, schema.SchemaNotFound) as ex:
        raise InternalServerError(str(ex))
def handle(self, **kwargs):
    """Bootstrap Promgen: ensure the config directory and promgen.yml exist,
    then prompt for the core settings (secret key, database, broker)."""
    self.write('Bootstrapping Promgen')

    if not os.path.exists(settings.CONFIG_DIR):
        self.write('Creating config directory {} ', settings.CONFIG_DIR)
        os.makedirs(settings.CONFIG_DIR)

    if not os.path.exists(settings.PROMGEN_CONFIG):
        # Seed the config from the example file shipped with the test suite.
        path = os.path.join(settings.BASE_DIR, 'promgen', 'tests', 'examples', 'promgen.yml')
        self.write('Creating promgen config {} from {}', settings.PROMGEN_CONFIG, path)
        shutil.copy(path, settings.PROMGEN_CONFIG)

    self.write_setting('SECRET_KEY', default=settings.SECRET_KEY)
    # ``test=`` callables validate the value supplied for each setting.
    self.write_setting('DATABASE_URL', test=dj_database_url.parse)
    # Schemes based on list of supported brokers
    # http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
    self.write_setting('CELERY_BROKER_URL', test=URLValidator(schemes=['redis', 'amqp', 'sqs']))
def api_url_request(request):
    """Shorten the POSTed ``url`` and return {'url': ..., 'shrink': ...} as JSON.

    Invalid or empty input is reported in-band via the sentinel values
    "url_invalid" / "url_empty" rather than an HTTP error status.

    NOTE(review): a non-POST request falls through and returns None, which
    Django rejects — confirm the URL conf only routes POSTs here.
    """
    if request.method=="POST":
        url_verification=URLValidator()
        post_url=request.POST.get('url');
        result=UrlShrinked(url="")
        if post_url:
            try:
                url_verification(post_url)
                result.url=post_url
                result.save()
                result.publish()
            # NOTE(review): broad except — this also hides save()/publish()
            # failures, not just URL validation errors.
            except Exception as e:
                result.url="url_invalid"
                result.shrinked_code="url_invalid"
        else:
            result.url="url_empty"
            result.shrinked_code="url_empty"
        object_response={'url':result.url,'shrink':result.shrinked_code}
        return HttpResponse(JsonResponse(object_response), content_type="application/json")
#Search url using shrinked code, private function, TO MOVE
def create(self, request, *args, **kwargs):
    """
    Creates a Torrent object links it to the user.
    Validates `url` query parameter whether is a magnet/torrent link or not.
    :return: Response
    """
    link = request.data.get('link', '').strip()
    url_validator = URLValidator()
    try:
        # NOTE(review): reconstructed nesting — confirm the .torrent suffix
        # check applies only to non-magnet links; if it were unconditional,
        # every magnet link would be rejected.
        if not link.startswith('magnet:'):
            url_validator(link)
            if not link.endswith('.torrent'):
                raise ValidationError("Invalid torrent URL.")
    except ValidationError as e:
        # NOTE(review): the exception object itself is placed in the body —
        # verify it serializes as intended (e.messages/str may be meant).
        return Response({'detail': e}, status=status.HTTP_400_BAD_REQUEST)
    # Hand the link to the transmission daemon, then mirror it in the DB.
    torrent = transmission.add_torrent(link)
    torrent_model, created = Torrent.objects.get_or_create(
        hash=torrent.hashString,
        defaults={
            'name': torrent.name,
            'private': torrent.isPrivate,
        },
    )
    request.user.torrents.add(torrent_model)
    return Response(
        self.serializer_class(torrent_model).data,
        status=created and status.HTTP_201_CREATED or status.HTTP_200_OK
    )
def __init__(self, page, subject, predicate, obj):
    """Turn http parameters into a triple pattern query object using RDFlib objects."""
    self.page = int(page)
    self.subject = URIRef(subject) if subject else None
    self.predicate = URIRef(predicate) if predicate else None
    self.obj = URIRef(obj) if obj else None
    if self.obj is None:
        return
    # The object slot may hold either a resource (URL) or a literal; a
    # failed URL validation means we fall back to a literal value.
    checker = URLValidator()
    try:
        checker(self.obj)
    except ValidationError:
        self.obj = string_to_literal(self.obj, checker)
    else:
        self.obj = URIRef(self.obj)
def validate_bugs(value):
    """ Inherits from a Built-in URLValidator """
    # Delegate to the module-level ``bugs_validator`` instance; like Django's
    # URLValidator it raises ValidationError on failure.
    return bugs_validator(value)
def get_url_hostname(url):
    """ Try to get domain for an url

        :param str url: The url to extract hostname
        :rtype: str
        :return: the hostname or None
    """
    checker = URLValidator(schemes=('http', 'https', 'ftp', 'ftps', 'rtsp', 'rtmp'))
    try:
        checker(url)
    except (ValueError, ValidationError):
        # Not a URL we recognize -> no hostname to extract.
        return None
    return urlparse(url).hostname
def get_url_external_reputation(url):
    """ External check for url """
    try:
        URLValidator()(url)
    except (ValueError, ValidationError):
        raise BadRequest('Not a valid URL')

    # No reputation backend configured -> nothing to report.
    if not ImplementationFactory.instance.is_implemented('ReputationDaoBase'):
        return []

    try:
        dao = ImplementationFactory.instance.get_singleton_of('ReputationDaoBase')
        return dao.get_url_external_reputations(url)
    except ReputationDaoException:
        # Reputation lookups are best-effort; failures yield an empty result.
        return []
def _get_item_ip_hostname_url(item):
    """Classify ``item['rawItem']`` as URL / IP / FQDN and resolve its parts.

    Sets ``item['itemType']`` as a side effect.

    :param dict item: must contain a 'rawItem' string
    :return: a ``(ip_addr, hostname, url)`` tuple; unresolved members are None
    """
    ip_addr = hostname = url = None

    # Classification order: URL first, then IP address, else treat as FQDN.
    try:
        validate = URLValidator(schemes=('http', 'https', 'ftp', 'ftps', 'rtsp', 'rtmp'))
        validate(item['rawItem'])
        item['itemType'] = 'URL'
        url = item['rawItem']
    except ValidationError:
        try:
            validate_ipv46_address(item['rawItem'])
            item['itemType'] = 'IP'
        except ValidationError:
            item['itemType'] = 'FQDN'

    if item['itemType'] == 'URL':
        hostname = utils.get_url_hostname(item['rawItem'])
        ips = utils.get_ips_from_url(item['rawItem'])
        if ips:
            ip_addr = ips[0]
    elif item['itemType'] == 'IP':
        # Fix: dropped the original's redundant re-assignment of
        # item['itemType'] = 'IP' here (it was already set above).
        ip_addr = item['rawItem']
    elif item['itemType'] == 'FQDN':
        hostname = item['rawItem']
        ips = utils.get_ips_from_fqdn(item['rawItem'])
        if ips:
            ip_addr = ips[0]

    return ip_addr, hostname, url
def validate_url(value):
    """Normalize ``value`` to carry a scheme and validate it as a URL.

    :param str value: user-supplied URL, possibly without a scheme
    :raises ValidationError: when the normalized value is not a valid URL
    :return: the validated URL string (prefixed with ``http://`` if needed)
    """
    url_validator = URLValidator()
    # Fix: the original tested ``"http" in value`` which matched the
    # substring anywhere (e.g. "myhttpsite.com"), not an actual scheme.
    if value.startswith(('http://', 'https://')):
        new_value = value
    else:
        new_value = 'http://' + value
    try:
        url_validator(new_value)
    except ValidationError:
        # Fix: was a bare ``except:`` which also swallowed SystemExit,
        # KeyboardInterrupt and unrelated programming errors.
        raise ValidationError("Invalid URL for this field")
    return new_value
def is_valid_url(url):
    """Return True when ``url`` passes Django's URLValidator, else False."""
    try:
        URLValidator()(url)
    except ValidationError:
        return False
    return True
def GitURLField(**kwargs):
    """Build a ``models.URLField`` whose stock URLValidator instances are
    swapped for ``GitURLValidator`` (which additionally accepts git-style
    URLs the default validator rejects).

    :param kwargs: forwarded verbatim to ``models.URLField``
    :return: the configured field instance
    """
    field = models.URLField(**kwargs)
    # Replace matching validators in place; enumerate instead of the
    # range(len(...)) index-loop idiom.
    for i, validator in enumerate(field.validators):
        if isinstance(validator, validators.URLValidator):
            field.validators[i] = GitURLValidator()
    return field
def validate_default(self, parts, verify_exists=False):
    """
    Validation for FTP, FTPS, HTTP, and HTTPS scehems.

    When `verify_exists` is set to True, this validator will make HEAD
    requests for the URL and will return False if the URL returns a status
    outside of the range of 200 >= «status» > 400.

    :param parts: mapping of URL components (scheme, netloc, path, ...)
    :param verify_exists: when True, also issue a HEAD request for the URL
    :return: True when the URL is valid (and reachable, if requested)
    """
    validator = URLValidator()
    if not parts['netloc']:
        # If there is no host/port, then this may be a link to a local
        # resource (media or static asset, etc.) Use the provided default.
        parts['netloc'] = self.netloc
    # NOTE(review): assumes ``parts`` preserves component order (e.g. an
    # OrderedDict) — urlunparse needs its six fields in order. Confirm.
    url = urlunparse(parts.values())
    try:
        validator(url)
    except ValidationError:
        return False
    else:
        if verify_exists:
            try:
                response = urlopen(HeadRequest(url))
                # NOTE: urllib should have already resolved any 301/302s
                return 200 <= response.code < 400  # pragma: no cover
            except (HTTPError, URLError, BadStatusLine, UnicodeEncodeError):
                return False
        else:
            return True
def do_final_redirect(state, loggedin, msg):
    """
    As final step in the oauth callback process, redirect the user either
    to the api root, or if there was an original_url to indicate where
    the user was when they started the oauth process, move them back to
    that url instead.

    This redirect is accompanied by a URL query pair "loggedin=..." which
    can either be 'true' or 'false', and can be used to determine whether
    the login attempd succeeded or not.
    """
    # Only trust ``state`` as a redirect target when it is a well-formed URL;
    # anything else sends the user to the API root.
    target = '/'
    try:
        URLValidator()(state)
        target = state
    except ValidationError:
        pass

    # Append the login outcome, respecting any existing query string.
    separator = '&' if '?' in target else '?'
    return redirect(target + separator + 'loggedin=' + str(loggedin))
# API Route: /oauth2callback (Redirects to / on success)
def test_enketo_remote_server(self):
    """Integration test: enketo_url() builds a valid URL for a remote server."""
    if not self._running_enketo():
        raise SkipTest
    with HTTMock(enketo_mock):
        server_url = 'https://testserver.com/bob'
        # Unique form id per run: timestamp with dots replaced by underscores.
        form_id = "test_%s" % re.sub(re.compile("\."), "_", str(time()))
        url = enketo_url(server_url, form_id)
        self.assertIsInstance(url, basestring)  # Python 2 string check
        # URLValidator returns None on success and raises on failure.
        self.assertIsNone(URLValidator()(url))
def __init__(self, schemes=('http', 'https'), **kwargs):
    """URL field restricted to ``schemes``; caller-supplied validators take
    precedence over the default URLValidator."""
    kwargs.setdefault('validators', (validators.URLValidator(schemes=schemes),))
    super(URLField, self).__init__(**kwargs)
def clean(self):
    """For URL-type items ('U'): default the scheme to http:// when missing,
    store the normalized value back, and validate it."""
    super(ItemForm, self).clean()
    if self.cleaned_data.get('typ') != 'U':
        return
    url = self.cleaned_data['content']
    # Bare hostnames get a default scheme before validation.
    if '://' not in url and not url.startswith('http'):
        url = 'http://' + url
    self.cleaned_data['content'] = url
    URLValidator()(url)
def clean(self):
    """Mark URL-valued metadata; URL-typed entries must actually be URLs."""
    cleaned_data = super(ServiceMetaForm, self).clean()
    try:
        validators.URLValidator()(cleaned_data['value'])
    except ValidationError as e:
        # Only website/doc/download metadata is required to be a URL;
        # other types may hold free text.
        url_required_types = (self.instance.META_WEBSITE,
                              self.instance.META_DOC,
                              self.instance.META_DOWNLOAD)
        if self.instance.type in url_required_types:
            raise e
    else:
        self.instance.is_url = True
def _check_remote_availability(self, config):
    """ Check if remote ABACUS environment is ready. """
    try:
        validator = URLValidator()
        # NOTE(review): reconstructed from a collapsed source line — confirm
        # whether ``return True`` belongs after the loop (all servers must
        # validate, as written here) or inside it (only the first).
        for url in config.get('ABACUS_REMOTE_SERVERS', ['']):
            validator(url)
        return True
    except ValidationError:
        return False
def is_url_valid(url):
    """Return True when ``url`` is a valid URL per Django's URLValidator."""
    # http://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-python-malformed-or-not
    from django.core.validators import URLValidator
    from django.core.exceptions import ValidationError

    try:
        URLValidator()(url)
    except ValidationError:
        return False
    return True
def exif_orientation(im):
    """Rotate/flip ``im`` according to its EXIF orientation tag (0x0112)."""
    try:
        exif = im._getexif()
    except Exception:
        # There are many ways that _getexif fails, we're just going to blanket
        # cover them all.
        return im
    if exif is None:
        return im

    # EXIF orientation value -> sequence of PIL transpose operations.
    transposes = {
        2: (Image.FLIP_LEFT_RIGHT,),
        3: (Image.ROTATE_180,),
        4: (Image.FLIP_TOP_BOTTOM,),
        5: (Image.ROTATE_270, Image.FLIP_LEFT_RIGHT),
        6: (Image.ROTATE_270,),
        7: (Image.ROTATE_90, Image.FLIP_LEFT_RIGHT),
        8: (Image.ROTATE_90,),
    }
    for op in transposes.get(exif.get(0x0112), ()):
        im = im.transpose(op)
    return im
# Low-tech approach to work around the too strict URLValidator.
# Context: https://code.djangoproject.com/ticket/20264
# replace() isn't super elegant, but I prefer this to having to copy/paste the whole big regexp
# soup from URLValidator so that I can add one underscore...
def clean(self):
    """Parse the uploaded or pasted CSV (';'-delimited), normalize the header
    and every row to a common width, and record which rows hold an invalid
    URL in column 0 (in ``self._invalid_rows``)."""
    csv_file = self.cleaned_data.get('csv_file')
    csv_data = ''
    # Prefer an uploaded file; fall back to pasted CSV text.
    if csv_file:
        csv_data = csv_file.read().decode('utf-8', errors='replace')
        self.cleaned_data['csv_data'] = csv_data
    elif self.cleaned_data['csv_data']:
        csv_data = self.cleaned_data['csv_data']
    csv_file = io.StringIO(csv_data)
    reader = csv.reader(csv_file, delimiter=';')
    try:
        table_header = next(reader)
    except StopIteration:
        self.add_error('csv_data', forms.ValidationError(_('Invalid CSV header.'), code='invalid_csv_header'))
        return
    try:
        table = []
        for line in reader:
            table.append(line)
    except csv.Error:
        self.add_error('csv_data', ValidationError(_('Invalid CSV data.'), code='invalid_csv'))
        return
    # Trim trailing empty header columns (keep at least the first column).
    last_index = len(table_header) - 1
    for last_index in range(last_index, 0, -1):
        if table_header[last_index]:
            break
    table_header = table_header[:last_index + 1]
    # Name any remaining unnamed columns; column 0 is always the URL.
    for i in range(len(table_header)):
        if not table_header[i]:
            table_header[i] = _('Unknown column {}').format(i)
    table_header[0] = _('URL')
    invalid_rows = set()
    num_columns = len(table_header)
    validate_url = URLValidator(schemes=('http', 'https'))
    for i in range(len(table)):
        row = table[i]
        # Truncate long rows and pad short ones to the header width.
        if len(row) > num_columns:
            row = row[:num_columns]
        for j in range(num_columns - len(row)):
            row.append('')
        # Give bare hostnames a default scheme before validating.
        if row[0] and not row[0].startswith(('http', 'https')):
            row[0] = 'http://' + row[0]
        try:
            validate_url(row[0])
        except ValidationError:
            invalid_rows.add(i)
        table[i] = row
    self._table_header = table_header
    self._table = table
    self._invalid_rows = invalid_rows
def is_url(context, value, original_value):
    """Validator hook: ensure ``value`` is a valid URL and return it unchanged.

    ``context`` and ``original_value`` are part of the hook signature and are
    not used here.

    :raises ValidationError: when ``value`` is not a valid URL
    """
    from django.core.validators import URLValidator
    from django.core.exceptions import ValidationError
    # Fix: ugettext_lazy lives in django.utils.translation, not
    # django.utils.text — the original import raised ImportError at call time.
    from django.utils.translation import ugettext_lazy as _

    validate_url = URLValidator()
    try:
        validate_url(value)
    except ValidationError:
        raise ValidationError(_('This field must be a valid url'))
    return value
def get_urls( lines ):
    """ validates text lines containing EventUrl entries, raising
    ValidationErrors if there are errors, otherwise it returns a dictionary
    with names and urls (both unicode objects). If ``line[0]`` is not empty
    it is the default url. """
    urls = {}  # keys are url-names, values are urls
    if lines[0].strip():
        urls['url'] = lines[0].strip()  # default url
    if len(lines) > 1:
        field_p = re.compile(r"^\s+(.*)\s+(.+?)\s*$")
        for line in lines[1:]:
            field_m = field_p.match(line)
            if not field_m:
                empty_line_p = re.compile(r"^\s*$")
                if empty_line_p.match(line):
                    raise ValidationError(
                            _(u"an unexpected empty line was found."))
                raise ValidationError(
                        _(u"the following line is malformed: ") + line)
            name = field_m.group(1)
            # ``in`` replaces dict.has_key(), which is Python-2 only.
            if name in urls:
                raise ValidationError(
                        _('found more than one url with the same name: ' \
                        u'%(name)s') % {'name': name,} )
            urls[name] = field_m.group(2)
    errors = []
    # Fix: the original appended URLValidator() directly to the model
    # field's ``validators`` list, permanently mutating it so every call
    # stacked one more validator. Work on a copy instead.
    url_validators = list(EventUrl._meta.get_field('url').validators)
    url_validators.append(URLValidator())
    url_name_validators = \
            EventUrl._meta.get_field('url_name').validators
    for url_name, url in urls.items():
        for val in url_name_validators:
            try:
                val( url_name )
            except ValidationError as e:  # 'as' form works on Py2.6+ and Py3
                errors.append( _('Error in url name %(url_name)s') %
                        {'url_name': url_name,} )
                errors.extend( e.messages )
        for val in url_validators:
            try:
                val(url)
            except ValidationError as e:
                errors.append( _('Error in url %(url)s') %
                        {'url': url,} )
                errors.extend( e.messages )
    if errors:
        raise ValidationError( errors )
    return urls
def to_internal_value(self, base64_data):
    """Accept a base64-encoded file payload (or a plain uploaded file, or an
    unchanged URL reference) and convert it into Django file data."""
    # Check if this is a base64 string
    if base64_data in EMPTY_VALUES:
        return None
    try:
        # Strip out the base64 prefix sent by browsers
        base64_data = re.sub(r"^data\:.+base64\,(.+)$", r"\1", base64_data)
    except TypeError:
        # This is already a file object (i.e. standard file
        # was uploaded in stead of base64 string).
        return super(Base64FileField, self).to_internal_value(base64_data)
    try:
        # If this is a valid URL, then assume that the file has not changed.
        # This was implemented to fix a prod only error, where patching a
        # PDF page without changing the pdf_file field caused an error
        # (as it was treated as a base64 string). Only happens in prod
        # because S3 is used there for media storage.
        val = URLValidator()
        val(base64_data)
        raise SkipField()
    except ValidationError:
        # Not a URL -> fall through and treat it as base64 payload.
        pass
    if isinstance(base64_data, six.string_types):
        # Try to decode the file. Return validation error if it fails.
        try:
            decoded_file = base64.b64decode(base64_data)
        except (TypeError, binascii.Error):
            raise ValidationError(_("Please upload a valid file."))
        # Generate file name.
        # 12 characters are more than enough.
        file_name = str(uuid.uuid4())[:12]
        # Get the file name extension.
        file_extension = self.get_file_extension(file_name, decoded_file)
        if not self.is_valid(file_name, decoded_file, file_extension):
            raise ValidationError(_("The type of the file could not be determined."))
        complete_file_name = file_name + "." + file_extension
        data = ContentFile(decoded_file, name=complete_file_name)
        return super(Base64FileField, self).to_internal_value(data)
    raise ValidationError(_('This is not a base64 string'))