我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 django.utils.encoding.force_str()。
def url_replace_param(url, name, value):
    """
    Replace a GET parameter in an URL.

    Passing ``value=None`` removes the parameter ``name`` from the query
    string; any other value sets (or replaces) it. The rebuilt URL is
    returned wrapped in ``mark_safe``.
    """
    url_components = urlparse(force_str(url))
    params = parse_qs(url_components.query)
    if value is None:
        # Removing a parameter that is not present is a no-op instead of
        # a KeyError.
        params.pop(name, None)
    else:
        params[name] = value
    return mark_safe(urlunparse([
        url_components.scheme,
        url_components.netloc,
        url_components.path,
        url_components.params,
        urlencode(params, doseq=True),
        url_components.fragment,
    ]))
def get_connection_params(self):
    """Assemble the connection keyword arguments from ``self.settings_dict``."""
    cfg = self.settings_dict
    params = {
        'conv': django_conversions,
        'charset': 'utf8',
    }
    if six.PY2:
        params['use_unicode'] = True

    # Plain pass-through settings: only copied when they are set.
    for option, setting in (('user', 'USER'), ('db', 'NAME')):
        if cfg[setting]:
            params[option] = cfg[setting]
    if cfg['PASSWORD']:
        params['passwd'] = force_str(cfg['PASSWORD'])

    # A HOST starting with a slash is passed as a unix socket path.
    host = cfg['HOST']
    if host.startswith('/'):
        params['unix_socket'] = host
    elif host:
        params['host'] = host
    if cfg['PORT']:
        params['port'] = int(cfg['PORT'])

    # We need the number of potentially affected rows after an
    # "UPDATE", not the number of changed rows.
    params['client_flag'] = CLIENT.FOUND_ROWS
    # Explicit OPTIONS are applied last and override everything above.
    params.update(cfg['OPTIONS'])
    return params
def get_connection_params(self):
    """
    Assemble the connection parameters from ``self.settings_dict``.

    Raises ImproperlyConfigured when NAME is the empty string.
    """
    cfg = self.settings_dict
    # None may be used to connect to the default 'postgres' db
    if cfg['NAME'] == '':
        raise ImproperlyConfigured(
            "settings.DATABASES is improperly configured. "
            "Please supply the NAME value.")

    conn_params = {'database': cfg['NAME'] or 'postgres'}
    conn_params.update(cfg['OPTIONS'])
    # isolation_level is dropped from the parameters returned here.
    conn_params.pop('isolation_level', None)

    user = cfg['USER']
    if user:
        conn_params['user'] = user
    password = cfg['PASSWORD']
    if password:
        conn_params['password'] = force_str(password)
    host = cfg['HOST']
    if host:
        conn_params['host'] = host
    port = cfg['PORT']
    if port:
        conn_params['port'] = port
    return conn_params
def post_raw_data(self, path, post_data):
    """
    POST ``post_data`` to ``path`` without any encoding of the payload.

    The built-in test client's post() method assumes the data you pass
    is a dictionary and encodes it. To send the data unmodified we build
    the request environment ourselves.
    """
    parsed = urlparse(path)
    environ = {
        'CONTENT_LENGTH': len(post_data),
        'CONTENT_TYPE': "text/plain",
        'PATH_INFO': self.client._get_path(parsed),
        # Index 4 of the parse result is the query component.
        'QUERY_STRING': force_str(parsed[4]),
        'REQUEST_METHOD': str('POST'),
        'wsgi.input': FakePayload(post_data),
    }

    # Add the request signature to the headers being sent.
    signature_headers = self.get_signature(path, method='POST', body=post_data)
    environ.update(signature_headers)

    # Make the actual request.
    return self.client.request(**environ)
def render(self, name, value, attrs=None):
    """
    Render ``value`` as a link inside a paragraph.

    When ``value`` is None a placeholder message is rendered instead.
    The link text is ``self.text`` if set, otherwise the quoted URL.
    """
    if value is None:
        placeholder = _('Populate form fields above')
        return format_html(
            '<p class="text-error">{text}</p>',
            text=placeholder,
        )

    final_attrs = self.build_attrs(attrs)
    href = smart_urlquote(value)
    link_text = self.text
    if not link_text:
        link_text = href
    return format_html(
        '<p><a href="{href}" {attrs}>{text}</a></p>',
        href=href,
        attrs=flatatt(final_attrs),
        text=force_str(link_text),
    )
def test_sign_unsign(self):
    """sign/unsign should be reversible"""
    examples = [
        'q;wjmbk;wkmb',
        '3098247529087',
        '3098247:529:087:',
        'jkw osanteuh ,rcuh nthu aou oauh ,ud du',
        '\u2019',
    ]
    if six.PY2:
        examples += [b'a byte string']

    signer = signing.Signer('predictable-secret')
    for original in examples:
        signed = signer.sign(original)
        self.assertIsInstance(signed, str)
        # The signed value must differ from the plain input...
        self.assertNotEqual(force_str(original), signed)
        # ...and unsigning must give the input back.
        self.assertEqual(original, signer.unsign(signed))
def test_sign_unsign(self):
    """sign/unsign should be reversible"""
    examples = [
        b'q;wjmbk;wkmb',
        b'3098247529087',
        b'3098247:529:087:',
        b'jkw osanteuh ,rcuh nthu aou oauh ,ud du',
        b'\u2019',
    ]
    if six.PY2:
        examples += [b'a byte string']

    signer = signing.BytesSigner('predictable-secret')
    for original in examples:
        signed = signer.sign(original)
        self.assertIsInstance(signed, six.binary_type)
        # The signed value must differ from the plain input...
        self.assertNotEqual(force_str(original), signed)
        # ...and unsigning must give the input back.
        self.assertEqual(original, signer.unsign(signed))
def modify_query(context, *params_to_remove, **params_to_change):
    """
    Render the current request path with modified query parameters.

    Keys listed in ``params_to_remove`` are dropped from the current GET
    parameters. Keys given in ``params_to_change`` replace every existing
    value of that key, or are appended when the key is not in the query.
    Pairs whose value is falsy are left out of the encoded query string.

    Note that ``params_to_change`` is consumed while the query is built.
    """
    request = context["request"]
    query_params = []
    for key, value_list in request.GET._iterlists():
        if key in params_to_remove:
            continue
        if key in params_to_change:
            # Replace all current values of this key with the new one.
            query_params.append((key, params_to_change.pop(key)))
        else:
            # Leave parameters that are not mentioned as they were.
            query_params.extend((key, value) for value in value_list)
    # Whatever is still in params_to_change was not in the query yet.
    query_params.extend(params_to_change.items())

    query_string = request.path
    if query_params:
        # NOTE(review): the original chained ``.replace("&", "&")`` here,
        # which is a no-op and was dropped. If HTML-escaped ampersands
        # were intended, escape the result in the template -- confirm.
        query_string += "?%s" % urllib.urlencode([
            (key, force_str(value))
            for (key, value) in query_params if value
        ])
    return query_string
def add_to_query(context, *params_to_remove, **params_to_add):
    """
    Render the current request path with extra query parameters added.

    Keys listed in ``params_to_remove`` are dropped from the current GET
    parameters. Each pair in ``params_to_add`` is appended unless the same
    key already carries that value (compared as ``unicode``). Pairs whose
    value is falsy are left out of the encoded query string.

    Note that ``params_to_add`` is consumed while the query is built.
    """
    request = context["request"]
    query_params = []
    # Go through the current query params.
    for key, value_list in request.GET._iterlists():
        if key in params_to_remove:
            continue
        # Don't add a key-value pair that already exists in the query.
        if key in params_to_add and unicode(params_to_add[key]) in value_list:
            params_to_add.pop(key)
        query_params.extend((key, value) for value in value_list)
    # Add the remaining key-value pairs.
    query_params.extend(params_to_add.items())

    # Empty values will be removed.
    query_string = request.path
    if query_params:
        # NOTE(review): the original chained ``.replace("&", "&")`` here,
        # which is a no-op and was dropped. If HTML-escaped ampersands
        # were intended, escape the result in the template -- confirm.
        query_string += "?%s" % urllib.urlencode([
            (key, force_str(value))
            for (key, value) in query_params if value
        ])
    return query_string
def remove_from_query(context, *args, **kwargs):
    """
    Render the current request path with selected query parameters removed.

    Keys listed in ``args`` are dropped entirely. For keys given in
    ``kwargs`` only the pairs whose value equals the keyword value
    (compared as ``unicode``) are dropped. Pairs whose value is falsy are
    left out of the encoded query string.
    """
    request = context["request"]
    query_params = []
    # Go through the current query params.
    for key, value_list in request.GET._iterlists():
        # Skip keys mentioned in the args.
        if key in args:
            continue
        for value in value_list:
            # Skip key-value pairs mentioned in kwargs.
            if key in kwargs and unicode(value) == unicode(kwargs[key]):
                continue
            query_params.append((key, value))

    # Empty values will be removed.
    query_string = request.path
    if query_params:
        # Append to the path. The original overwrote the path with a bare
        # "?..." string here, so the result had the path only when the
        # query was empty.
        # NOTE(review): the original also chained ``.replace("&", "&")``,
        # which is a no-op and was dropped. If HTML-escaped ampersands
        # were intended, escape the result in the template -- confirm.
        query_string += "?%s" % urllib.urlencode([
            (key, force_str(value))
            for (key, value) in query_params if value
        ])
    return query_string
def modify_query(context, *params_to_remove, **params_to_change):
    """
    Render the current request path with modified query parameters.

    Keys listed in ``params_to_remove`` are dropped from the current GET
    parameters. Keys given in ``params_to_change`` replace every existing
    value of that key, or are appended when the key is not in the query.
    Pairs whose value is falsy are left out of the encoded query string.

    Note that ``params_to_change`` is consumed while the query is built.
    """
    request = context["request"]
    query_params = []
    for key, value_list in request.GET._iterlists():
        if key in params_to_remove:
            continue
        if key in params_to_change:
            # Replace all current values of this key with the new one.
            query_params.append((key, params_to_change.pop(key)))
        else:
            # Leave parameters that are not mentioned as they were.
            query_params.extend((key, value) for value in value_list)
    # Whatever is still in params_to_change was not in the query yet.
    query_params.extend(params_to_change.items())

    query_string = request.path
    if query_params:
        # NOTE(review): the original chained ``.replace("&", "&")`` here,
        # which is a no-op and was dropped. If HTML-escaped ampersands
        # were intended, escape the result in the template -- confirm.
        query_string += "?%s" % urllib.urlencode([
            (key, force_str(value))
            for (key, value) in query_params if value
        ])
    return query_string
def modify_query(context, *params_to_remove, **params_to_change):
    """
    Render the current request path with modified query parameters.

    Keys listed in ``params_to_remove`` are dropped from the current GET
    parameters. Keys given in ``params_to_change`` replace every existing
    value of that key, or are appended when the key is not in the query.
    Pairs whose value is falsy are left out of the encoded query string.

    Note that ``params_to_change`` is consumed while the query is built.
    """
    request = context['request']
    query_params = []
    for key, value_list in request.GET._iterlists():
        if key in params_to_remove:
            continue
        if key in params_to_change:
            # Replace all current values of this key with the new one.
            query_params.append((key, params_to_change.pop(key)))
        else:
            # Leave parameters that are not mentioned as they were.
            query_params.extend((key, value) for value in value_list)
    # Whatever is still in params_to_change was not in the query yet.
    query_params.extend(params_to_change.items())

    query_string = request.path
    if query_params:
        # NOTE(review): the original chained ``.replace("&", "&")`` here,
        # which is a no-op and was dropped. If HTML-escaped ampersands
        # were intended, escape the result in the template -- confirm.
        query_string += "?%s" % urllib.urlencode([
            (key, force_str(value))
            for (key, value) in query_params if value
        ])
    return query_string
def add_to_query(context, *params_to_remove, **params_to_add):
    """
    Render the current request path with extra query parameters added.

    Keys listed in ``params_to_remove`` are dropped from the current GET
    parameters. Each pair in ``params_to_add`` is appended unless the same
    key already carries that value (compared as ``unicode``). Pairs whose
    value is falsy are left out of the encoded query string.

    Note that ``params_to_add`` is consumed while the query is built.
    """
    request = context['request']
    query_params = []
    # Go through the current query params.
    for key, value_list in request.GET._iterlists():
        if key in params_to_remove:
            continue
        # Don't add a key-value pair that already exists in the query.
        if key in params_to_add and unicode(params_to_add[key]) in value_list:
            params_to_add.pop(key)
        query_params.extend((key, value) for value in value_list)
    # Add the remaining key-value pairs.
    query_params.extend(params_to_add.items())

    # Empty values will be removed.
    query_string = request.path
    if query_params:
        # NOTE(review): the original chained ``.replace("&", "&")`` here,
        # which is a no-op and was dropped. If HTML-escaped ampersands
        # were intended, escape the result in the template -- confirm.
        query_string += "?%s" % urllib.urlencode([
            (key, force_str(value))
            for (key, value) in query_params if value
        ])
    return query_string
def remove_from_query(context, *args, **kwargs):
    """
    Render the current request path with selected query parameters removed.

    Keys listed in ``args`` are dropped entirely. For keys given in
    ``kwargs`` only the pairs whose value equals the keyword value
    (compared as ``unicode``) are dropped. Pairs whose value is falsy are
    left out of the encoded query string.
    """
    request = context['request']
    query_params = []
    # Go through the current query params.
    for key, value_list in request.GET._iterlists():
        # Skip keys mentioned in the args.
        if key in args:
            continue
        for value in value_list:
            # Skip key-value pairs mentioned in kwargs.
            if key in kwargs and unicode(value) == unicode(kwargs[key]):
                continue
            query_params.append((key, value))

    # Empty values will be removed.
    query_string = request.path
    if query_params:
        # Append to the path. The original overwrote the path with a bare
        # "?..." string here, so the result had the path only when the
        # query was empty.
        # NOTE(review): the original also chained ``.replace("&", "&")``,
        # which is a no-op and was dropped. If HTML-escaped ampersands
        # were intended, escape the result in the template -- confirm.
        query_string += "?%s" % urllib.urlencode([
            (key, force_str(value))
            for (key, value) in query_params if value
        ])
    return query_string
def __str__(self):
    """Format the message as ``"<obj>: (<id>) <msg>"`` plus an optional hint line."""
    from django.db import models

    target = self.obj
    if target is None:
        obj_label = "?"
    elif isinstance(target, models.base.ModelBase):
        # We need to hardcode ModelBase and Field cases because its __str__
        # method doesn't return "applabel.modellabel" and cannot be changed.
        meta = target._meta
        obj_label = '%s.%s' % (meta.app_label, meta.object_name)
    else:
        obj_label = force_str(target)

    # The id and hint parts are only rendered when they are set.
    id_part = "(%s) " % self.id if self.id else ""
    hint_part = "\n\tHINT: %s" % self.hint if self.hint else ''
    return "%s: %s%s%s" % (obj_label, id_part, self.msg, hint_part)
def path_and_rename(instance, filename):
    """
    Build a dated upload path with a new file name.

    The file is renamed to ``instance.pk`` when that is set, otherwise to
    a random UUID hex string. The text after the last dot of ``filename``
    is kept as the extension, and the result is placed under
    ``uploads/<year>/<month>/<day>/`` for the current date.
    """
    # rpartition keeps the "text after the last dot" rule while letting us
    # detect names with no dot at all. Those used to get their whole name
    # reused as the extension.
    _, dot, ext = filename.rpartition('.')
    stem = instance.pk if instance.pk else uuid4().hex
    if dot:
        filename = '{}.{}'.format(stem, ext)
    else:
        filename = '{}'.format(stem)
    dirname = force_text(datetime.datetime.now().strftime(force_str('uploads/%Y/%m/%d/')))
    return os.path.join(dirname, filename)
def _mjml_recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``; raise RuntimeError if the peer closes early."""
    chunks = []
    remaining = size
    while remaining > 0:
        # recv() may return fewer bytes than requested, so keep reading.
        chunk = sock.recv(remaining)
        if not chunk:
            raise RuntimeError(
                'MJML compile error (via MJML TCP server): connection closed unexpectedly'
            )
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def _mjml_render_by_tcpserver(mjml_code):
    """
    Render ``mjml_code`` through one of the configured MJML TCP servers.

    The servers in ``mjml_settings.MJML_TCPSERVERS`` are tried in random
    order until one accepts the connection. The reply is read as a 1-byte
    status (``'0'`` means success), a 9-byte length field and a payload of
    that length.

    Returns the payload on success. Raises RuntimeError when the server
    reports an error, closes the connection early, or when no server can
    be reached.
    """
    servers = list(mjml_settings.MJML_TCPSERVERS)
    if len(servers) > 1:
        random.shuffle(servers)
    # Fall back to a single space (as bytes) for empty input.
    mjml_code = mjml_code.encode('utf8') or b' '

    for host, port in servers:
        # Use a fresh socket per attempt. Reusing a socket after a failed
        # connect() is not portable, and the old single socket leaked
        # when every server was unreachable.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                s.connect((host, port))
            except socket.error:
                continue
            # sendall() retries until the whole document has been sent.
            s.sendall(mjml_code)
            ok = force_str(_mjml_recv_exact(s, 1)) == '0'
            result_len = int(force_str(_mjml_recv_exact(s, 9)))
            result = force_str(_mjml_recv_exact(s, result_len))
            if ok:
                return result
            raise RuntimeError('MJML compile error (via MJML TCP server): {}'.format(result))
        finally:
            s.close()
    raise RuntimeError('MJML compile error (via MJML TCP server): no working server')
def strptime(self, value, format):
    """Parse ``value`` according to ``format`` and return the date part."""
    parsed = datetime.datetime.strptime(force_str(value), format)
    return parsed.date()
def strptime(self, value, format):
    """Parse ``value`` according to ``format`` and return the time part."""
    parsed = datetime.datetime.strptime(force_str(value), format)
    return parsed.time()
def strptime(self, value, format):
    """Parse ``value`` according to ``format`` and return the datetime."""
    text = force_str(value)
    return datetime.datetime.strptime(text, format)
def __repr__(self):
    """Return ``<ClassName name pattern>``."""
    parts = (self.__class__.__name__, self.name, self.regex.pattern)
    return force_str('<%s %s %s>' % parts)
def __repr__(self):
    """Return ``<ClassName: name (content_type)>``."""
    parts = (self.__class__.__name__, self.name, self.content_type)
    return force_str("<%s: %s (%s)>" % parts)
def signature(self, value):
    """Return the signature of ``value``, keyed on ``self.key`` with ``self.salt + 'signer'`` as salt."""
    raw_signature = base64_hmac(self.salt + 'signer', value, self.key)
    # Convert the signature from bytes to str only on Python 3
    return force_str(raw_signature)
def sign(self, value):
    """Return ``value`` followed by the separator and its signature."""
    text = force_str(value)
    parts = (text, self.sep, self.signature(text))
    return str('%s%s%s') % parts
def unsign(self, signed_value):
    """
    Check the signature on ``signed_value`` and return the original value.

    Raises BadSignature when the separator is missing or the signature
    does not match.
    """
    signed_text = force_str(signed_value)
    if self.sep not in signed_text:
        raise BadSignature('No "%s" found in value' % self.sep)

    # The signature is everything after the last separator.
    value, sig = signed_text.rsplit(self.sep, 1)
    if not constant_time_compare(sig, self.signature(value)):
        raise BadSignature('Signature "%s" does not match' % sig)
    return force_text(value)
def sign(self, value):
    """Append the separator and a timestamp to ``value``, then sign the result with the parent class."""
    stamped = str('%s%s%s') % (force_str(value), self.sep, self.timestamp())
    return super(TimestampSigner, self).sign(stamped)
def write(self, msg, style_func=None, ending=None):
    """
    Write ``msg`` to the wrapped stream.

    ``ending`` (default ``self.ending``) is appended when ``msg`` does not
    already end with it; ``style_func`` (default ``self.style_func``) is
    applied to the message before it is written.
    """
    if ending is None:
        ending = self.ending
    if ending and not msg.endswith(ending):
        msg += ending
    if not style_func:
        style_func = self.style_func
    styled = style_func(msg)
    self._out.write(force_str(styled))
def __str__(self):
    """Format the message as ``"<obj>: (<id>) <msg>"`` plus an optional hint line."""
    from django.db import models

    target = self.obj
    if target is None:
        obj_label = "?"
    elif isinstance(target, models.base.ModelBase):
        # We need to hardcode ModelBase and Field cases because its __str__
        # method doesn't return "applabel.modellabel" and cannot be changed.
        obj_label = target._meta.label
    else:
        obj_label = force_str(target)

    # The id and hint parts are only rendered when they are set.
    id_part = "(%s) " % self.id if self.id else ""
    hint_part = "\n\tHINT: %s" % self.hint if self.hint else ''
    return "%s: %s%s%s" % (obj_label, id_part, self.msg, hint_part)
def __call__(self, environ, start_response):
    """
    Handle one request: build the request object from ``environ``, obtain
    a response, call ``start_response`` with the status and headers, and
    return the response.
    """
    # Set up middleware if needed. We couldn't do this earlier, because
    # settings weren't available.
    if self._request_middleware is None:
        with self.initLock:
            try:
                # Check that middleware is still uninitialized.
                if self._request_middleware is None:
                    self.load_middleware()
            except:
                # Unload whatever middleware we got
                self._request_middleware = None
                raise

    set_script_prefix(get_script_name(environ))
    signals.request_started.send(sender=self.__class__, environ=environ)

    try:
        request = self.request_class(environ)
    except UnicodeDecodeError:
        logger.warning(
            'Bad Request (UnicodeDecodeError)',
            exc_info=sys.exc_info(),
            extra={
                'status_code': 400,
            },
        )
        response = http.HttpResponseBadRequest()
    else:
        response = self.get_response(request)

    response._handler_class = self.__class__

    status = '%s %s' % (response.status_code, response.reason_phrase)
    response_headers = [(str(name), str(val)) for name, val in response.items()]
    # Each cookie is sent as its own Set-Cookie header.
    response_headers.extend(
        (str('Set-Cookie'), str(cookie.output(header='')))
        for cookie in response.cookies.values()
    )
    start_response(force_str(status), response_headers)

    # Hand the stream to the server-provided file wrapper when both exist.
    if getattr(response, 'file_to_stream', None) is not None:
        if environ.get('wsgi.file_wrapper'):
            response = environ['wsgi.file_wrapper'](response.file_to_stream)
    return response
def make_key(self, key, version=None):
    """Build the cache key with the parent class and pass it through ``force_str``."""
    full_key = super(BaseMemcachedCache, self).make_key(key, version)
    # Python 2 memcache requires the key to be a byte string.
    return force_str(full_key)
def _get_pass(self, prompt="Password: "):
    """Prompt for a password; raise CommandError("aborted") when nothing is entered."""
    entered = getpass.getpass(prompt=force_str(prompt))
    if entered:
        return entered
    raise CommandError("aborted")
def encode(self, password, salt):
    """
    Hash ``password`` with the loaded crypt library and a 2-character salt.

    Returns ``"<algorithm>$$<hash>"``; the salt slot is left empty.
    """
    crypt_lib = self._load_library()
    assert len(salt) == 2
    hashed = crypt_lib.crypt(force_str(password), salt)
    # we don't need to store the salt, but Django used to do this
    empty_salt = ''
    return "%s$%s$%s" % (self.algorithm, empty_salt, hashed)
def load(self, rawdata):
    """Load cookies from ``rawdata``, then delete every key recorded in ``self.bad_cookies``."""
    self.bad_cookies = set()
    # Only text input on Python 2 is converted before loading.
    is_py2_text = six.PY2 and isinstance(rawdata, six.text_type)
    if is_py2_text:
        rawdata = force_str(rawdata)
    super(SimpleCookie, self).load(rawdata)
    for bad_key in self.bad_cookies:
        del self[bad_key]

# override private __set() method:
# (needed for using our Morsel, and for laxness with CookieError)
def _BaseCookie__set(self, key, real_value, coded_value):
    """
    Store a cookie under ``key``.

    On CookieError the key is recorded in ``self.bad_cookies`` and an empty
    ``http_cookies.Morsel`` is stored in its place.
    """
    key = force_str(key)
    try:
        morsel = self.get(key, Morsel())
        morsel.set(key, real_value, coded_value)
        dict.__setitem__(self, key, morsel)
    except http_cookies.CookieError:
        # bad_cookies may not exist yet when this runs before load().
        if not hasattr(self, 'bad_cookies'):
            self.bad_cookies = set()
        self.bad_cookies.add(key)
        placeholder = http_cookies.Morsel()
        dict.__setitem__(self, key, placeholder)
def __repr__(self):
    """Return ``<ClassName: text>``, with a placeholder when the text form cannot be encoded or decoded."""
    try:
        text = six.text_type(self)
    except (UnicodeEncodeError, UnicodeDecodeError):
        text = '[Bad Unicode data]'
    class_name = self.__class__.__name__
    return force_str('<%s: %s>' % (class_name, text))
def _get_path(self, parsed):
    """Build the request path (with ``;params`` when present) from a parsed URL."""
    path = force_str(parsed[2])
    # If there are parameters, add them
    if parsed[3]:
        path += str(";") + force_str(parsed[3])
    encoded_path = uri_to_iri(path).encode(UTF_8)
    # Under Python 3, non-ASCII values in the WSGI environ are arbitrarily
    # decoded with ISO-8859-1. We replicate this behavior here.
    # Refs comment in `get_bytes_from_wsgi()`.
    if six.PY3:
        return encoded_path.decode(ISO_8859_1)
    return encoded_path
def generic(self, method, path, data='', content_type='application/octet-stream', secure=False, **extra):
    """Constructs an arbitrary HTTP request."""
    parsed = urlparse(force_str(path))
    data = force_bytes(data, settings.DEFAULT_CHARSET)

    if secure:
        port, scheme = str('443'), str('https')
    else:
        port, scheme = str('80'), str('http')
    environ = {
        'PATH_INFO': self._get_path(parsed),
        'REQUEST_METHOD': str(method),
        'SERVER_PORT': port,
        'wsgi.url_scheme': scheme,
    }
    # Body-related keys are only set when there is a payload.
    if data:
        environ['CONTENT_LENGTH'] = len(data)
        environ['CONTENT_TYPE'] = str(content_type)
        environ['wsgi.input'] = FakePayload(data)
    environ.update(extra)

    # If QUERY_STRING is absent or empty, we want to extract it from the URL.
    if not environ.get('QUERY_STRING'):
        query_string = force_bytes(parsed[4])
        # WSGI requires latin-1 encoded strings. See get_path_info().
        if six.PY3:
            query_string = query_string.decode('iso-8859-1')
        environ['QUERY_STRING'] = query_string
    return self.request(**environ)
def get_runner(settings, test_runner_class=None):
    """
    Import and return the test runner named by a dotted path.

    ``test_runner_class`` defaults to ``settings.TEST_RUNNER``.
    """
    dotted_path = test_runner_class or settings.TEST_RUNNER
    # Everything before the last dot is the module, the rest the attribute.
    module_name, dot, attr_name = dotted_path.rpartition('.')
    if not dot:
        # Allow for Python 2.5 relative paths
        module_name = '.'
    test_module = __import__(module_name, {}, {}, force_str(attr_name))
    return getattr(test_module, attr_name)
def get_format(format_type, lang=None, use_l10n=None):
    """
    For a specific format type, returns the format for the current
    language (locale), defaults to the format in the settings.
    format_type is the name of the format, e.g. 'DATE_FORMAT'

    If use_l10n is provided and is not None, that will force the value to
    be localized (or not), overriding the value of settings.USE_L10N.
    """
    format_type = force_str(format_type)
    localize = use_l10n or (use_l10n is None and settings.USE_L10N)
    if localize:
        if lang is None:
            lang = get_language()
        cache_key = (format_type, lang)
        try:
            cached = _format_cache[cache_key]
        except KeyError:
            # Not cached yet: look the format up in the format modules.
            for module in get_format_modules(lang):
                try:
                    val = getattr(module, format_type)
                    for iso_input in ISO_INPUT_FORMATS.get(format_type, ()):
                        if iso_input in val:
                            continue
                        if isinstance(val, tuple):
                            val = list(val)
                        val.append(iso_input)
                    _format_cache[cache_key] = val
                    return val
                except AttributeError:
                    pass
            # No module supplied the format: cache None for this key.
            _format_cache[cache_key] = None
        else:
            if cached is not None:
                return cached
    if format_type not in FORMAT_SETTINGS:
        return format_type
    # Return the general setting by default
    return getattr(settings, format_type)
def urlquote(url, safe='/'):
    """
    A version of Python's urllib.quote() function that can operate on
    unicode strings. The url is first UTF-8 encoded before quoting. The
    returned string can safely be used as part of an argument to a
    subsequent iri_to_uri() call without double-quoting occurring.
    """
    quoted = quote(force_str(url), force_str(safe))
    return force_text(quoted)
def urlquote_plus(url, safe=''):
    """
    A version of Python's urllib.quote_plus() function that can operate on
    unicode strings. The url is first UTF-8 encoded before quoting. The
    returned string can safely be used as part of an argument to a
    subsequent iri_to_uri() call without double-quoting occurring.
    """
    quoted = quote_plus(force_str(url), force_str(safe))
    return force_text(quoted)
def urlunquote(quoted_url):
    """
    A wrapper for Python's urllib.unquote() function that can operate on
    the result of django.utils.http.urlquote().
    """
    unquoted = unquote(force_str(quoted_url))
    return force_text(unquoted)
def urlunquote_plus(quoted_url):
    """
    A wrapper for Python's urllib.unquote_plus() function that can operate
    on the result of django.utils.http.urlquote_plus().
    """
    unquoted = unquote_plus(force_str(quoted_url))
    return force_text(unquoted)
def urlencode(query, doseq=0):
    """
    A version of Python's urllib.urlencode() function that can operate on
    unicode strings. The parameters are first cast to UTF-8 encoded strings
    and then encoded as per normal.
    """
    if isinstance(query, MultiValueDict):
        query = query.lists()
    elif hasattr(query, 'items'):
        query = query.items()

    pairs = []
    for k, v in query:
        key = force_str(k)
        # Lists and tuples are converted item by item.
        if isinstance(v, (list, tuple)):
            value = [force_str(i) for i in v]
        else:
            value = force_str(v)
        pairs.append((key, value))
    return original_urlencode(pairs, doseq)
def _date_from_string(year, year_format, month='', month_format='', day='', day_format='', delim='__'):
    """
    Helper: get a datetime.date object given a format string and a year,
    month, and day (only year is mandatory). Raise a 404 for an invalid date.
    """
    date_format = delim.join((year_format, month_format, day_format))
    datestr = delim.join((year, month, day))
    try:
        parsed = datetime.datetime.strptime(force_str(datestr), date_format)
    except ValueError:
        raise Http404(_("Invalid date string '%(datestr)s' given format '%(format)s'") % {
            'datestr': datestr,
            'format': date_format,
        })
    return parsed.date()