We have extracted the following 50 code examples from open-source Python projects to illustrate how to use django.utils.six.iteritems().
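Before the collected examples, here is a minimal standalone sketch (the function name and the dictionary contents are hypothetical, not taken from any of the projects below) of what six.iteritems() does: it yields (key, value) pairs from a mapping, delegating to dict.iteritems() on Python 2 and dict.items() on Python 3. Note that django.utils.six was removed in Django 3.0, so this helper is only available on older Django versions; on Python 3-only code you can simply call dict.items().

from django.utils import six  # removed in Django 3.0; use dict.items() there

def print_settings(overrides):
    """Print each (name, value) pair from a settings-like dict."""
    # six.iteritems() iterates lazily on Python 2 and maps to items() on Python 3.
    for name, value in six.iteritems(overrides):
        print('%s = %r' % (name, value))

print_settings({'DEBUG': False, 'LANGUAGE_CODE': 'en-us'})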
def _generate_thumbnails(self, required_thumbnails):
    _thumbnails = {}
    for name, opts in iteritems(required_thumbnails):
        try:
            opts.update({'subject_location': self.subject_location})
            thumb = get_thumbnail_lazy(self, opts)
            _thumbnails[name] = thumb.url
        except Exception as e:  # pragma: no cover
            # catch exception and manage it. We can re-raise it for debugging
            # purposes and/or just logging it, provided user configured
            # proper logging configuration
            if filer_settings.FILER_ENABLE_LOGGING:
                logger.error('Error while generating thumbnail: %s', e)
            if filer_settings.FILER_DEBUG:
                raise
    return _thumbnails
def changes_str(self, colon=': ', arrow=smart_text(' \u2192 '), separator='; '):
    """
    Return the changes recorded in this log entry as a string. The formatting of
    the string can be customized by setting alternate values for colon, arrow
    and separator. If the formatting is still not satisfying, please use
    :py:func:`LogAction.changes_dict` and format the string yourself.

    :param colon: The string to place between the field name and the values.
    :param arrow: The string to place between each old and new value.
    :param separator: The string to place between each field.
    :return: A readable string of the changes in this log entry.
    """
    substrings = []

    for field, values in iteritems(self.changes_dict):
        substring = smart_text('{field_name:s}{colon:s}{old:s}{arrow:s}{new:s}').format(
            field_name=field,
            colon=colon,
            old=values[0],
            arrow=arrow,
            new=values[1],
        )
        substrings.append(substring)

    return separator.join(substrings)
def _generate_thumbnails(self, required_thumbnails):
    _thumbnails = {}
    for name, opts in six.iteritems(required_thumbnails):
        try:
            opts.update({'subject_location': self.subject_location})
            thumb = self.file.get_thumbnail(opts)
            _thumbnails[name] = thumb.url
        except Exception as e:
            # catch exception and manage it. We can re-raise it for debugging
            # purposes and/or just logging it, provided user configured
            # proper logging configuration
            if filer_settings.FILER_ENABLE_LOGGING:
                logger.error('Error while generating thumbnail: %s', e)
            if filer_settings.FILER_DEBUG:
                raise
    return _thumbnails
def _generate_thumbnails(self, required_thumbnails):
    _thumbnails = {}
    for name, opts in six.iteritems(required_thumbnails):
        try:
            # opts.update({'subject_location': self.subject_location})
            thumb = self.diricon.file.get_thumbnail(opts)
            _thumbnails[name] = thumb.url
        except Exception as e:
            # catch exception and manage it. We can re-raise it for debugging
            # purposes and/or just logging it, provided user configured
            # proper logging configuration
            if filer_settings.FILER_ENABLE_LOGGING:
                logger.error('Error while generating thumbnail: %s', e)
            if filer_settings.FILER_DEBUG:
                raise
    return _thumbnails
def rec_update(self, other, **third):
    """Recursively update the dictionary with the contents of other and third
    like dict.update() does - but don't overwrite sub-dictionaries.

    Example:
    >>> d = RecursiveDictionary({'foo': {'bar': 42}})
    >>> d.rec_update({'foo': {'baz': 36}})
    >>> d
    {'foo': {'baz': 36, 'bar': 42}}
    """
    try:
        iterator = six.iteritems(other)
    except AttributeError:
        iterator = other

    self.iter_rec_update(iterator)
    self.iter_rec_update(six.iteritems(third))
def update_or_create(self, defaults=None, **kwargs):
    """
    Looks up an object with the given kwargs, updating one with defaults
    if it exists, otherwise creates a new one.
    Returns a tuple (object, created), where created is a boolean
    specifying whether an object was created.
    """
    defaults = defaults or {}
    lookup, params = self._extract_model_params(defaults, **kwargs)
    self._for_write = True
    try:
        obj = self.get(**lookup)
    except self.model.DoesNotExist:
        obj, created = self._create_object_from_params(lookup, params)
        if created:
            return obj, created
    for k, v in six.iteritems(defaults):
        setattr(obj, k, v)
    with transaction.atomic(using=self.db, savepoint=False):
        obj.save(using=self.db)
    return obj, False
def add_update_values(self, values):
    """
    Convert a dictionary of field name to value mappings into an update
    query. This is the entry point for the public update() method on
    querysets.
    """
    values_seq = []
    for name, val in six.iteritems(values):
        field = self.get_meta().get_field(name)
        direct = not (field.auto_created and not field.concrete) or not field.concrete
        model = field.model._meta.concrete_model
        if not direct or (field.is_relation and field.many_to_many):
            raise FieldError(
                'Cannot update model field %r (only non-relations and '
                'foreign keys permitted).' % field
            )
        if model is not self.get_meta().model:
            self.add_related_update(model, field, val)
            continue
        values_seq.append((field, model, val))
    return self.add_update_fields(values_seq)
def get_related_updates(self):
    """
    Returns a list of query objects: one for each update required to an
    ancestor model. Each query will have the same filtering conditions as
    the current query but will only update a single table.
    """
    if not self.related_updates:
        return []
    result = []
    for model, values in six.iteritems(self.related_updates):
        query = UpdateQuery(model)
        query.values = values
        if self.related_ids is not None:
            query.add_filter(('pk__in', self.related_ids))
        result.append(query)
    return result
def _execute_query(self):
    connection = connections[self.using]

    # Adapt parameters to the database, as much as possible considering
    # that the target type isn't known. See #17755.
    params_type = self.params_type
    adapter = connection.ops.adapt_unknown_value
    if params_type is tuple:
        params = tuple(adapter(val) for val in self.params)
    elif params_type is dict:
        params = dict((key, adapter(val)) for key, val in six.iteritems(self.params))
    else:
        raise RuntimeError("Unexpected params type: %s" % params_type)

    self.cursor = connection.cursor()
    self.cursor.execute(self.sql, params)
def update(self, *args, **kwargs):
    """
    update() extends rather than replaces existing key lists.
    Also accepts keyword args.
    """
    if len(args) > 1:
        raise TypeError("update expected at most 1 arguments, got %d" % len(args))
    if args:
        other_dict = args[0]
        if isinstance(other_dict, MultiValueDict):
            for key, value_list in other_dict.lists():
                self.setlistdefault(key).extend(value_list)
        else:
            try:
                for key, value in other_dict.items():
                    self.setlistdefault(key).append(value)
            except TypeError:
                raise ValueError("MultiValueDict.update() takes either a MultiValueDict or dictionary")
    for key, value in six.iteritems(kwargs):
        self.setlistdefault(key).append(value)
def parse_time(value):
    """Parses a string and return a datetime.time.

    This function doesn't support time zone offsets.

    Raises ValueError if the input is well formatted but not a valid time.
    Returns None if the input isn't well formatted, in particular if it
    contains an offset.
    """
    match = time_re.match(value)
    if match:
        kw = match.groupdict()
        if kw['microsecond']:
            kw['microsecond'] = kw['microsecond'].ljust(6, '0')
        kw = {k: int(v) for k, v in six.iteritems(kw) if v is not None}
        return datetime.time(**kw)
def handle(self, *args, **options):
    if hasattr(settings, 'DJANGO_TWILIO_SMS_RESPONSES'):
        for action in Action.objects.all():
            action.delete()
        for action, response in iteritems(settings.DJANGO_TWILIO_SMS_RESPONSES):
            action = Action.objects.create(name=action)
            response = Response.objects.create(body=response, action=action)
            self.stdout.write('CREATED: {}-{}'.format(action.name, response.body))
    else:
        self.stdout.write('No responses found in settings.')
        if Action.objects.all().count() > 0:
            for action in Action.objects.all():
                action.delete()
            self.stdout.write('All saved responses have been deleted.')
def _request(self, endpoint, method="GET", lookup=None, data={}, params={},
             userargs=None, password=None):
    """
    Generic request method designed to handle any morango endpoint.

    :param endpoint: constant representing which morango endpoint we are querying
    :param method: HTTP verb/method for request
    :param lookup: the pk value for the specific object we are querying
    :param data: dict that will be form-encoded in request
    :param params: dict to be sent as part of URL's query string
    :param userargs: Authorization credentials
    :param password:
    :return: ``Response`` object from request
    """
    # convert user arguments into query str for passing to auth layer
    if isinstance(userargs, dict):
        userargs = "&".join(["{}={}".format(key, val) for (key, val) in iteritems(userargs)])

    # build up url and send request
    if lookup:
        lookup = lookup + '/'
    url = urljoin(urljoin(self.base_url, endpoint), lookup)
    auth = (userargs, password) if userargs else None
    resp = requests.request(method, url, json=data, params=params, auth=auth)
    resp.raise_for_status()
    return resp
def update_fsics(cls, fsics, sync_filter):
    internal_fsic = DatabaseMaxCounter.calculate_filter_max_counters(sync_filter)
    updated_fsic = {}
    for key, value in iteritems(fsics):
        if key in internal_fsic:
            # if same instance id, update fsic with larger value
            if fsics[key] > internal_fsic[key]:
                updated_fsic[key] = fsics[key]
        else:
            # if instance id is not present, add it to updated fsics
            updated_fsic[key] = fsics[key]

    # load database max counters
    for (key, value) in iteritems(updated_fsic):
        for f in sync_filter:
            DatabaseMaxCounter.objects.update_or_create(instance_id=key, partition=f,
                                                        defaults={'counter': value})
def get_form_list(self):
    """
    This method returns a form_list based on the initial form list but
    checks if there is a condition method/value in the condition_list.
    If an entry exists in the condition list, it will call/read the value
    and respect the result. (True means add the form, False means ignore
    the form)

    The form_list is always generated on the fly because condition methods
    could use data from other (maybe previous forms).
    """
    form_list = OrderedDict()
    for form_key, form_class in six.iteritems(self.form_list):
        # try to fetch the value from condition list, by default, the form
        # gets passed to the new list.
        condition = self.condition_dict.get(form_key, True)
        if callable(condition):
            # call the value if needed, passes the current instance.
            condition = condition(self)
        if condition:
            form_list[form_key] = form_class
    return form_list
def get_step_files(self, step):
    wizard_files = self.data[self.step_files_key].get(step, {})

    if wizard_files and not self.file_storage:
        raise NoFileStorageConfigured(
            "You need to define 'file_storage' in your "
            "wizard view in order to handle file uploads.")

    files = {}
    for field, field_dict in six.iteritems(wizard_files):
        field_dict = field_dict.copy()
        tmp_name = field_dict.pop('tmp_name')
        if (step, field) not in self._files:
            self._files[(step, field)] = UploadedFile(
                file=self.file_storage.open(tmp_name), **field_dict)
        files[field] = self._files[(step, field)]
    return files or None
def set_step_files(self, step, files):
    if files and not self.file_storage:
        raise NoFileStorageConfigured(
            "You need to define 'file_storage' in your "
            "wizard view in order to handle file uploads.")

    if step not in self.data[self.step_files_key]:
        self.data[self.step_files_key][step] = {}

    for field, field_file in six.iteritems(files or {}):
        tmp_filename = self.file_storage.save(field_file.name, field_file)
        file_dict = {
            'tmp_name': tmp_filename,
            'name': field_file.name,
            'content_type': field_file.content_type,
            'size': field_file.size,
            'charset': field_file.charset
        }
        self.data[self.step_files_key][step][field] = file_dict
def _to_xml(self, xml, data):
    if isinstance(data, (list, tuple)):
        for item in data:
            xml.startElement(self.element_node, {})
            self._to_xml(xml, item)
            xml.endElement(self.element_node)
    elif isinstance(data, dict):
        for key, value in six.iteritems(data):
            xml.startElement(key, {})
            self._to_xml(xml, value)
            xml.endElement(key)
    elif data is None:
        # Don't output any value
        pass
    else:
        xml.characters(smart_text(data))
def get_tasks(self, match_dict=None):
    """Return pending tasks for this VM as a dict with task_id as keys.
    If match_dict is specified then try to match key/values to current tasks
    and if task is found return only the one task else return {}."""
    res = {}
    for tid, task in iteritems(self._get_tasks(self.owner.id)):
        if task.get(self._pk_key, None) == self.pk:
            res[tid] = task.get('apiview', {})

    if match_dict:
        subtasks = {}
        for tid, task in iteritems(res):
            match_found = all(task.get(k, None) == v for k, v in iteritems(match_dict))
            if match_found:
                subtasks[tid] = task
        return subtasks

    return res
def update2(self, d2):
    """Recursive dict.update() - http://stackoverflow.com/a/3233356"""
    def update(d, u):
        for k, v in iteritems(u):
            if isinstance(v, collections.Mapping):
                r = update(d.get(k, {}), v)
                d[k] = r
            else:
                d[k] = u[k]
        return d

    # noinspection PyMethodFirstArgAssignment,PyUnusedLocal
    self = update(self, d2)
    return None
def _local_settings_new(self, changes):
    """Generate new local_settings.py as string.
    The changes parameter should be a list generated by check_modules()"""
    changes = dict(changes)
    res = []

    # Update or copy existing settings
    for opt in dir(local_settings):
        if not opt[0].isupper():
            continue

        if opt in changes:
            opt_value = changes.pop(opt)
        else:
            opt_value = getattr(local_settings, opt)

        res.append('%s = %s' % (opt, repr(opt_value)))

    # Add new settings
    for opt, opt_value in iteritems(changes):
        res.append('%s = %s' % (opt, repr(opt_value)))

    return '\n'.join(res)
def _get_declared_fields(bases, attrs):
    """
    Create a list of serializer field instances from the passed in 'attrs', plus any
    fields on the base classes (in 'bases').

    Note that all fields from the base classes are used.
    """
    fields = [(field_name, attrs.pop(field_name))
              for field_name, obj in list(six.iteritems(attrs))
              if isinstance(obj, Field)]
    fields.sort(key=lambda x: x[1].creation_counter)

    # If this class is subclassing another Serializer, add that Serializer's
    # fields. Note that we loop over the bases in *reverse*. This is necessary
    # in order to maintain the correct order of fields.
    for base in bases[::-1]:
        if hasattr(base, 'base_fields'):
            fields = list(base.base_fields.items()) + fields

    return OrderedDict(fields)
def __init__(self, data=None, status=None, template_name=None, headers=None,
             exception=False, content_type=None, request=None):
    """
    Alters the init arguments slightly.
    For example, drop 'template_name', and instead use 'data'.

    Setting 'renderer' and 'media_type' will typically be deferred,
    For example being set automatically by the `APIView`.
    """
    super(Response, self).__init__(None, status=status)
    self.data = data
    self.template_name = template_name
    self.exception = exception
    self.content_type = content_type

    if headers:
        for name, value in six.iteritems(headers):
            self[name] = value

    if request:
        self.set_response_headers(request)
def to_string(x, quote_string=True):
    """Format value so it can be used for task log detail"""
    if isinstance(x, string_types):
        if quote_string:
            return "'%s'" % x
        else:
            return '%s' % x
    elif isinstance(x, bool):
        return str(x).lower()
    elif isinstance(x, (int, float)):
        return str(x)
    elif isinstance(x, NoneType):
        return 'null'
    elif isinstance(x, dict):
        return to_string(','.join('%s:%s' % (to_string(k, quote_string=False),
                                             to_string(v, quote_string=False))
                                  for k, v in iteritems(x)))
    elif isinstance(x, (list, tuple)):
        return to_string(','.join(to_string(i, quote_string=False) for i in x))
    return to_string(str(x))
def check_update(self, json_update):
    """Changing most of the VM's parameters does not require a VM to be in
    stopped state. VM has to be stopped when changing some disk/NIC parameters
    or adding/deleting disks/NICS - issue #chili-879."""
    vm = self.vm
    must_be_stopped = False

    for key, val in iteritems(json_update):
        if key in ('add_nics', 'remove_nics', 'add_disks', 'remove_disks'):
            must_be_stopped = True
            break
        if key == 'update_disks':
            if self._check_disk_update(val):
                must_be_stopped = True
                break
        if key == 'update_nics':
            if self._check_nic_update(val):
                must_be_stopped = True
                break

    if vm.status != vm.STOPPED and must_be_stopped:
        raise PreconditionRequired('VM has to be stopped when updating disks or NICs')
def __init__(self, request, obj, *args, **kwargs):
    self._request = request
    self._obj = obj
    self._read_only_fields = set()
    init = kwargs.pop('init', False)

    # Initial data are useful only for updates, or enabled manually by param
    if (obj and request.POST.get('action', None) == 'update') or init:
        kwargs['initial'] = self._initial_data(request, obj)

    # Parent constructor
    super(SerializerForm, self).__init__(*args, **kwargs)

    # Copy serializer fields
    if self._serializer:
        for name, field in iteritems(self._serializer.base_fields):
            field_not_defined = name not in self.fields
            field_not_excluded = name not in self._exclude_fields
            if field_not_defined and field_not_excluded:
                self.fields[name] = self._serializer_field(name, field)

    # Set fancy placeholder
    for key, field in self.fields.items():
        field.widget.attrs['placeholder'] = self._get_placeholder(field, key)
def get_query_string(request, **kwargs):
    """Conditional query string creator. Only useful for boolean parameters."""
    qs = QueryDict('', mutable=True)

    for name, condition in iteritems(kwargs):
        try:
            value = request.GET[name]
        except KeyError:
            pass
        else:
            if condition and value:
                qs[name] = 1

    return qs
def render_option(self, selected_choices, option_value, option_label):
    try:
        properties = ' '.join('%s="%s"' % kv for kv in iteritems(option_value[2]))
    except IndexError:
        properties = ''

    metadata = option_value[1]
    option_value = force_text(option_value[0])

    if option_value in selected_choices:
        selected_html = mark_safe(' selected="selected"')
        if not self.allow_multiple_selected:
            # Only allow for a single selection.
            selected_choices.remove(option_value)
    else:
        selected_html = ''

    return format_html(u"<option value='{0}'{1} data-meta='{2}' {3}>{4}</option>",
                       option_value,
                       selected_html,
                       mark_safe(dumps(metadata, indent=None).replace("'", "\\'")),
                       mark_safe(properties),
                       force_text(option_label))
def get_form_list(self):
    """
    This method returns a form_list based on the initial form list but
    checks if there is a condition method/value in the condition_list.
    If an entry exists in the condition list, it will call/read the value
    and respect the result. (True means add the form, False means ignore
    the form)

    The form_list is always generated on the fly because condition methods
    could use data from other (maybe previous forms).
    """
    form_list = SortedDict()
    for form_key, form_class in six.iteritems(self.form_list):
        # try to fetch the value from condition list, by default, the form
        # gets passed to the new list.
        condition = self.condition_dict.get(form_key, True)
        if callable(condition):
            # call the value if needed, passes the current instance.
            condition = condition(self)
        if condition:
            form_list[form_key] = form_class
    return form_list
def update_or_create(self, defaults=None, **kwargs):
    """
    Looks up an object with the given kwargs, updating one with defaults
    if it exists, otherwise creates a new one.
    Returns a tuple (object, created), where created is a boolean
    specifying whether an object was created.
    """
    defaults = defaults or {}
    lookup, params = self._extract_model_params(defaults, **kwargs)
    self._for_write = True
    with transaction.atomic(using=self.db):
        try:
            obj = self.select_for_update().get(**lookup)
        except self.model.DoesNotExist:
            obj, created = self._create_object_from_params(lookup, params)
            if created:
                return obj, created
        for k, v in six.iteritems(defaults):
            setattr(obj, k, v() if callable(v) else v)
        obj.save(using=self.db)
    return obj, False
def set_group_by(self):
    """
    Expands the GROUP BY clause required by the query.

    This will usually be the set of all non-aggregate fields in the
    return data. If the database backend supports grouping by the
    primary key, and the query would be equivalent, the optimization
    will be made automatically.
    """
    self.group_by = []

    for col in self.select:
        self.group_by.append(col)

    if self.annotation_select:
        for alias, annotation in six.iteritems(self.annotation_select):
            for col in annotation.get_group_by_cols():
                self.group_by.append(col)
def parse_duration(value):
    """Parses a duration string and returns a datetime.timedelta.

    The preferred format for durations in Django is '%d %H:%M:%S.%f'.

    Also supports ISO 8601 representation.
    """
    match = standard_duration_re.match(value)
    if not match:
        match = iso8601_duration_re.match(value)
    if match:
        kw = match.groupdict()
        sign = -1 if kw.pop('sign', '+') == '-' else 1
        if kw.get('microseconds'):
            kw['microseconds'] = kw['microseconds'].ljust(6, '0')
        kw = {k: float(v) for k, v in six.iteritems(kw) if v is not None}
        return sign * datetime.timedelta(**kw)
def get_catalog(self):
    pdict = {}
    maxcnts = {}
    catalog = {}
    trans_cat = self.translation._catalog
    trans_fallback_cat = self.translation._fallback._catalog if self.translation._fallback else {}
    for key, value in itertools.chain(six.iteritems(trans_cat), six.iteritems(trans_fallback_cat)):
        if key == '' or key in catalog:
            continue
        if isinstance(key, six.string_types):
            catalog[key] = value
        elif isinstance(key, tuple):
            msgid = key[0]
            cnt = key[1]
            maxcnts[msgid] = max(cnt, maxcnts.get(msgid, 0))
            pdict.setdefault(msgid, {})[cnt] = value
        else:
            raise TypeError(key)
    for k, v in pdict.items():
        # use the max count recorded for this msgid (k), not the loop leftover
        catalog[k] = [v.get(i, '') for i in range(maxcnts[k] + 1)]
    return catalog
def merge_dict(a, b):
    """
    Recursively merges and returns dict a with dict b.

    :param a: dictionary object
    :param b: dictionary object
    :return: merged dictionary object
    """
    if not isinstance(a, dict) or not isinstance(b, dict):
        raise TypeError('Invalid type. Expected dict object.')

    result = deepcopy(a)
    for key, val in six.iteritems(b):
        if key in result and isinstance(result[key], dict):
            result[key] = merge_dict(result[key], val)
        else:
            result[key] = deepcopy(val)
    return result
def __repr__(self):
    return next(x for x, y in iteritems(self.typeset) if y == self.oftype)
def handle(self, *args, **options):
    schema = getattr(settings, 'SWAGGER_SCHEMA', None)
    module = getattr(settings, 'SWAGGER_MODULE', None)
    if not schema:
        raise ImproperlyConfigured('You have to provide SWAGGER_SCHEMA setting pointing to desired schema')
    if not module:
        raise ImproperlyConfigured('You have to specify desired controller module name in SWAGGER_MODULE setting')

    router = SwaggerRouter()
    print('Inspecting available controllers...')
    router.update(True)
    router.process()
    print()
    print('Following classes and methods are going to be generated:')
    enum = router.get_enum()
    for name in enum:
        print("{} : {}".format(name, [x['method'] for x in enum[name]['methods']]))

    if options['generate']:
        template = Template()
        filename = module.split('.')[-1] + '.py'
        structure = [{'name': name, 'data': data} for name, data in six.iteritems(enum)]
        print('Generating handlers ({})...'.format(filename))
        with codecs.open(filename, 'w', 'utf-8') as f:
            f.write(template.render(template_name='view.jinja', names=structure))
        print('Done.')
    else:
        print()
        print('Use --generate option to create them')
def get_view_key(self, viewname):
    for reg, data in six.iteritems(self.handlers):
        if data['name'] == viewname:
            return data['key']
    return None

#: create root api view
def __init__(self, handle, module):
    self.schema = None
    self.module = None
    self.loaded = False
    self.handle = handle
    self.models = []
    self.router = None
    self.models = dict()

    # parse
    # TODO: proper errors
    try:
        self.schema = flex.load(self.handle)
        self.module = module
        self.loaded = True
    except:
        raise SwaggerGenericError('Cannot process schema {} : check resource availability'.format(self.handle))

    # make models for definitions
    if 'definitions' in self.schema:
        # make external models
        for name, data in six.iteritems(self.schema['definitions']):
            model = None
            if 'properties' in data:
                model = list()  # dict()
                for prop, data in six.iteritems(data['properties']):
                    model.append(prop)
            if model:
                self.models[name] = model

    # make routes
    if 'paths' in self.schema and 'basePath' in self.schema:
        self.router = SwaggerRouter(self.schema, self.module, self.models)
    else:
        raise SwaggerValidationError('Schema is missing paths and/or basePath values')

    # some advanced parsing techniques to be implemented
def get_mocked_properties(**kwargs):
    mock = MagicMock()
    for (k, v) in six.iteritems(kwargs):
        setattr(type(mock), k, PropertyMock(return_value=v))
    return mock
def get_results_to_send(self):
    by_type = dict(read=[], write=[], other=[])
    for result in self.queries:
        tp = classify_query(result['sql'])
        by_type[tp].append(result)
    by_type['total'] = self.queries
    return list(
        QueryCountResult(name=tp, queries=q)
        for (tp, q) in six.iteritems(by_type))
def get_public_serializer_formats():
    if not _serializers:
        _load_serializers()
    return [k for k, v in six.iteritems(_serializers) if not v.Serializer.internal_use_only]
def main_help_text(self, commands_only=False):
    """
    Returns the script's main help text, as a string.
    """
    if commands_only:
        usage = sorted(get_commands().keys())
    else:
        usage = [
            "",
            "Type '%s help <subcommand>' for help on a specific subcommand." % self.prog_name,
            "",
            "Available subcommands:",
        ]
        commands_dict = defaultdict(lambda: [])
        for name, app in six.iteritems(get_commands()):
            if app == 'django.core':
                app = 'django'
            else:
                app = app.rpartition('.')[-1]
            commands_dict[app].append(name)
        style = color_style()
        for app in sorted(commands_dict.keys()):
            usage.append("")
            usage.append(style.NOTICE("[%s]" % app))
            for name in sorted(commands_dict[app]):
                usage.append("    %s" % name)
        # Output an extra note if settings are not properly configured
        if self.settings_exception is not None:
            usage.append(style.NOTICE(
                "Note that only Django core commands are listed "
                "as settings are not properly configured (error: %s)."
                % self.settings_exception))

    return '\n'.join(usage)
def default_units(self, kwargs):
    """
    Return the unit value and the default units specified
    from the given keyword arguments dictionary.
    """
    val = 0.0
    default_unit = self.STANDARD_UNIT
    for unit, value in six.iteritems(kwargs):
        if not isinstance(value, float):
            value = float(value)
        if unit in self.UNITS:
            val += self.UNITS[unit] * value
            default_unit = unit
        elif unit in self.ALIAS:
            u = self.ALIAS[unit]
            val += self.UNITS[u] * value
            default_unit = u
        else:
            lower = unit.lower()
            if lower in self.UNITS:
                val += self.UNITS[lower] * value
                default_unit = lower
            elif lower in self.LALIAS:
                u = self.LALIAS[lower]
                val += self.UNITS[u] * value
                default_unit = u
            else:
                raise AttributeError('Unknown unit type: %s' % unit)
    return val, default_unit
def process_messages(self, obj):
    if isinstance(obj, list) and obj:
        if obj[0] == MessageEncoder.message_key:
            if len(obj) == 3:
                # Compatibility with previously-encoded messages
                return Message(*obj[1:])
            if obj[1]:
                obj[3] = mark_safe(obj[3])
            return Message(*obj[2:])
        return [self.process_messages(item) for item in obj]
    if isinstance(obj, dict):
        return {key: self.process_messages(value)
                for key, value in six.iteritems(obj)}
    return obj