Python django.core.validators 模块,URLValidator() 实例源码

我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用django.core.validators.URLValidator()

项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def get_http_headers(url):
    """
        Return the HTTP headers reported by the phishing service for `url`.

        The url is first passed through `_get_deobfuscate_item` and must then
        validate against the accepted scheme list.

        :raises BadRequest: if `url` is empty or does not validate
        :raises InternalServerError: if the service call or the check of its
            response against the adapter schema fails
    """
    if not url:
        raise BadRequest('Missing url')

    url = _get_deobfuscate_item(url)
    accepted_schemes = ('http', 'https', 'ftp', 'ftps', 'rtsp', 'rtmp')
    try:
        URLValidator(schemes=accepted_schemes)(url)
    except ValidationError:
        raise BadRequest('Not a valid URL')

    try:
        service = ImplementationFactory.instance.get_singleton_of('PhishingServiceBase')
        headers = service.get_http_headers(url)
        schema.valid_adapter_response('PhishingServiceBase', 'get_http_headers', headers)
    except (PhishingServiceException, schema.InvalidFormatError, schema.SchemaNotFound) as ex:
        raise InternalServerError(str(ex))
    return headers
项目:promgen    作者:line    | 项目源码 | 文件源码
def handle(self, **kwargs):
        """Bootstrap Promgen: create the config directory and file, then write settings.

        Missing config directory and config file are created (the file is
        copied from the bundled example), after which SECRET_KEY,
        DATABASE_URL and CELERY_BROKER_URL are handed to `write_setting`.
        """
        self.write('Bootstrapping Promgen')

        config_dir_missing = not os.path.exists(settings.CONFIG_DIR)
        if config_dir_missing:
            self.write('Creating config directory {} ', settings.CONFIG_DIR)
            os.makedirs(settings.CONFIG_DIR)

        config_file_missing = not os.path.exists(settings.PROMGEN_CONFIG)
        if config_file_missing:
            example_config = os.path.join(
                settings.BASE_DIR, 'promgen', 'tests', 'examples', 'promgen.yml'
            )
            self.write('Creating promgen config {} from {}', settings.PROMGEN_CONFIG, example_config)
            shutil.copy(example_config, settings.PROMGEN_CONFIG)

        self.write_setting('SECRET_KEY', default=settings.SECRET_KEY)
        self.write_setting('DATABASE_URL', test=dj_database_url.parse)
        # Schemes based on list of supported brokers
        # http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
        broker_url_check = URLValidator(schemes=['redis', 'amqp', 'sqs'])
        self.write_setting('CELERY_BROKER_URL', test=broker_url_check)
项目:Squozy    作者:Remeic    | 项目源码 | 文件源码
def api_url_request(request):
    """Shrink the URL posted in the ``url`` form field and answer with JSON.

    The JSON body always has the keys ``url`` and ``shrink``. On failure both
    carry a marker string instead of real values: ``url_empty`` when no URL
    was posted, ``url_invalid`` when validating or storing it failed, and
    ``method_not_allowed`` (HTTP 405) for anything other than POST.
    """
    if request.method != "POST":
        # `result` is only created for POST requests; without this guard any
        # other method crashed with UnboundLocalError when building the reply.
        return JsonResponse(
            {'url': 'method_not_allowed', 'shrink': 'method_not_allowed'},
            status=405,
        )

    url_verification = URLValidator()
    post_url = request.POST.get('url')
    result = UrlShrinked(url="")
    if post_url:
        try:
            url_verification(post_url)
            result.url = post_url
            result.save()
            result.publish()
        except Exception:
            # Deliberately broad (kept from the original): a failure while
            # saving or publishing is reported the same way as a bad URL.
            result.url = "url_invalid"
            result.shrinked_code = "url_invalid"
    else:
        result.url = "url_empty"
        result.shrinked_code = "url_empty"

    # JsonResponse already is an HttpResponse with an application/json
    # content type, so it is returned directly rather than re-wrapped.
    return JsonResponse({'url': result.url, 'shrink': result.shrinked_code})

#Search url using shrinked code, private function, TO MOVE
项目:kuzgun.io    作者:yigitgenc    | 项目源码 | 文件源码
def create(self, request, *args, **kwargs):
        """
        Add the torrent behind the submitted ``link`` and attach it to the user.

        ``link`` is read from the request data. Anything that does not start
        with ``magnet:`` has to be a valid URL ending in ``.torrent``;
        otherwise a 400 response carrying the validation error is returned.

        :return: Response with the serialized torrent; 201 if the Torrent row
                 was newly created, 200 if it already existed
        """
        link = request.data.get('link', '').strip()
        url_validator = URLValidator()
        is_magnet = link.startswith('magnet:')

        try:
            if not is_magnet:
                url_validator(link)
                if not link.endswith('.torrent'):
                    raise ValidationError("Invalid torrent URL.")
        except ValidationError as e:
            return Response({'detail': e}, status=status.HTTP_400_BAD_REQUEST)

        torrent = transmission.add_torrent(link)
        torrent_defaults = {
            'name': torrent.name,
            'private': torrent.isPrivate,
        }
        torrent_model, created = Torrent.objects.get_or_create(
            hash=torrent.hashString,
            defaults=torrent_defaults,
        )

        request.user.torrents.add(torrent_model)

        payload = self.serializer_class(torrent_model).data
        response_status = status.HTTP_201_CREATED if created else status.HTTP_200_OK
        return Response(payload, status=response_status)
项目:odmtp-tpf    作者:benjimor    | 项目源码 | 文件源码
def __init__(self, page, subject, predicate, obj):
        """Build a triple pattern query from raw http parameters.

        `subject` and `predicate` become URIRef objects (None when empty).
        `obj` stays a URIRef when it validates as a URL; otherwise it is
        converted with `string_to_literal`.
        """
        self.page = int(page)
        self.subject = URIRef(subject) if subject else None
        self.predicate = URIRef(predicate) if predicate else None
        self.obj = URIRef(obj) if obj else None
        if self.obj is None:
            return

        validator = URLValidator()
        try:
            validator(self.obj)
        except ValidationError:
            # Not a URL: hand the value over for conversion instead.
            self.obj = string_to_literal(self.obj, validator)
        else:
            self.obj = URIRef(self.obj)
项目:DCRM    作者:82Flex    | 项目源码 | 文件源码
def validate_bugs(value):
    """
    Run `value` through the module-level `bugs_validator` and return its result.

    NOTE(review): `bugs_validator` is defined outside this snippet; presumably
    it is a URLValidator-based validator -- confirm.
    """
    outcome = bugs_validator(value)
    return outcome
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def get_url_hostname(url):
    """
        Extract the hostname part of an url

        :param str url: The url to extract hostname
        :rtype: str
        :return: the hostname, or None if `url` does not validate
    """
    accepted_schemes = ('http', 'https', 'ftp', 'ftps', 'rtsp', 'rtmp')
    try:
        URLValidator(schemes=accepted_schemes)(url)
    except (ValueError, ValidationError):
        return None

    return urlparse(url).hostname
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def get_url_external_reputation(url):
    """
        Look up external reputation results for an url

        Returns an empty list when no 'ReputationDaoBase' implementation is
        registered or when the lookup raises ReputationDaoException.

        :raises BadRequest: if `url` does not validate
    """
    try:
        URLValidator()(url)
    except (ValueError, ValidationError):
        raise BadRequest('Not a valid URL')

    if not ImplementationFactory.instance.is_implemented('ReputationDaoBase'):
        return []

    try:
        dao = ImplementationFactory.instance.get_singleton_of('ReputationDaoBase')
        return dao.get_url_external_reputations(url)
    except ReputationDaoException:
        # Best effort: a failing backend yields no results rather than an error.
        return []
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def _get_item_ip_hostname_url(item):
    """ Classify item['rawItem'] and resolve what can be derived from it

        Sets item['itemType'] to 'URL', 'IP' or 'FQDN' (FQDN is the fallback
        when the raw value is neither a valid URL nor an IP address) and
        returns the tuple (ip_addr, hostname, url); unknown parts are None.
    """
    ip_addr = hostname = url = None
    raw = item['rawItem']

    try:
        URLValidator(schemes=('http', 'https', 'ftp', 'ftps', 'rtsp', 'rtmp'))(raw)
    except ValidationError:
        try:
            validate_ipv46_address(raw)
        except ValidationError:
            item['itemType'] = 'FQDN'
        else:
            item['itemType'] = 'IP'
    else:
        item['itemType'] = 'URL'
        url = raw

    kind = item['itemType']
    if kind == 'IP':
        ip_addr = raw
    elif kind == 'URL':
        hostname = utils.get_url_hostname(raw)
        resolved = utils.get_ips_from_url(raw)
        if resolved:
            ip_addr = resolved[0]
    elif kind == 'FQDN':
        hostname = raw
        resolved = utils.get_ips_from_fqdn(raw)
        if resolved:
            ip_addr = resolved[0]

    return ip_addr, hostname, url
项目:Try-Django-1.10    作者:codingforentrepreneurs    | 项目源码 | 文件源码
def validate_url(value):
    """Validate `value` as a URL, defaulting to the http scheme.

    A value without a scheme separator gets ``http://`` prepended before it
    is checked with URLValidator.

    :return: the (possibly prefixed) URL
    :raises ValidationError: if the resulting URL is not valid
    """
    # Decide on "://" rather than on the substring "http": the old substring
    # test skipped the prefix for scheme-less inputs such as "httpbin.org" or
    # "example.com/http-guide", which were then rejected as invalid.
    if "://" in value:
        new_value = value
    else:
        new_value = 'http://' + value
    try:
        URLValidator()(new_value)
    except (ValueError, ValidationError):
        # Narrowed from a bare `except:` so that unrelated failures (and
        # KeyboardInterrupt/SystemExit) are no longer reported as a bad URL.
        raise ValidationError("Invalid URL for this field")
    return new_value
项目:doork    作者:AeonDave    | 项目源码 | 文件源码
def is_valid_url(url):
    """Return True when `url` passes Django's default URLValidator, else False."""
    try:
        URLValidator()(url)
    except ValidationError:
        return False
    else:
        return True
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def GitURLField(**kwargs):
    """Build a models.URLField whose URLValidator entries are swapped for GitURLValidator.

    All keyword arguments are forwarded to models.URLField. The field's
    validators list is modified in place.
    """
    field = models.URLField(**kwargs)
    for position, current in enumerate(field.validators):
        if isinstance(current, validators.URLValidator):
            field.validators[position] = GitURLValidator()
    return field
项目:djangocms-link-manager    作者:divio    | 项目源码 | 文件源码
def validate_default(self, parts, verify_exists=False):
        """
        Validation for FTP, FTPS, HTTP, and HTTPS schemes.
        When `verify_exists` is set to True, this validator will make a HEAD
        request for the URL and will return False if the request fails or the
        URL returns a status outside of the range 200 <= «status» < 400.

        :param parts: mapping of URL components; its values are passed to
                      `urlunparse` in iteration order. An empty 'netloc' is
                      replaced by `self.netloc` (the mapping is modified).
        :param verify_exists: also probe the URL over the network
        :return: True if the URL is valid (and reachable, when requested)
        """
        validator = URLValidator()
        if not parts['netloc']:
            # If there is no host/port, then this may be a link to a local
            # resource (media or static asset, etc.) Use the provided default.
            parts['netloc'] = self.netloc
        url = urlunparse(parts.values())
        try:
            validator(url)
        except ValidationError:
            return False

        if not verify_exists:
            return True

        try:
            response = urlopen(HeadRequest(url))
        except (HTTPError, URLError, BadStatusLine, UnicodeEncodeError):
            return False
        try:
            # NOTE: urllib should have already resolved any 301/302s
            return 200 <= response.code < 400  # pragma: no cover
        finally:
            # The response holds an open connection; it was never closed before.
            response.close()
项目:network-pulse-api    作者:mozilla    | 项目源码 | 文件源码
def do_final_redirect(state, loggedin, msg):
    """
    Final step of the oauth callback process: send the user either to the
    api root, or, when `state` holds a valid URL (the place the user was at
    when the oauth process started), back to that URL.

    The redirect URL gets a query pair "loggedin=..." appended, carrying
    str(loggedin), which can be used to determine whether the login attempt
    succeeded or not. `msg` is not used here.

    NOTE(review): any value that validates as a URL is accepted as redirect
    target; confirm that callers restrict `state` to trusted origins.
    """
    target = '/'

    # Do we need to redirect the user to some explicit URL after login?
    try:
        URLValidator()(state)
    except ValidationError:
        pass
    else:
        target = state

    # Add the result of the login attempt to the redirect URL as query pair
    separator = '&' if '?' in target else '?'
    return redirect(target + separator + 'loggedin=' + str(loggedin))


# API Route: /oauth2callback (Redirects to / on success)
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def test_enketo_remote_server(self):
        """enketo_url() against the mocked remote server returns a valid URL string."""
        if not self._running_enketo():
            raise SkipTest
        with HTTMock(enketo_mock):
            server_url = 'https://testserver.com/bob'
            # Timestamp-based form id, with the decimal point replaced.
            timestamp = str(time())
            form_id = "test_%s" % timestamp.replace(".", "_")
            url = enketo_url(server_url, form_id)
            self.assertIsInstance(url, basestring)
            # URLValidator returns None when the value is accepted.
            self.assertIsNone(URLValidator()(url))
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def __init__(self, schemes=('http', 'https'), **kwargs):
        """Initialise the field, defaulting to a URLValidator limited to `schemes`.

        Validators passed explicitly through `validators` are left untouched.
        """
        caller_supplied_validators = 'validators' in kwargs
        if not caller_supplied_validators:
            scheme_validator = validators.URLValidator(schemes=schemes)
            kwargs['validators'] = (scheme_validator,)
        super(URLField, self).__init__(**kwargs)
项目:lasernotes    作者:msipos    | 项目源码 | 文件源码
def clean(self):
        """Validate `content` as a URL when the item type is 'U'.

        Content without a scheme gets ``http://`` prepended (and written back
        to cleaned_data) before it is checked with URLValidator, whose
        ValidationError is left to propagate.
        """
        super(ItemForm, self).clean()

        if self.cleaned_data.get('typ') != 'U':
            return
        if 'content' not in self.cleaned_data:
            # The field failed its own validation, so Django removed it from
            # cleaned_data; indexing it here used to raise KeyError.
            return

        url = self.cleaned_data['content']
        if '://' not in url and not url.startswith('http'):
            url = 'http://' + url
            self.cleaned_data['content'] = url
        URLValidator()(url)
项目:waves-demo    作者:lirmm    | 项目源码 | 文件源码
def clean(self):
        """Flag the instance as a URL when `value` validates as one.

        For the meta types META_WEBSITE, META_DOC and META_DOWNLOAD a value
        that is not a URL is an error and the ValidationError propagates; for
        every other type the failure is ignored.

        :return: the cleaned data produced by the parent form
        """
        cleaned_data = super(ServiceMetaForm, self).clean()
        if 'value' not in cleaned_data:
            # `value` already failed field validation and is absent here;
            # indexing it used to raise KeyError.
            return cleaned_data

        url_required_types = (
            self.instance.META_WEBSITE,
            self.instance.META_DOC,
            self.instance.META_DOWNLOAD,
        )
        try:
            validators.URLValidator()(cleaned_data['value'])
        except ValidationError:
            if self.instance.type in url_required_types:
                raise
        else:
            self.instance.is_url = True
        # Previously computed but never returned.
        return cleaned_data
项目:USTC-Software-2017    作者:igemsoftware2017    | 项目源码 | 文件源码
def _check_remote_availability(self, config):
        """
        Report whether every entry of ABACUS_REMOTE_SERVERS in `config` is a valid URL.

        When the setting is absent a single empty string is checked instead.
        """
        servers = config.get('ABACUS_REMOTE_SERVERS', [''])
        try:
            validate = URLValidator()
            for server_url in servers:
                validate(server_url)
        except ValidationError:
            return False
        else:
            return True
项目:FormShare    作者:qlands    | 项目源码 | 文件源码
def test_enketo_remote_server(self):
        """The enketo URL built for the mocked remote server is a valid URL string."""
        if not self._running_enketo():
            raise SkipTest
        with HTTMock(enketo_mock):
            server_url = 'https://testserver.com/bob'
            # Unique-ish form id from the current time, dots turned into underscores.
            unique_suffix = re.sub(r"\.", "_", str(time()))
            form_id = "test_%s" % unique_suffix
            url = enketo_url(server_url, form_id)
            self.assertIsInstance(url, basestring)
            validation_result = URLValidator()(url)
            self.assertIsNone(validation_result)
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def is_url_valid(url):
    """Return True if `url` is accepted by Django's URLValidator, False otherwise."""
    # http://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-python-malformed-or-not
    from django.core.validators import URLValidator
    from django.core.exceptions import ValidationError
    try:
        URLValidator()(url)
    except ValidationError:
        return False
    return True
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def is_url_valid(url):
    """Check `url` with Django's URLValidator and report the outcome as a bool."""
    # http://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-python-malformed-or-not
    from django.core.validators import URLValidator
    from django.core.exceptions import ValidationError
    checker = URLValidator()
    is_valid = True
    try:
        checker(url)
    except ValidationError:
        is_valid = False
    return is_valid
项目:LinuxBashShellScriptForOps    作者:DingGuodong    | 项目源码 | 文件源码
def is_url_valid(url):
    """Tell whether `url` is well-formed according to Django's URLValidator."""
    # http://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-python-malformed-or-not
    from django.core.validators import URLValidator
    from django.core.exceptions import ValidationError
    validate = URLValidator()
    try:
        validate(url)
        return True
    except ValidationError:
        return False
项目:tn2    作者:hsoft    | 项目源码 | 文件源码
def exif_orientation(im):
    """Return `im` transposed according to its EXIF orientation tag (0x0112).

    The image is returned unchanged when its EXIF data cannot be read, is
    missing, or carries an orientation without an entry in the table below.
    """
    try:
        exif = im._getexif()
    except Exception:
        # There are many ways that _getexif fails, we're just going to blanket
        # cover them all.
        return im
    if exif is None:
        return im

    # Orientation value -> names of the Image transpose constants to apply,
    # in order. Names are resolved lazily so Image is only touched when a
    # transposition is actually needed.
    transpose_steps = {
        2: ('FLIP_LEFT_RIGHT',),
        3: ('ROTATE_180',),
        4: ('FLIP_TOP_BOTTOM',),
        5: ('ROTATE_270', 'FLIP_LEFT_RIGHT'),
        6: ('ROTATE_270',),
        7: ('ROTATE_90', 'FLIP_LEFT_RIGHT'),
        8: ('ROTATE_90',),
    }
    for step_name in transpose_steps.get(exif.get(0x0112), ()):
        im = im.transpose(getattr(Image, step_name))
    return im

# Low-tech approach to work around the too strict URLValidator.
# Context: https://code.djangoproject.com/ticket/20264
# replace() isn't super elegant, but I prefer this to having to copy/paste the whole big regexp
# soup from URLValidator so that I can add one underscore...
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def clean(self):
        """Parse the submitted CSV (uploaded file or pasted text) into a table.

        The data is read as ';'-separated CSV. The first row is the header;
        its first column is always labelled 'URL' and unnamed columns get a
        placeholder label. Every data row is cut or padded to the header
        width, a URL without an http(s) prefix gets ``http://`` prepended, and
        rows whose URL does not validate are recorded by index.

        On success the results are stored in `_table_header`, `_table` and
        `_invalid_rows`. On a CSV error a form error is added to `csv_data`
        and those attributes are not set.
        """
        csv_file = self.cleaned_data.get('csv_file')
        if csv_file:
            csv_data = csv_file.read().decode('utf-8', errors='replace')
            self.cleaned_data['csv_data'] = csv_data
        else:
            # .get(): the key is absent when the field failed its own
            # validation; indexing it used to raise KeyError.
            csv_data = self.cleaned_data.get('csv_data') or ''

        reader = csv.reader(io.StringIO(csv_data), delimiter=';')
        try:
            table_header = next(reader, None)
        except csv.Error:
            self.add_error('csv_data', ValidationError(_('Invalid CSV data.'), code='invalid_csv'))
            return
        if not table_header:
            # Covers both "no rows at all" and a blank first line, for which
            # csv.reader yields []. The latter used to crash with IndexError
            # when the first header cell was assigned below.
            self.add_error('csv_data', forms.ValidationError(_('Invalid CSV header.'), code='invalid_csv_header'))
            return

        try:
            table = list(reader)
        except csv.Error:
            self.add_error('csv_data', ValidationError(_('Invalid CSV data.'), code='invalid_csv'))
            return

        # Drop trailing unnamed header columns.
        # NOTE(review): when no column after the first is named, the scan
        # stops at index 1, so one unnamed column is kept -- behavior
        # preserved from the original; confirm whether that is intended.
        last_index = len(table_header) - 1
        for last_index in range(last_index, 0, -1):
            if table_header[last_index]:
                break
        table_header = table_header[:last_index + 1]
        for i, label in enumerate(table_header):
            if not label:
                table_header[i] = _('Unknown column {}').format(i)
        table_header[0] = _('URL')

        invalid_rows = set()
        num_columns = len(table_header)
        validate_url = URLValidator(schemes=('http', 'https'))
        for i, row in enumerate(table):
            # Normalize every row to exactly num_columns cells.
            if len(row) > num_columns:
                row = row[:num_columns]
            row.extend([''] * (num_columns - len(row)))
            if row[0] and not row[0].startswith('http'):
                row[0] = 'http://' + row[0]
            try:
                validate_url(row[0])
            except ValidationError:
                invalid_rows.add(i)
            table[i] = row

        self._table_header = table_header
        self._table = table
        self._invalid_rows = invalid_rows
项目:django-shared-schema-tenants    作者:hugobessa    | 项目源码 | 文件源码
def is_url(context, value, original_value):
    """Return `value` unchanged if it is a valid URL.

    `context` and `original_value` are accepted but not used here.

    :raises ValidationError: with a generic message if `value` is not a
        valid URL
    """
    from django.core.validators import URLValidator
    from django.core.exceptions import ValidationError
    # ugettext_lazy belongs to django.utils.translation. django.utils.text
    # only exposed it as a side effect of its own imports, which is not the
    # case in newer Django releases, where the old import fails.
    from django.utils.translation import ugettext_lazy as _
    validate_url = URLValidator()
    try:
        validate_url(value)
    except ValidationError:
        raise ValidationError(_('This field must be a valid url'))
    return value
项目:grical    作者:wikical    | 项目源码 | 文件源码
def get_urls( lines ):
        """ validates text lines containing EventUrl entries, raising
        ValidationErrors if there are errors, otherwise it returns a dictionary
        with names and urls (both unicode objects).

        If ``lines[0]`` is not empty it is the default url, stored under the
        name ``url``. Every following line must start with whitespace and
        hold a name followed by whitespace and the url.

        All names are checked with the validators of ``EventUrl.url_name`` and
        all urls with those of ``EventUrl.url`` plus a URLValidator; the
        collected messages are raised together in one ValidationError.
        """
        urls = {} # keys are url-names, values are urls
        default_url = lines[0].strip()
        if default_url:
            urls['url'] = default_url

        # Compiled once instead of on every malformed line.
        field_p = re.compile(r"^\s+(.*)\s+(.+?)\s*$")
        empty_line_p = re.compile(r"^\s*$")
        for line in lines[1:]:
            field_m = field_p.match(line)
            if not field_m:
                if empty_line_p.match(line):
                    raise ValidationError(
                        _(u"an unexpected empty line was found."))
                raise ValidationError(
                        _(u"the following line is malformed: ") + line)
            name = field_m.group(1)
            if name in urls:
                raise ValidationError(
                        _(u'found more than one url with the same name: '
                          u'%(name)s') % {'name': name} )
            urls[name] = field_m.group(2)

        errors = []
        # Work on a copy: appending to the field's own list added one more
        # URLValidator to the model field on every call.
        url_validators = list(EventUrl._meta.get_field('url').validators)
        url_validators.append(URLValidator())
        url_name_validators = \
                EventUrl._meta.get_field('url_name').validators
        for url_name, url in urls.items():
            for val in url_name_validators:
                try:
                    val( url_name )
                except ValidationError as e:
                    errors.append( _('Error in url name %(url_name)s') %
                            {'url_name': url_name,} )
                    errors.extend( e.messages )
            for val in url_validators:
                try:
                    val(url)
                except ValidationError as e:
                    errors.append( _('Error in url %(url)s') %
                            {'url': url,} )
                    errors.extend( e.messages )
        if errors:
            raise ValidationError( errors )
        return urls
项目:django-simple-api    作者:Myagi    | 项目源码 | 文件源码
def to_internal_value(self, base64_data):
        """Turn a base64 string (optionally with a ``data:`` prefix) into a file.

        Empty values yield None. A non-string value is treated as a regular
        upload and passed to the parent class. A value that validates as a
        URL raises SkipField; anything else is decoded, given a generated
        file name and handed to the parent class as a ContentFile.

        :raises SkipField: if the value is a valid URL
        :raises ValidationError: if the data cannot be decoded or its type
            cannot be determined
        """
        # Check if this is a base64 string
        if base64_data in EMPTY_VALUES:
            return None

        try:
            # Strip out the base64 prefix sent by browsers
            base64_data = re.sub(r"^data\:.+base64\,(.+)$", r"\1", base64_data)
        except TypeError:
            # This is already a file object (i.e. standard file
            # was uploaded in stead of base64 string).
            return super(Base64FileField, self).to_internal_value(base64_data)

        # If this is a valid URL, then assume that the file has not changed.
        # This was implemented to fix a prod only error, where patching a
        # PDF page without changing the pdf_file field caused an error
        # (as it was treated as a base64 string). Only happens in prod
        # because S3 is used there for media storage.
        try:
            URLValidator()(base64_data)
        except ValidationError:
            pass
        else:
            raise SkipField()

        if not isinstance(base64_data, six.string_types):
            raise ValidationError(_('This is not a base64 string'))

        # Try to decode the file. Return validation error if it fails.
        try:
            decoded_file = base64.b64decode(base64_data)
        except (TypeError, binascii.Error):
            raise ValidationError(_("Please upload a valid file."))

        # Generate file name.
        # 12 characters are more than enough.
        file_name = str(uuid.uuid4())[:12]
        # Get the file name extension.
        file_extension = self.get_file_extension(file_name, decoded_file)
        if not self.is_valid(file_name, decoded_file, file_extension):
            raise ValidationError(_("The type of the file could not be determined."))

        data = ContentFile(decoded_file, name=file_name + "." + file_extension)
        return super(Base64FileField, self).to_internal_value(data)