我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.utils.encoding.force_text()。
def add_sharing_banner(page, response): if not getattr(settings, 'WAGTAILSHARING_BANNER', True): return if hasattr(response, 'render') and callable(response.render): response.render() html = force_text(response.content) body = re.search(r'(?i)<body.*?>', html) if body: endpos = body.end() banner_template_name = 'wagtailsharing/banner.html' banner_template = loader.get_template(banner_template_name) banner_html = banner_template.render() banner_html = force_text(banner_html) content_with_banner = html[:endpos] + banner_html + html[endpos:] response.content = content_with_banner
def message(self): encoding = self.encoding or settings.DEFAULT_CHARSET msg = SafeMIMEText(self.body, self.content_subtype, encoding) msg = self._create_message(msg) msg['Subject'] = self.subject msg['From'] = self.extra_headers.get('From', self.from_email) msg['To'] = self.extra_headers.get('To', ', '.join(map(force_text, self.to))) if self.cc: msg['Cc'] = ', '.join(map(force_text, self.cc)) if self.reply_to: msg['Reply-To'] = self.extra_headers.get('Reply-To', ', '.join(map(force_text, self.reply_to))) # Email header names are case-insensitive (RFC 2045), so we have to # accommodate that when doing comparisons. header_names = [key.lower() for key in self.extra_headers] if 'date' not in header_names: msg['Date'] = formatdate() if 'message-id' not in header_names: # Use cached DNS_NAME for performance msg['Message-ID'] = make_msgid(domain=DNS_NAME) for name, value in self.extra_headers.items(): if name.lower() in ('from', 'to'): # From and To are already handled continue msg[name] = value return msg
def get(self, request, **kwargs): """uid?token?????????.""" token = kwargs.get('token') uidb64 = kwargs.get('uidb64') try: uid = force_text(urlsafe_base64_decode(uidb64)) user = User.objects.get(pk=uid) except (TypeError, ValueError, OverflowError, User.DoesNotExist): user = None if user and user.is_active: if default_token_generator.check_token(user, token): user.is_active = True user.save() return super().get(request, **kwargs) raise Http404
def get(self, request, *args, **kwargs): try: uid = force_text(urlsafe_base64_decode(kwargs['uid64'])) user = UserModel.objects.get(pk=uid) except Exception as e: logger.info(e) user = None if user is not None and account_activation_token.check_token(user, kwargs['token']): email_address = user.emailaddress_set.first() email_address.verified = True email_address.save() user.backend = 'django.contrib.auth.backends.ModelBackend' login(request, user) # return redirect('home') return HttpResponse('Thank you for your email confirmation. Now you can login your account.') return HttpResponse('Activation link is invalid!')
def render_options(self, *args): """Render only selected options and set QuerySet from :class:`ModelChoiceIterator`.""" try: selected_choices, = args except ValueError: choices, selected_choices = args choices = chain(self.choices, choices) else: choices = self.choices selected_choices = {force_text(v) for v in selected_choices} output = ['<option></option>' if not self.is_required and not self.allow_multiple_selected else ''] if isinstance(self.choices, ModelChoiceIterator): if self.queryset is None: self.queryset = self.choices.queryset selected_choices = {c for c in selected_choices if c not in self.choices.field.empty_values} choices = [(obj.pk, self.label_from_instance(obj)) for obj in self.choices.queryset.filter(pk__in=selected_choices)] else: choices = [(k, v) for k, v in choices if force_text(k) in selected_choices] for option_value, option_label in choices: output.append(self.render_option(selected_choices, option_value, option_label)) return '\n'.join(output)
def label_from_instance(self, obj): """ Return option label representation from instance. Can be overridden to change the representation of each choice. Example usage:: class MyWidget(ModelSelect2Widget): def label_from_instance(obj): return force_text(obj.title).upper() Args: obj (django.db.models.Model): Instance of Django Model. Returns: str: Option label. """ return force_text(obj)
def get_header_name(self, model, field_name): """ Override if a custom value or behaviour is required for specific fields. """ if '__' not in field_name: try: field = model._meta.get_field(field_name) except FieldDoesNotExist as e: if not hasattr(model, field_name): raise e # field_name is a property. return field_name.replace('_', ' ').title() return force_text(field.verbose_name).title() else: related_field_names = field_name.split('__') field = model._meta.get_field(related_field_names[0]) assert field.is_relation return self.get_header_name(field.related_model, '__'.join(related_field_names[1:]))
def _get_volume_types(self): try: volume_types = cinder.volume_type_list(self.request) except Exception: exceptions.handle(self.request, _('Unable to retrieve volume type list.')) # check if we have default volume type so we can present the # description of no volume type differently no_type_description = None if self.default_vol_type is None: message = \ _("If \"No volume type\" is selected, the volume will be " "created without a volume type.") no_type_description = encoding.force_text(message) type_descriptions = [{'name': 'no_type', 'description': no_type_description}] + \ [{'name': type.name, 'description': getattr(type, "description", "")} for type in volume_types] return json.dumps(type_descriptions)
def render_option(self, selected_choices, option_value, option_label): option_value = force_text(option_value) other_html = (u' selected="selected"' if option_value in selected_choices else '') if callable(self.transform_html_attrs): html_attrs = self.transform_html_attrs(option_label) other_html += flatatt(html_attrs) if not isinstance(option_label, (six.string_types, Promise)): for data_attr in self.data_attrs: data_value = html.conditional_escape( force_text(getattr(option_label, data_attr, ""))) other_html += ' data-%s="%s"' % (data_attr, data_value) if callable(self.transform): option_label = self.transform(option_label) return u'<option value="%s"%s>%s</option>' % ( html.escape(option_value), other_html, html.conditional_escape(force_text(option_label)))
def test_handle_translated(self): translated_unicode = u'\u30b3\u30f3\u30c6\u30ca\u30fc\u304c' \ u'\u7a7a\u3067\u306f\u306a\u3044\u305f' \ u'\u3081\u3001\u524a\u9664\u3067\u304d' \ u'\u307e\u305b\u3093\u3002' # Japanese translation of: # 'Because the container is not empty, it can not be deleted.' expected = ['error', force_text(translated_unicode), ''] req = self.request req.META['HTTP_X_REQUESTED_WITH'] = 'XMLHttpRequest' try: raise exceptions.Conflict(translated_unicode) except exceptions.Conflict: exceptions.handle(req) # The real test here is to make sure the handle method doesn't throw a # UnicodeEncodeError, but making sure the message is correct could be # useful as well. self.assertItemsEqual(req.horizon['async_messages'], [expected])
def render(self, form, form_style, context, template_pack=TEMPLATE_PACK): success_url = form.opts.get('success_url', '') delete_url = form.opts.get('delete_url', '') if delete_url: delete_url += '&' if '?' in delete_url else '?' delete_url += 'success_url=' + force_text(form.opts.get('delete_success_url', success_url)) template = get_template(form.opts.get('form_actions_template', 'ajaxviews/_form_controls.html')) btn_group = template.render({ 'delete_url': delete_url, 'success_url': force_text(success_url), 'modal_form': form.opts.get('modal_form', False), 'form_preview': form.opts.get('preview_stage', False), 'delete_confirmation': form.opts.get('delete_confirmation', False), 'form_cfg': json.dumps(form.form_cfg) if getattr(form, 'form_cfg', None) else None, }) layout_object = FormActions( Submit('save', form.opts.get('save_button_name', 'Save')), HTML(btn_group), style='margin-bottom: 0;' ) return layout_object.render(form, form_style, context)
def process_response(self, request, response): if not request.is_ajax() and not isinstance(response, HttpResponseRedirect) and hasattr(response, 'content'): content = force_text(response.content, encoding=response.charset) if '</body>' not in content: return response json_cfg = {} if hasattr(response, 'context_data'): json_cfg = response.context_data.get('json_cfg', {}) template = get_template('ajaxviews/_middleware.html') html = template.render({ 'json_cfg': json_cfg, 'main_name': settings.REQUIRE_MAIN_NAME, }) l_content, r_content = content.rsplit('</body>', 1) content = ''.join([l_content, html, '</body>', r_content]) response.content = response.make_bytes(content) if response.get('Content-Length', None): response['Content-Length'] = len(response.content) return response
def get_insert_position(admin_menu, item_name): """ Ensures that there is a SHORTCUTS_BREAK and returns a position for an alphabetical position against all items between SHORTCUTS_BREAK, and the ADMINISTRATION_BREAK. """ start = admin_menu.find_first(Break, identifier=SHORTCUTS_BREAK) if not start: end = admin_menu.find_first(Break, identifier=ADMINISTRATION_BREAK) admin_menu.add_break(SHORTCUTS_BREAK, position=end.index) start = admin_menu.find_first(Break, identifier=SHORTCUTS_BREAK) end = admin_menu.find_first(Break, identifier=ADMINISTRATION_BREAK) items = admin_menu.get_items()[start.index + 1: end.index] for idx, item in enumerate(items): try: if force_text(item_name.lower()) < force_text(item.name.lower()): return idx + start.index + 1 except AttributeError: # Some item types do not have a 'name' attribute. pass return end.index
def _format_callback(self, obj, user, admin_site, perms_needed): has_admin = obj.__class__ in admin_site._registry opts = obj._meta if has_admin: admin_url = reverse('%s:%s_%s_change' % (admin_site.name, opts.app_label, opts.object_name.lower()), None, (quote(obj._get_pk_val()),)) p = get_delete_permission(opts) if not user.has_perm(p): perms_needed.add(opts.verbose_name) # Display a link to the admin page. return mark_safe('%s: <a href="%s">%s</a>' % (escape(capfirst(opts.verbose_name)), admin_url, escape(obj))) else: # Don't display link to edit, because it either has no # admin or is edited inline. return '%s: %s' % (capfirst(opts.verbose_name), force_text(obj))
def cast_primitive_value(spec, value): format = spec.get('format') type = spec.get('type') if type == 'boolean': return (force_text(value).lower() in ('1', 'yes', 'true')) if type == 'integer' or format in ('integer', 'long'): return int(value) if type == 'number' or format in ('float', 'double'): return float(value) if format == 'byte': # base64 encoded characters return base64.b64decode(value) if format == 'binary': # any sequence of octets return force_bytes(value) if format == 'date': # ISO8601 date return iso8601.parse_date(value).date() if format == 'dateTime': # ISO8601 datetime return iso8601.parse_date(value) if type == 'string': return force_text(value) return value
def _normalize_name(self, name): """ Normalizes the name so that paths like /path/to/ignored/../foo.txt work. We check to make sure that the path pointed to is not outside the directory specified by the LOCATION setting. """ base_path = force_text(self.location) base_path = base_path.rstrip('/') final_path = urljoin(base_path.rstrip('/') + "/", name) base_path_len = len(base_path) if (not final_path.startswith(base_path) or final_path[base_path_len:base_path_len + 1] not in ('', '/')): raise SuspiciousOperation("Attempted access to '%s' denied." % name) return final_path.lstrip('/')
def regex(self): """ Returns a compiled regular expression, depending upon the activated language-code. """ language_code = get_language() if language_code not in self._regex_dict: if isinstance(self._regex, six.string_types): regex = self._regex else: regex = force_text(self._regex) try: compiled_regex = re.compile(regex, re.UNICODE) except re.error as e: raise ImproperlyConfigured( '"%s" is not a valid regular expression: %s' % (regex, six.text_type(e))) self._regex_dict[language_code] = compiled_regex return self._regex_dict[language_code]
def popen_wrapper(args, os_err_exc_type=CommandError, universal_newlines=True): """ Friendly wrapper around Popen. Returns stdout output, stderr output and OS status code. """ try: p = Popen(args, shell=False, stdout=PIPE, stderr=PIPE, close_fds=os.name != 'nt', universal_newlines=universal_newlines) except OSError as e: strerror = force_text(e.strerror, DEFAULT_LOCALE_ENCODING, strings_only=True) six.reraise(os_err_exc_type, os_err_exc_type('Error executing %s: %s' % (args[0], strerror)), sys.exc_info()[2]) output, errors = p.communicate() return ( output, force_text(errors, DEFAULT_LOCALE_ENCODING, strings_only=True), p.returncode )
def forbid_multi_line_headers(name, val, encoding): """Forbids multi-line headers, to prevent header injection.""" encoding = encoding or settings.DEFAULT_CHARSET val = force_text(val) if '\n' in val or '\r' in val: raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name)) try: val.encode('ascii') except UnicodeEncodeError: if name.lower() in ADDRESS_HEADERS: val = ', '.join(sanitize_address(addr, encoding) for addr in getaddresses((val,))) else: val = Header(val, encoding).encode() else: if name.lower() == 'subject': val = Header(val).encode() return str(name), val
def sanitize_address(addr, encoding): if not isinstance(addr, tuple): addr = parseaddr(force_text(addr)) nm, addr = addr nm = Header(nm, encoding).encode() try: addr.encode('ascii') except UnicodeEncodeError: # IDN if '@' in addr: localpart, domain = addr.split('@', 1) localpart = str(Header(localpart, encoding)) domain = domain.encode('idna').decode('ascii') addr = '@'.join([localpart, domain]) else: addr = Header(addr, encoding).encode() return formataddr((nm, addr))
def validate(self, password, user=None): if not user: return for attribute_name in self.user_attributes: value = getattr(user, attribute_name, None) if not value or not isinstance(value, string_types): continue value_parts = re.split('\W+', value) + [value] for value_part in value_parts: if SequenceMatcher(a=password.lower(), b=value_part.lower()).quick_ratio() > self.max_similarity: verbose_name = force_text(user._meta.get_field(attribute_name).verbose_name) raise ValidationError( _("The password is too similar to the %(verbose_name)s."), code='password_too_similar', params={'verbose_name': verbose_name}, )
def encode(self, password, salt): bcrypt = self._load_library() # Need to reevaluate the force_bytes call once bcrypt is supported on # Python 3 # Hash the password prior to using bcrypt to prevent password truncation # See: https://code.djangoproject.com/ticket/20138 if self.digest is not None: # We use binascii.hexlify here because Python3 decided that a hex encoded # bytestring is somehow a unicode. password = binascii.hexlify(self.digest(force_bytes(password)).digest()) else: password = force_bytes(password) data = bcrypt.hashpw(password, salt) return "%s$%s" % (self.algorithm, force_text(data))
def decode(self, session_data): encoded_data = base64.b64decode(force_bytes(session_data)) try: # could produce ValueError if there is no ':' hash, serialized = encoded_data.split(b':', 1) expected_hash = self._hash(serialized) if not constant_time_compare(hash.decode(), expected_hash): raise SuspiciousSession("Session data corrupted") else: return self.serializer().loads(serialized) except Exception as e: # ValueError, SuspiciousOperation, unpickling exceptions. If any of # these happen, just return an empty dictionary (an empty session). if isinstance(e, SuspiciousOperation): logger = logging.getLogger('django.security.%s' % e.__class__.__name__) logger.warning(force_text(e)) return {}
def units(self): """ Returns a 2-tuple of the units value and the units name, and will automatically determines whether to return the linear or angular units. """ units, name = None, None if self.projected or self.local: units, name = capi.linear_units(self.ptr, byref(c_char_p())) elif self.geographic: units, name = capi.angular_units(self.ptr, byref(c_char_p())) if name is not None: name = force_text(name) return (units, name) # #### Spheroid/Ellipsoid Properties ####
def model_format_dict(obj): """ Return a `dict` with keys 'verbose_name' and 'verbose_name_plural', typically for use with string formatting. `obj` may be a `Model` instance, `Model` subclass, or `QuerySet` instance. """ if isinstance(obj, (models.Model, models.base.ModelBase)): opts = obj._meta elif isinstance(obj, models.query.QuerySet): opts = obj.model._meta else: opts = obj return { 'verbose_name': force_text(opts.verbose_name), 'verbose_name_plural': force_text(opts.verbose_name_plural) }
def process_response(self, request, response): """ Send broken link emails for relevant 404 NOT FOUND responses. """ if response.status_code == 404 and not settings.DEBUG: domain = request.get_host() path = request.get_full_path() referer = force_text(request.META.get('HTTP_REFERER', ''), errors='replace') if not self.is_ignorable_request(request, path, domain, referer): ua = force_text(request.META.get('HTTP_USER_AGENT', '<none>'), errors='replace') ip = request.META.get('REMOTE_ADDR', '<none>') mail_managers( "Broken %slink on %s" % ( ('INTERNAL ' if self.is_internal_request(domain, referer) else ''), domain ), "Referrer: %s\nRequested URL: %s\nUser agent: %s\n" "IP address: %s\n" % (referer, path, ua, ip), fail_silently=True) return response
def last_executed_query(self, cursor, sql, params): """ Returns a string of the query last executed by the given cursor, with placeholders replaced with actual values. `sql` is the raw query containing placeholders, and `params` is the sequence of parameters. These are used by default, but this method exists for database backends to provide a better implementation according to their own quoting schemes. """ # Convert params to contain Unicode values. to_unicode = lambda s: force_text(s, strings_only=True, errors='replace') if isinstance(params, (list, tuple)): u_params = tuple(to_unicode(val) for val in params) elif params is None: u_params = () else: u_params = {to_unicode(k): to_unicode(v) for k, v in params.items()} return six.text_type("QUERY = %r - PARAMS = %r") % (sql, u_params)
def contribute_to_class(self, cls, name, virtual_only=False): super(RelatedField, self).contribute_to_class(cls, name, virtual_only=virtual_only) self.opts = cls._meta if not cls._meta.abstract: if self.remote_field.related_name: related_name = force_text(self.remote_field.related_name) % { 'class': cls.__name__.lower(), 'app_label': cls._meta.app_label.lower() } self.remote_field.related_name = related_name def resolve_related_class(model, related, field): field.remote_field.model = related field.do_related_class(related, model) lazy_related_operation(resolve_related_class, cls, self.remote_field.model, field=self)
def cache_key(self, template_name, template_dirs, skip=None): """ Generate a cache key for the template name, dirs, and skip. If skip is provided, only origins that match template_name are included in the cache key. This ensures each template is only parsed and cached once if contained in different extend chains like: x -> a -> a y -> a -> a z -> a -> a """ dirs_prefix = '' skip_prefix = '' if skip: matching = [origin.name for origin in skip if origin.template_name == template_name] if matching: skip_prefix = self.generate_hash(matching) if template_dirs: dirs_prefix = self.generate_hash(template_dirs) return '-'.join(filter(bool, [force_text(template_name), skip_prefix, dirs_prefix]))
def append(self, element): if isinstance(element, six.string_types): element = force_text(element) element = normalize_whitespace(element) if self.children: if isinstance(self.children[-1], six.string_types): self.children[-1] += element self.children[-1] = normalize_whitespace(self.children[-1]) return elif self.children: # removing last children if it is only whitespace # this can result in incorrect dom representations since # whitespace between inline tags like <span> is significant if isinstance(self.children[-1], six.string_types): if self.children[-1].isspace(): self.children.pop() if element: self.children.append(element)
def get_text_list(list_, last_word=ugettext_lazy('or')): """ >>> get_text_list(['a', 'b', 'c', 'd']) 'a, b, c or d' >>> get_text_list(['a', 'b', 'c'], 'and') 'a, b and c' >>> get_text_list(['a', 'b'], 'and') 'a and b' >>> get_text_list(['a']) 'a' >>> get_text_list([]) '' """ if len(list_) == 0: return '' if len(list_) == 1: return force_text(list_[0]) return '%s %s %s' % ( # Translators: This string is used as a separator between list elements _(', ').join(force_text(i) for i in list_[:-1]), force_text(last_word), force_text(list_[-1]))
def is_safe_url(url, host=None): """ Return ``True`` if the url is a safe redirection (i.e. it doesn't point to a different host and uses a safe scheme). Always returns ``False`` on an empty url. """ if url is not None: url = url.strip() if not url: return False if six.PY2: try: url = force_text(url) except UnicodeDecodeError: return False # Chrome treats \ completely as / in paths but it could be part of some # basic auth credentials so we need to check both URLs. return _is_safe_url(url, host) and _is_safe_url(url.replace('\\', '/'), host)
def permission_denied(request, exception, template_name='403.html'): """ Permission denied (403) handler. Templates: :template:`403.html` Context: None If the template does not exist, an Http403 response containing the text "403 Forbidden" (as per RFC 7231) will be returned. """ try: template = loader.get_template(template_name) except TemplateDoesNotExist: return http.HttpResponseForbidden('<h1>403 Forbidden</h1>', content_type='text/html') return http.HttpResponseForbidden( template.render(request=request, context={'exception': force_text(exception)}) )
def get_success_url(self): """ Returns the supplied URL. """ if self.success_url: # force_text can be removed with deprecation warning self.success_url = force_text(self.success_url) if PERCENT_PLACEHOLDER_REGEX.search(self.success_url): warnings.warn( "%()s placeholder style in success_url is deprecated. " "Please replace them by the {} Python format syntax.", RemovedInDjango110Warning, stacklevel=2 ) url = self.success_url % self.object.__dict__ else: url = self.success_url.format(**self.object.__dict__) else: try: url = self.object.get_absolute_url() except AttributeError: raise ImproperlyConfigured( "No URL to redirect to. Either provide a url or define" " a get_absolute_url method on the Model.") return url
def get_success_url(self): if self.success_url: # force_text can be removed with deprecation warning self.success_url = force_text(self.success_url) if PERCENT_PLACEHOLDER_REGEX.search(self.success_url): warnings.warn( "%()s placeholder style in success_url is deprecated. " "Please replace them by the {} Python format syntax.", RemovedInDjango110Warning, stacklevel=2 ) return self.success_url % self.object.__dict__ else: return self.success_url.format(**self.object.__dict__) else: raise ImproperlyConfigured( "No URL to redirect to. Provide a success_url.")
def get_date_list(self, queryset, date_type=None, ordering='ASC'): """ Get a date list by calling `queryset.dates/datetimes()`, checking along the way for empty lists that aren't allowed. """ date_field = self.get_date_field() allow_empty = self.get_allow_empty() if date_type is None: date_type = self.get_date_list_period() if self.uses_datetime_field: date_list = queryset.datetimes(date_field, date_type, ordering) else: date_list = queryset.dates(date_field, date_type, ordering) if date_list is not None and not date_list and not allow_empty: name = force_text(queryset.model._meta.verbose_name_plural) raise Http404(_("No %(verbose_name_plural)s available") % {'verbose_name_plural': name}) return date_list
def to_python(self, value): """ Validates that int() can be called on the input. Returns the result of int(). Returns None for empty values. """ value = super(IntegerField, self).to_python(value) if value in self.empty_values: return None if self.localize: value = formats.sanitize_separators(value) # Strip trailing decimal and zeros. try: value = int(self.re_decimal.sub('', force_text(value))) except (ValueError, TypeError): raise ValidationError(self.error_messages['invalid'], code='invalid') return value
def popen_wrapper(args, os_err_exc_type=CommandError, stdout_encoding='utf-8'): """ Friendly wrapper around Popen. Returns stdout output, stderr output and OS status code. """ try: p = Popen(args, shell=False, stdout=PIPE, stderr=PIPE, close_fds=os.name != 'nt') except OSError as e: strerror = force_text(e.strerror, DEFAULT_LOCALE_ENCODING, strings_only=True) six.reraise(os_err_exc_type, os_err_exc_type('Error executing %s: %s' % (args[0], strerror)), sys.exc_info()[2]) output, errors = p.communicate() return ( force_text(output, stdout_encoding, strings_only=True, errors='strict'), force_text(errors, DEFAULT_LOCALE_ENCODING, strings_only=True, errors='replace'), p.returncode )
def path_and_rename(instance, filename): ext = filename.split('.')[-1] if instance.pk: filename = '{}.{}'.format(instance.pk, ext) else: filename = '{}.{}'.format(uuid4().hex, ext) dirname = force_text(datetime.datetime.now().strftime(force_str('uploads/%Y/%m/%d/'))) return os.path.join(dirname, filename)
def activate(request, uidb64, token): try: uid = force_text(urlsafe_base64_decode(uidb64)) user = User.objects.get(pk=uid) except (TypeError, ValueError, OverflowError, User.DoesNotExist): user = None if user is not None and account_activation_token.check_token(user, token): user.is_active = True user.profile.email_confirmed = True user.save() login(request, user) return redirect('post_detail') else: return render(request, 'blog/post_list.html')
def create_issue(self, request, group, form_data, **kwargs): json_data = { 'story_type': 'bug', 'name': force_text(form_data['title'], encoding='utf-8', errors='replace'), 'description': force_text(form_data['description'], encoding='utf-8', errors='replace'), 'labels': ['sentry'], } try: _url = self.build_api_url(group, 'stories') req = self.make_api_request(group.project, _url, json_data=json_data) body = safe_urlread(req) except requests.RequestException as e: msg = six.text_type(e) raise PluginError('Error communicating with Pivotal: %s' % (msg, )) try: json_resp = json.loads(body) except ValueError as e: msg = six.text_type(e) raise PluginError('Error communicating with Pivotal: %s' % (msg, )) if req.status_code > 399: raise PluginError(json_resp['error']) return json_resp['id']
def last_executed_query(self, cursor, sql, params): return force_text(cursor.statement, errors='replace')
def convert_textfield_value(self, value, expression, connection, context): # New in Django 1.8 if value is not None: value = force_text(value) return value
def test_upload_no_form(self): self.assertEqual(Image.objects.count(), 0) folder = Folder.objects.create(name='foo') with self.login_user_context(self.user): url = reverse('admin:filer-ajax_upload', kwargs={'folder_id': folder.pk}) post_data = { 'jsessionid': self.client.session.session_key } response = self.client.post(url, post_data) self.assertEqual(response.status_code, 500) self.assertTrue(force_text(response.content).find('AJAX request not valid') > -1)
def format_choices(field: models.Field): for choice in field.choices: printable_name = force_text(choice[1]) yield [choice[0], printable_name]
def prepare_value(self, value): if isinstance(value, string_types): return value if isinstance(value, list): return ', '.join(self.prepare_value(item) for item in value) if isinstance(value, dict): return ', '.join(self.prepare_value(item) for item in value.values()) return force_text(value)
def add_items(self, model, objs): content_type_pk = get_content_types_pks((model,), self.db_alias)[0] config = self.get_config() for obj in objs: obj._object_id = force_text(obj.pk) obj._body_ = self.prepare_body(obj) connection = connections[self.db_alias] if connection.pg_version >= 90500: # PostgreSQL >= 9.5 self.add_items_upsert(connection, content_type_pk, objs, config) else: self.add_items_update_then_create(content_type_pk, objs, config)