The following 50 code examples, extracted from open-source Python projects, illustrate how to use operator.attrgetter().
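A minimal sketch first (standard library only; the Strip records and the config object are made up for illustration), showing the three forms of attrgetter that the examples below lean on: a single attribute used as a key function, several attributes returning a tuple, and dotted names for nested lookups.

from collections import namedtuple
from operator import attrgetter
from types import SimpleNamespace

Strip = namedtuple('Strip', 'name channel frame_final_start')
strips = [Strip('b', 2, 30), Strip('a', 1, 10)]

# Single attribute, used as a min/sort key.
earliest = min(strips, key=attrgetter('frame_final_start'))
print(earliest.name)                                     # a

# Several attributes at once: the getter returns a tuple,
# which gives a natural multi-level sort order.
ordered = sorted(strips, key=attrgetter('channel', 'frame_final_start'))

# Dotted names traverse nested attributes.
config = SimpleNamespace(api=SimpleNamespace(url='https://example.com'))
print(attrgetter('api.url')(config))                     # https://example.com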
def execute(self, context):
    selection = bpy.context.selected_sequences
    if not selection:
        selection = bpy.context.sequences

    sequences = [s for s in selection if s.type in SequenceTypes.SOUND]
    if not sequences:
        self.report({"ERROR_INVALID_INPUT"}, "Select at least one sound strip")
        return {'CANCELLED'}

    show_waveform = None
    if self.mode == 'auto':
        from operator import attrgetter
        show_waveform = not sorted(
            sequences, key=attrgetter('frame_final_start'))[0].show_waveform
    else:
        show_waveform = True if self.mode == 'on' else False

    for s in sequences:
        s.show_waveform = show_waveform
    return {'FINISHED'}
def execute(self, context):
    selection = bpy.context.selected_sequences
    if not selection:
        return {'CANCELLED'}

    selection = sorted(selection, key=attrgetter('channel', 'frame_final_start'))

    if self.direction == 'up':
        for s in reversed(selection):
            s.channel += 1
    elif self.direction == 'down':
        for s in selection:
            if (s.channel > 1):
                s.channel -= 1
    return {'FINISHED'}

# TODO: find a way to get the selection bounding box and place it
# where there is space for it?
def find_next_sequences(sequences):
    """
    Finds the strips following the sequences passed to the function
    Args:
    - sequences, the sequences to check
    Returns all the strips after the sequence in the current context
    """
    if not sequences:
        raise AttributeError('Missing sequences parameter')

    last_seq_start = max(sequences, key=attrgetter('frame_final_start')).frame_final_start

    next_sequences = []
    for s in bpy.context.sequences:
        if s.frame_final_start > last_seq_start:
            next_sequences.append(s)
    return next_sequences
def get_frame_range(sequences=None, get_from_start=False):
    """
    Returns a tuple with the minimum and maximum frames of the
    list of passed sequences.
    If no sequences are passed, returns the timeline's start and end frames
    Args:
    - sequences, the sequences to use
    - get_from_start, the returned start frame is set to 1 if this boolean is True
    """
    if not sequences:
        if bpy.context.sequences:
            sequences = bpy.context.sequences
        else:
            scene = bpy.context.scene
            return scene.frame_start, scene.frame_end

    start = 1 if get_from_start else min(
        sequences, key=attrgetter('frame_final_start')).frame_final_start
    end = max(sequences, key=attrgetter('frame_final_end')).frame_final_end
    return start, end
def find_effect_strips(sequence):
    """
    Takes a single strip and finds effect strips that use it as input
    Returns the effect strip(s) found as a list, ordered by starting frame
    Returns None if no effect was found
    """
    if sequence.type not in SequenceTypes.VIDEO and sequence.type not in SequenceTypes.IMAGE:
        return None

    effect_sequences = (s for s in bpy.context.sequences if s.type in SequenceTypes.EFFECT)
    found_effect_strips = []
    for s in effect_sequences:
        if s.input_1.name == sequence.name:
            found_effect_strips.append(s)
        if s.input_count == 2:
            if s.input_2.name == sequence.name:
                found_effect_strips.append(s)

    found_effect_strips = sorted(found_effect_strips, key=attrgetter('frame_final_start'))
    return found_effect_strips
def find_snap_candidate(frame=0):
    """
    Finds and returns the best frame snap candidate around the frame
    """
    closest_cut_frame = 1000000
    for s in bpy.context.sequences:
        start_to_frame = frame - s.frame_final_start
        end_to_frame = frame - s.frame_final_end

        distance_to_start = abs(start_to_frame)
        distance_to_end = abs(end_to_frame)

        smallest_distance = min(distance_to_start, distance_to_end)
        if smallest_distance == distance_to_start:
            snap_candidate = frame - start_to_frame
        else:
            snap_candidate = frame - end_to_frame

        if abs(frame - snap_candidate) < abs(frame - closest_cut_frame):
            closest_cut_frame = snap_candidate
    return closest_cut_frame

# FIXME: 122, "sorted_sequences = sorted(bpy.context.selected_sequences, key=attrgetter('frame_final_start'))[0]"
# If trimming the start of the first sequence, there's no sequence selected.
# https://github.com/GDquest/GDquest-VSE/issues/1
def execute(self, context):
    selection = bpy.context.selected_sequences
    cursor_start_frame = bpy.context.scene.frame_current
    sequencer = bpy.ops.sequencer

    # Deactivate audio playback
    scene = bpy.context.scene
    initial_audio_setting = scene.use_audio_scrub
    scene.use_audio_scrub = False

    first_sequence = min(selection, key=attrgetter('frame_final_start'))
    bpy.context.scene.frame_current = first_sequence.frame_final_start
    sequencer.copy()
    bpy.context.scene.frame_current = cursor_start_frame

    scene.use_audio_scrub = initial_audio_setting

    if self.delete_selection:
        sequencer.delete()

    plural_string = 's' if len(selection) != 1 else ''
    action_verb = 'Cut' if self.delete_selection else 'Copied'
    report_message = '{!s} {!s} sequence{!s} to the clipboard.'.format(
        action_verb, str(len(selection)), plural_string)
    self.report({'INFO'}, report_message)
    return {"FINISHED"}
def find_neighbor_markers(frame=None):
    """Returns a tuple containing the closest marker to the left and to the right of the frame"""
    markers = bpy.context.scene.timeline_markers

    if not (frame and markers):
        return None, None

    from operator import attrgetter
    markers = sorted(markers, key=attrgetter('frame'))

    previous_marker, next_marker = None, None
    for m in markers:
        previous_marker = m if m.frame < frame else previous_marker
        if m.frame > frame:
            next_marker = m
            break
    return previous_marker, next_marker
def find_unsafe_migrations(connection):
    loader = MigrationLoader(connection)
    disk_migrations = set(loader.disk_migrations.keys())
    new_migrations = disk_migrations.difference(loader.applied_migrations)

    unsafe_migrations = []
    for app_name, migration_name in new_migrations:
        migration = loader.get_migration(app_name, migration_name)
        project_state = loader.project_state((app_name, migration_name), at_end=False)
        result = analyze_migration(connection, migration, project_state)
        if result:
            unsafe_migrations.append(result)

    unsafe_migrations = sorted(unsafe_migrations,
                               key=operator.attrgetter('app_name', 'migration_name'))

    conflicts = loader.detect_conflicts()
    for app, names in conflicts.items():
        unsafe_migrations.append(MigrationConflict(app_name=app, migration_names=names))

    return unsafe_migrations
def _walk_xml(self, node, depth=0, parent=""):
    text = self._repr_of_openning_tag(node) + self._repr_of_closing_tag(node)
    item = self._treeview.insert(parent, END, text=text)
    self._item_ID_to_element[item] = node

    if node.text:
        text = node.text.strip()
        if text != "":
            for line in text.splitlines():
                self._treeview.insert(item, END, text=line)

    child_nodes = sorted(list(node), key=attrgetter('tag'))
    for child_node in node:
        self._walk_xml(child_node, depth + 1, parent=item)

    if node.tail:
        tail = node.tail.strip()
        if tail != "":
            for line in tail.splitlines():
                self._treeview.insert(parent, END, text=line)
def compare_materials(self, material):
    """ Compare two WoW material properties """
    get_attributes = operator.attrgetter(
        'Shader', 'TerrainType', 'BlendingMode',
        'Texture1', 'EmissiveColor', 'Flags',
        'Texture2', 'DiffColor')

    mat1 = get_attributes(material.WowMaterial)

    for material2, index in self.material_lookup.items():
        if mat1 == get_attributes(material2.WowMaterial):
            return index

    return None
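The method above leans on attrgetter's multi-attribute form, which fetches all the named attributes in one call and returns them as a tuple; that tuple then acts as a comparable fingerprint of the object. A minimal sketch of the same pattern with a hypothetical Material class (the names are illustrative, not taken from the project above):

from operator import attrgetter

class Material:
    def __init__(self, shader, blending_mode, texture):
        self.shader = shader
        self.blending_mode = blending_mode
        self.texture = texture

# One getter, several attributes: the result is a tuple, so two objects
# can be compared field by field without spelling every attribute out.
fingerprint = attrgetter('shader', 'blending_mode', 'texture')

a = Material(1, 'opaque', 'stone.png')
b = Material(1, 'opaque', 'stone.png')
print(fingerprint(a) == fingerprint(b))   # True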
def get_hierarchy(self, devices):
    regions = set()
    cells = set()
    labels = set()

    for device in devices:
        if device.region not in regions:
            regions.add(device.region)
        if device.cell:
            if device.cell not in cells:
                cells.add(device.cell)
        for label in device.labels:
            if label not in labels:
                labels.add(label)

    regions = sorted(regions, key=attrgetter('name'))
    cells = sorted(cells, key=attrgetter('name'))
    labels = sorted(labels, key=attrgetter('label'))
    devices = sorted(devices, key=attrgetter('ip_address'))

    return regions, cells, labels, devices
def get(ctx, keyword):
    """
    Print a configuration setting.

    \b
    Example:
        farmer config get api_url
    """
    config = ctx.obj['config']
    try:
        value = operator.attrgetter(keyword)(config)
    except AttributeError as exc:
        raise click.ClickException(exc.message)
    if isinstance(value, SCALARS):
        click.echo(value)
    else:
        # Resolve top-most LayeredConfig config and dump it as YAML.
        click.echo(dump_yaml(value))
def to_text(passage, sentences=True, *args, **kwargs):
    """Converts from a Passage object to tokenized strings.

    :param passage: the Passage object to convert
    :param sentences: whether to break the Passage to sentences (one for string)
                      or leave as one string. Defaults to True

    :return a list of strings - 1 if sentences=False, # of sentences otherwise
    """
    del args, kwargs
    tokens = [x.text for x in sorted(passage.layer(layer0.LAYER_ID).all,
                                     key=operator.attrgetter('position'))]
    # break2sentences return the positions of the end tokens, which is
    # always the index into tokens incremented by ones (tokens index starts
    # with 0, positions with 1). So in essence, it returns the index to start
    # the next sentence from, and we should add index 0 for the first sentence
    if sentences:
        starts = [0] + textutil.break2sentences(passage)
    else:
        starts = [0, len(tokens)]
    return [' '.join(tokens[starts[i]:starts[i + 1]]) for i in range(len(starts) - 1)]
def get_word_vectors(dim=None, size=None, filename=None):
    vocab = get_nlp().vocab
    if filename is not None:
        print("Loading word vectors from '%s'..." % filename)
        try:
            with open(filename, encoding="utf-8") as f:
                first_line = f.readline().split()
                if len(first_line) == 2 and all(s.isdigit() for s in first_line):
                    vocab.resize_vectors(int(first_line[1]))
                else:
                    f.seek(0)  # First line is already a vector and not a header, so let load_vectors read it
                vocab.load_vectors(f)
        except OSError as e:
            raise IOError("Failed loading word vectors from '%s'" % filename) from e
    elif dim is not None and dim < vocab.vectors_length:
        vocab.resize_vectors(dim)
    lexemes = sorted([l for l in vocab if l.has_vector],
                     key=attrgetter("prob"), reverse=True)[:size]
    return {l.orth_: l.vector for l in lexemes}, vocab.vectors_length
def get_prefetch_queryset(self, instances, queryset=None):
    if queryset is None:
        queryset = self.get_queryset()
    queryset._add_hints(instance=instances[0])

    rel_obj_attr = attrgetter(self.related.field.attname)
    instance_attr = lambda obj: obj._get_pk_val()
    instances_dict = {instance_attr(inst): inst for inst in instances}
    query = {'%s__in' % self.related.field.name: instances}
    queryset = queryset.filter(**query)

    # Since we're going to assign directly in the cache,
    # we must manage the reverse relation cache manually.
    rel_obj_cache_name = self.related.field.get_cache_name()
    for rel_obj in queryset:
        instance = instances_dict[rel_obj_attr(rel_obj)]
        setattr(rel_obj, rel_obj_cache_name, instance)
    return queryset, rel_obj_attr, instance_attr, True, self.cache_name
def group_by_type(self, select_types: List[str] = None) -> 'EventGroupList':
    """
    Groups events by type

    Attributes
    ----------
    select_types
        A list of types for which to select groups in the resulting EventGroupList.
        If no types are specified, all resulting groups will be selected.

    Returns
    -------
    An EventGroupList partitioned by type
    """
    if select_types is None:
        select_types = []

    groups = [EventList(list(group), end=self.end)
              for index, group in groupby(self, key=attrgetter('__class__'))]

    if not select_types:
        selected_groups = groups
    else:
        selected_groups = [group for group in groups if group.type in select_types]

    return EventGroupList(groups, selected=selected_groups)
def _preprocess_batch(self, datum_batch, include_datum=False, random_transform=True):
    imgs_path = map(attrgetter('img_path'), datum_batch)
    captions_txt = map(attrgetter('caption_txt'), datum_batch)

    img_batch = self._image_preprocessor.preprocess_images(imgs_path, random_transform)
    caption_batch = self._caption_preprocessor.encode_captions(captions_txt)

    imgs_input = self._image_preprocessor.preprocess_batch(img_batch)
    captions = self._caption_preprocessor.preprocess_batch(caption_batch)

    captions_input, captions_output = captions
    X, y = [imgs_input, captions_input], captions_output

    if include_datum:
        return X, y, datum_batch
    else:
        return X, y
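The first two lines above use map(attrgetter('...'), batch) to pull one field out of every record, the functional counterpart of a list comprehension. A small self-contained sketch with a made-up record type:

from collections import namedtuple
from operator import attrgetter

Datum = namedtuple('Datum', 'img_path caption_txt')
batch = [Datum('a.jpg', 'a cat'), Datum('b.jpg', 'a dog')]

# map() is lazy; wrap in list() to materialize the values.
paths = list(map(attrgetter('img_path'), batch))   # ['a.jpg', 'b.jpg']
# Equivalent comprehension: [d.img_path for d in batch]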
def __init__(self, label=None, validators=None, query_factory=None,
             get_pk=None, get_label=None, allow_blank=False,
             blank_text='', **kwargs):
    super(QuerySelectField, self).__init__(label, validators, **kwargs)
    self.query_factory = query_factory

    if get_pk is None:
        if not has_identity_key:
            raise Exception('The sqlalchemy identity_key function could not be imported.')
        self.get_pk = get_pk_from_identity
    else:
        self.get_pk = get_pk

    if get_label is None:
        self.get_label = lambda x: x
    elif isinstance(get_label, string_types):
        self.get_label = operator.attrgetter(get_label)
    else:
        self.get_label = get_label

    self.allow_blank = allow_blank
    self.blank_text = blank_text
    self.query = None
    self._object_list = None
def __init__(self, label=None, validators=None, reference_class=None,
             get_label=None, allow_blank=False, blank_text='', **kwargs):
    super(ReferencePropertyField, self).__init__(label, validators, **kwargs)
    if get_label is None:
        self.get_label = lambda x: x
    elif isinstance(get_label, string_types):
        self.get_label = operator.attrgetter(get_label)
    else:
        self.get_label = get_label

    self.allow_blank = allow_blank
    self.blank_text = blank_text
    self._set_data(None)
    if reference_class is not None:
        self.query = reference_class.all()
def __get__(self, obj, type=None):
    # raise an AttributeError if the attribute is not present on the object
    if obj is not None:
        # delegate only on instances, not the classes.
        # this is to allow access to the docstrings.
        for delegate_name in self.delegate_names:
            try:
                delegate = attrgetter(delegate_name)(obj)
            except AttributeError:
                continue
            else:
                getattr(delegate, self.attribute_name)
                break
        else:
            attrgetter(self.delegate_names[-1])(obj)

    # lambda, but not partial, allows help() to work with update_wrapper
    out = lambda *args, **kwargs: self.fn(obj, *args, **kwargs)
    # update the docstring of the returned function
    update_wrapper(out, self.fn)
    return out
def __get__(self, obj, type=None):
    # raise an AttributeError if the attribute is not present on the object
    if obj is not None:
        # delegate only on instances, not the classes.
        # this is to allow access to the docstrings.
        for delegate_name in self.delegate_names:
            try:
                delegate = getattr(obj, delegate_name)
            except AttributeError:
                continue
            else:
                if not isinstance(delegate, self.instance_type):
                    raise TypeError('delegate (%s) is not an instance of %s'
                                    % (delegate, self.instance_type))
                break
        else:
            attrgetter(self.delegate_names[-1])(obj)

    # lambda, but not partial, allows help() to work with update_wrapper
    out = lambda *args, **kwargs: self.fn(obj, *args, **kwargs)
    # update the docstring of the returned function
    update_wrapper(out, self.fn)
    return out
def test_attrgetter(self):
    from operator import attrgetter
    ser = CloudPickleSerializer()

    class C(object):
        def __getattr__(self, item):
            return item

    d = C()
    getter = attrgetter("a")
    getter2 = ser.loads(ser.dumps(getter))
    self.assertEqual(getter(d), getter2(d))
    getter = attrgetter("a", "b")
    getter2 = ser.loads(ser.dumps(getter))
    self.assertEqual(getter(d), getter2(d))

    d.e = C()
    getter = attrgetter("e.a")
    getter2 = ser.loads(ser.dumps(getter))
    self.assertEqual(getter(d), getter2(d))
    getter = attrgetter("e.a", "e.b")
    getter2 = ser.loads(ser.dumps(getter))
    self.assertEqual(getter(d), getter2(d))

# Regression test for SPARK-3415
def save_attrgetter(self, obj):
    """attrgetter serializer"""
    class Dummy(object):
        def __init__(self, attrs, index=None):
            self.attrs = attrs
            self.index = index

        def __getattribute__(self, item):
            attrs = object.__getattribute__(self, "attrs")
            index = object.__getattribute__(self, "index")
            if index is None:
                index = len(attrs)
                attrs.append(item)
            else:
                attrs[index] = ".".join([attrs[index], item])
            return type(self)(attrs, index)

    attrs = []
    obj(Dummy(attrs))
    return self.save_reduce(operator.attrgetter, tuple(attrs))
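The serializer above works because an attrgetter carries no state beyond the attribute names it was built with: recording the names the getter touches (which is what the Dummy proxy does) is enough to rebuild an equivalent callable later with operator.attrgetter(*attrs). A short illustration of that round trip, independent of any pickling library:

from operator import attrgetter
from types import SimpleNamespace

# The names are all a reconstruction needs; save_reduce() above arranges for
# operator.attrgetter(*attrs) to be called again when the object is unpickled.
names = ('e.a', 'b')
original = attrgetter(*names)
rebuilt = attrgetter(*names)

obj = SimpleNamespace(b=2, e=SimpleNamespace(a=1))
print(original(obj))                    # (1, 2)
print(rebuilt(obj) == original(obj))    # True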
def _super_run(self):
    dist = CustomizedDist()
    for attr in 'allow_hosts index_url'.split():
        setattr(dist, attr, getattr(self, attr))
    for attr in (
            'dependency_links install_requires '
            'tests_require extras_require '
    ).split():
        setattr(dist, attr, getattr(self.distribution, attr))

    installed_dists = self.install_dists(dist)

    if self.dry_run:
        self.announce('skipping tests (dry run)')
        return

    paths = map(_operator.attrgetter('location'), installed_dists)
    with self.paths_on_pythonpath(paths):
        self.with_project_on_sys_path(self.run_tests)
def __init__(self, fields, field_prefix=None):
    """
    Initializes a new TSV writer

    The first value of each fields tuple is the destination field name.
    The second value is a str property path (e.g. "one.two.three") or
    a callable that when passed a row returns a computed field value

    Arguments:
        fields (List): list of (str, str|callable) tuples
        field_prefix (str): path prefix to prefix field lookups with
    """
    self.fields = OrderedDict(fields)
    self.columns = self.fields.keys()
    self.field_mappers = {column: self.get_field_mapper(field) for (column, field) in fields}
    self.prefix_mapper = attrgetter(field_prefix) if field_prefix is not None else None
def get_packages_from_lockfile():
    """
    Return object that contains default and development packages from Pipfile.lock

    Returns:
        SimpleNamespace(default=[...], development=[...])
    """
    result = SimpleNamespace(default=list(), development=list())
    lockfile = Path('Pipfile.lock')
    lockfile_data = json.loads(lockfile.read_text())

    for key in ('default', 'develop'):
        for package, version_info in lockfile_data[key].items():
            packages = attrgetter('development' if key == 'develop' else key)(result)
            packages.append(package + version_info['version'])

    return result
def choice(node_id, choice):
    from models import Agent

    try:
        exp = MCMCP(db.session)
        node = Agent.query.get(node_id)
        infos = node.infos()

        if choice == 0:
            info = max(infos, key=attrgetter("id"))
        elif choice == 1:
            info = min(infos, key=attrgetter("id"))
        else:
            raise ValueError("Choice must be 1 or 0")

        info.chosen = True
        exp.save()

        return Response(
            status=200,
            mimetype='application/json')
    except Exception:
        return Response(
            status=403,
            mimetype='application/json')
def moran_cultural(network):
    """Generalized cultural Moran process.

    At each time step, an individual is chosen to receive information from
    another individual. Nobody dies, but perhaps their ideas do.
    """
    if not network.transmissions():  # first step, replacer is a source
        replacer = random.choice(network.nodes(type=Source))
        replacer.transmit()
    else:
        replacer = random.choice(network.nodes(type=Agent))
        replaced = random.choice(
            replacer.neighbors(direction="to", type=Agent))

        from operator import attrgetter
        replacer.transmit(
            what=max(replacer.infos(), key=attrgetter('creation_time')),
            to_whom=replaced)
def shortest_string(self):
    """
    Uses BFS in order to find the shortest string
    Args:
        None
    Returns:
        str: The shortest string
    """
    initialstates = sorted(
        self.states,
        key=attrgetter('initial'),
        reverse=True)
    if len(initialstates) > 0:
        return bfs(self, initialstates[0])
    else:
        return None
def init_from_acceptor(self, acceptor):
    """
    Adds a sink state
    Args:
        alphabet (list): The input alphabet
    Returns:
        None
    """
    states = sorted(
        acceptor.states,
        key=attrgetter('initial'),
        reverse=True)
    for state in states:
        for arc in state.arcs:
            itext = acceptor.isyms.find(arc.ilabel)
            if itext in self.alphabet:
                self.add_arc(state.stateid, arc.nextstate, itext)
        if state.final:
            self[state.stateid].final = True
        if state.initial:
            self[state.stateid].initial = True
def consume_input(self, inp):
    """
    Return True/False if the machine accepts/rejects the input.
    Args:
        inp (str): input string to be consumed
    Returns:
        bool: A true or false value depending on if the DFA
              accepts the provided input
    """
    cur_state = sorted(
        self.states,
        key=attrgetter('initial'),
        reverse=True)[0]
    while len(inp) > 0:
        found = False
        for arc in cur_state.arcs:
            if self.isyms.find(arc.ilabel) == inp[0]:
                cur_state = self[arc.nextstate]
                inp = inp[1:]
                found = True
                break
        if not found:
            return False
    return cur_state.final != TropicalWeight(float('inf'))
def save(self, txt_fst_filename):
    """
    Save the machine in the openFST format in the file denoted by
    txt_fst_filename.
    Args:
        txt_fst_filename (str): The name of the file
    Returns:
        None
    """
    txt_fst = open(txt_fst_filename, 'w+')
    states = sorted(self.states, key=attrgetter('initial'), reverse=True)
    for state in states:
        for arc in state.arcs:
            itext = self.isyms.find(arc.ilabel)
            otext = self.osyms.find(arc.ilabel)
            txt_fst.write(
                '{}\t{}\t{}\t{}\n'.format(
                    state.stateid, arc.nextstate,
                    itext.encode('hex'), otext.encode('hex')))
        if state.final:
            txt_fst.write('{}\n'.format(state.stateid))
    txt_fst.close()
def consume_input(self, inp):
    """
    Return True/False if the machine accepts/rejects the input.
    Args:
        inp (str): input string to be consumed
    Returns:
        bool: A true or false value depending on if the DFA
              accepts the provided input
    """
    cur_state = sorted(
        self.states,
        key=attrgetter('initial'),
        reverse=True)[0]
    while len(inp) > 0:
        found = False
        for arc in cur_state.arcs:
            if self.isyms.find(arc.ilabel) == inp[0]:
                cur_state = self[arc.nextstate]
                inp = inp[1:]
                found = True
                break
        if not found:
            return False
    return cur_state.final
def save(self, txt_fst_file_name):
    """
    Save the machine in the openFST format in the file denoted by
    txt_fst_file_name.
    Args:
        txt_fst_file_name (str): The output file
    Returns:
        None
    """
    output_filename = open(txt_fst_file_name, 'w+')
    states = sorted(self.states, key=attrgetter('initial'), reverse=True)
    for state in states:
        for arc in state.arcs:
            itext = self.isyms.find(arc.ilabel)
            otext = self.osyms.find(arc.ilabel)
            output_filename.write(
                '{}\t{}\t{}\t{}\n'.format(
                    state.stateid, arc.nextstate,
                    itext.encode('hex'), otext.encode('hex')))
        if state.final:
            output_filename.write('{}\n'.format(state.stateid))
    output_filename.close()
def qtproperty(name, unwrap=lambda x: x, wrap=lambda x: x):
    """
    Expose a property of an attribute that respects Qt's get/setter conventions.

    If transform is defined, it is applied to the value passed to the setter method.

    Example::

        class MyObj:
            title = qtproperty('_widget.title')
    """
    prop, action = name.split('.')
    set_name = '%s.set%s' % (prop, action.title())
    getter = op.attrgetter(name)
    setter = op.attrgetter(set_name)

    return property(
        lambda x: unwrap(getter(x)()),
        lambda x, v: setter(x)(wrap(v)),
    )
def add(self, dist):
    """Add `dist` if we ``can_add()`` it and it has not already been added
    """
    if self.can_add(dist) and dist.has_version():
        dists = self._distmap.setdefault(dist.key, [])
        if dist not in dists:
            dists.append(dist)
            dists.sort(key=operator.attrgetter('hashcmp'), reverse=True)
def run(self):
    installed_dists = self.install_dists(self.distribution)

    cmd = ' '.join(self._argv)
    if self.dry_run:
        self.announce('skipping "%s" (dry run)' % cmd)
        return

    self.announce('running "%s"' % cmd)

    paths = map(operator.attrgetter('location'), installed_dists)
    with self.paths_on_pythonpath(paths):
        with self.project_on_sys_path():
            self.run_tests()
def execute(self, context):
    selection = sorted(bpy.context.selected_sequences,
                       key=attrgetter('frame_final_start'))
    time_move = selection[0].frame_final_start - bpy.context.scene.frame_current
    selection = reversed(selection)

    empty_channel = find_empty_channel()

    for s in selection:
        if s.type in SequenceTypes.VIDEO or s.type in SequenceTypes.IMAGE or s.type in SequenceTypes.SOUND:
            s.frame_start -= time_move
    return {'FINISHED'}
def load_feeds(self):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        feeds = executor.map(scrape_rss, self.feed_manager.get_feeds())
        frontpage_entries = []
        for feed in feeds:
            feed.entries = feed.entries[:10]  # Only get 1st 10
            GLib.idle_add(self.add_new_feed_tab, feed.feed.title, feed.entries)
            # Load into frontpage-o
            frontpage_entries.extend(feed.entries)
        frontpage_entries = sorted(frontpage_entries, key=operator.attrgetter('updated'))
        GLib.idle_add(self.add_new_feed_tab, "Frontpage", frontpage_entries)
def extractall(self, path=".", members=None):
    """Extract all members from the archive to the current working
       directory and set owner, modification time and permissions on
       directories afterwards. `path' specifies a different directory
       to extract to. `members' is optional and must be a subset of the
       list returned by getmembers().
    """
    directories = []

    if members is None:
        members = self

    for tarinfo in members:
        if tarinfo.isdir():
            # Extract directories with a safe mode.
            directories.append(tarinfo)
            tarinfo = copy.copy(tarinfo)
            tarinfo.mode = 0700
        self.extract(tarinfo, path)

    # Reverse sort directories.
    directories.sort(key=operator.attrgetter('name'))
    directories.reverse()

    # Set correct owner, mtime and filemode on directories.
    for tarinfo in directories:
        dirpath = os.path.join(path, tarinfo.name)
        try:
            self.chown(tarinfo, dirpath)
            self.utime(tarinfo, dirpath)
            self.chmod(tarinfo, dirpath)
        except ExtractError, e:
            if self.errorlevel > 1:
                raise
            else:
                self._dbg(1, "tarfile: %s" % e)
def getvalue(self, key, default=None):
    """Dictionary style get() method, including 'value' lookup."""
    if key in self:
        value = self[key]
        if type(value) is type([]):
            return map(attrgetter('value'), value)
        else:
            return value.value
    else:
        return default
def getlist(self, key):
    """ Return list of received values."""
    if key in self:
        value = self[key]
        if type(value) is type([]):
            return map(attrgetter('value'), value)
        else:
            return [value.value]
    else:
        return []