我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用builtins.next()。
def get_assignment(result, skip=False, **kwargs): # print(result) result = iter(listize(result)) if skip: return None, result first_result = next(result) try: second_result = next(result) except StopIteration: # pipe delivers one result, e.g., strconcat result = chain([first_result], result) multiple = False else: # pipe delivers multiple results, e.g., fetchpage/tokenizer result = chain([first_result], [second_result], result) multiple = True first = kwargs.get('count') == 'first' _all = kwargs.get('count') == 'all' one = first or not (multiple or _all) return one, iter([first_result]) if one else result
def deserialize(self, stream): data = [] while True: assert stream.first.item_type == self.item_type data.append(stream.first.numpy_array_value.data) if stream.first.numpy_array_value.last: break next(stream) value = np.ndarray( shape=stream.first.numpy_array_value.shape, dtype=np.dtype(stream.first.numpy_array_value.dtype), # optimization to avoid extra data copying if array data fits to one block # TODO: compare actual performance buffer=data[0] if len(data) == 1 else b''.join(data) ) return value
def rmsection(filename, pattern): pattern_compiled = re.compile(pattern) with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_file: with open(filename) as src_file: for line in src_file: (sline, nsub) = pattern_compiled.subn('', line) tmp_file.write(sline) if nsub > 0: next(src_file) shutil.copystat(filename, tmp_file.name) shutil.move(tmp_file.name, filename) # See https://github.com/rtfd/readthedocs.org/issues/1139
def handle_nextarg(self, span, name): '''Should be called to signalize a nextarg directive. Args: span (tuple of int): Start and end line of the directive. name (str or None): Name of the argument following next or None if it should be the next positional argument. ''' self._check_for_open_block(span, 'nextarg') block = self._open_blocks[-1] directive, fname, spans = block[0:3] self._check_if_matches_last( directive, 'call', spans[-1], span, 'nextarg') args, argnames = block[5:7] args.append(self._curnode) spans.append(span) if name is not None: argnames.append(name) elif argnames: msg = 'non-keyword argument following keyword argument' raise FyppFatalError(msg, fname, span) self._curnode = []
def fit(self, data, batch_size=32, epochs=1, **kwargs): """ Fit the Gaussian Variational Autoencoder onto the training data. Args: data: ndarray, training data batch_size: int, number of samples to be fit at one pass epochs: int, number of whole-size iterations on the training data **kwargs: Returns: A training history dict. """ data_iterator, batches_per_epoch = self.data_iterator.iter(data, batch_size, mode='training', shuffle=True) history = {'vae_loss': []} for _ in tqdm(range(epochs)): epoch_loss_history_vae = [] for it in range(batches_per_epoch): data_batch = next(data_iterator) loss_autoencoder = self.vae_model.train_on_batch(data_batch[:-1], None) epoch_loss_history_vae.append(loss_autoencoder) history['vae_loss'].append(epoch_loss_history_vae) return history
def post_event(self, dispatcher, event, *args): '''Post an event into the main application thread. The event is queued internally until the `run` method's thread is able to dispatch the event. This method can be safely called from any thread. If the method is called from the `run` method's thread (for example, from within an event handler), the event may be dispatched within the same runloop iteration or the next one; the choice is nondeterministic. :Parameters: `dispatcher` : EventDispatcher Dispatcher to process the event. `event` : str Event name. `args` : sequence Arguments to pass to the event handlers. ''' self._event_queue.put((dispatcher, event, args)) self.notify()
def get_style_range(self, attribute, start, end): """Get an attribute style over the given range. If the style varies over the range, `STYLE_INDETERMINATE` is returned. :Parameters: `attribute` : str Name of style attribute to query. `start` : int Starting character position. `end` : int Ending character position (exclusive). :return: The style set for the attribute over the given range, or `STYLE_INDETERMINATE` if more than one value is set. """ iterable = self.get_style_runs(attribute) _, value_end, value = next(iterable.ranges(start, end)) if value_end < end: return STYLE_INDETERMINATE else: return value
def get_style_range(self, attribute, start, end): '''Get an attribute style over the given range. If the style varies over the range, `STYLE_INDETERMINATE` is returned. :Parameters: `attribute` : str Name of style attribute to query. `start` : int Starting character position. `end` : int Ending character position (exclusive). :return: The style set for the attribute over the given range, or `STYLE_INDETERMINATE` if more than one value is set. ''' iter = self.get_style_runs(attribute) _, value_end, value = next(iter.ranges(start, end)) if value_end < end: return STYLE_INDETERMINATE else: return value
def valueChanged(self, val): if isinstance(self.layer, QgsVectorLayer): idx = min(next(i for i,v in enumerate(self.times.keys()) if v >= val), len(self.times) - 1) fids = list(self.times.values())[idx] if self.mode == CUMULATIVE: for i in range(idx): fids.extend(list(self.times.values())[i]) subsetString = "$id IN (%s)" % ",".join(fids) self.layer.setSubsetString(subsetString) iface.mapCanvas().refresh() dt = datetime.datetime(1, 1, 1) + datetime.timedelta(milliseconds=val * 3600 * 1000) self.labelCurrent.setText(str(dt.replace(microsecond=0))) else: idx = min(next(i for i,v in enumerate(self.times.keys()) if v >= val), len(self.times) - 1) t = list(self.times.values())[idx] self.layer.dataProvider().setDataSourceUri(self.IGNORE_PREFIX + self.originalUri + "&TIME=%s" % t) iface.mapCanvas().refresh()
def moveback(self): '''Moves the iterator back of one value and decreases the step''' # outofbounds: if self._val - self.currentstep < self._start or self._raisestop: return False # we cannot move back if self._iterindex == len(self._iterindices) - 1: # no more depth available self._iterindex = 0 self._refreshitr() next(self._itr) # hack: move forward as first _itr item has already been yielded return False # we cannot move back else: # move to next depth level self._val -= self.currentstep self._iterindex += 1 self._refreshitr() # hack: move forward as first _itr item has already been yielded next(self._itr) return True
def apply_conll2003_ner(ner, testfile, outfile): """ Inputs: - ner: named entity classifier with find_ne_in_text method - testfile: path to the testfile - outfile: where the output should be saved """ documents = CoNLL2003(sources=[testfile], to_lower=True) documents_it = documents.__iter__() local_context_mat, tok_idx = None, {} # read in test file + generate outfile with open(outfile, 'w') as f_out: # collect all the words in a sentence and save other rest of the lines to_write, tokens = [], [] doc_tokens = [] for line in open(testfile): if line.startswith("-DOCSTART- -X- -X-"): f_out.write("-DOCSTART- -X- -X- O O\n") # we're at a new document, time for a new local context matrix if ner.context_model: doc_tokens = next(documents_it) local_context_mat, tok_idx = ner.context_model.get_local_context_matrix(doc_tokens) # outfile: testfile + additional column with predicted label elif line.strip(): to_write.append(line.strip()) tokens.append(clean_conll2003(line.split()[0])) else: # end of sentence: find all named entities! if to_write: ne_results = ner.find_ne_in_text(" ".join(tokens), local_context_mat, tok_idx) assert " ".join(tokens) == "".join(r[0] for r in ne_results), "returned text doesn't match" # sanity check l_list = ne_results_2_labels(ne_results) assert len(l_list) == len(tokens), "Error: %i labels but %i tokens" % (len(l_list), len(tokens)) for i, line in enumerate(to_write): f_out.write(to_write[i] + " " + l_list[i] + "\n") to_write, tokens = [], [] f_out.write("\n")
def next(itr, default=_undef): """compat wrapper for next()""" if default is _undef: return itr.next() try: return itr.next() except StopIteration: return default
def assign(item, assignment, **kwargs): key = kwargs.get('assign') value = next(assignment) if kwargs.get('one') else list(assignment) merged = merge([item, {key: value}]) yield DotDict(merged) if kwargs.get('dictize') else merged
def find_class_by_name(name, modules): """Searches the provided modules for the named class and returns it.""" modules = [getattr(module, name, None) for module in modules] return next(a for a in modules if a)
def sliding(it, window): x = list(islice(it, window)) try: if len(x) == window: while True: yield x x2 = x[1:] x2.append(next(it)) x = x2 except StopIteration: pass
def quantize(args, quantizer, classes): cnt = count() for _, X, ys in quantizer.stream(args.input_file): nys = [] for y in ys: if y not in classes: classes[y] = y if getattr(args, 'noRemap', False) else next(cnt) nys.append(classes[y]) yield X, nys
def quantize_y(args, quantizer, classes): cnt = count() for _, ys in quantizer.stream(args.input_file, no_features=True): nys = [] for y in ys: if y not in classes: classes[y] = y if getattr(args, 'noRemap', False) else next(cnt) nys.append(classes[y]) yield nys
def complete(self): return next(self.requires()).complete()
def test08(self): """Check unicode representation of target markers """ tgt = next(TaskA().output()) marker = ORMTargetMarker(name=tgt.name, params=tgt.params) self.assertEqual(str(marker), "TaskA {} None") tgt = next(TaskB(p1=5, p2='lala').output()) marker = ORMTargetMarker(name=tgt.name, params=tgt.params) self.assertEqual(str(marker), "TaskB {'p1': '5', 'p2': 'lala'} None")
def __init__(self, messages): self.messages = messages self.first = next(messages)
def __next__(self): self.first = next(self.messages) return self.first
def deserialize(self, stream): container = self.new_container() while next(stream).item_type != self.end_item_type: self.insert_item(container, BridgeMessage.deserialize_any(stream), stream.first) return self.cast(container)
def deserialize(cls, messages): value = cls.deserialize_any(MessageStream(messages)) the_end = object() assert next(messages, the_end) == the_end return value
def prepare(self): """Prepare by truncating all existing logs.""" logger.info('Truncating logs for {}'.format(self.ident)) for logset in self.iterlogs(): log = next(logset, None) if not log: continue log.write('') # Truncate by replacing it with an empty file for log in logset: log.unlink()
def addr_family(request, dut_ctl): '''The address family to determine the mode - ipv4 or ipv6 - to run tests under''' if request.param == socket.AF_INET6: if socket.AF_INET6 not in dut_ctl.addrinfo: pytest.fail("IPv6 mode not supported on DUT") elif not next(network.ip_ifaces(version=6), None): pytest.fail("No IPv6 interfaces configured") return request.param
def get_storage(self, item): storagedir = getattr(item, '_storagedir', None) if not storagedir: storagedir = '{:03}-{}'.format(next(self.counter), sanitized_name(item.name)) item._storagedir = storagedir return self.join(storagedir)
def get_first_atom(self, atomIndices): if len(atomIndices) < 1: return '' else: atom = self.mol.atoms[next(iter(atomIndices))] return u"<b>Atom</b> {atom.name} at coordinates " \ u"x:{p[0]:.3f}, y:{p[1]:.3f}, z:{p[2]:.3f} \u212B".format( atom=atom, p=atom.position.value_in(u.angstrom))
def get_selected_bond(bonds): return next(iter(bonds))
def test_next_1(self): """ Custom next methods should not be converted to __next__ in stage1, but any obj.next() calls should be converted to next(obj). """ before = """ class Upper: def __init__(self, iterable): self._iter = iter(iterable) def next(self): # note the Py2 interface return next(self._iter).upper() def __iter__(self): return self itr = Upper('hello') assert itr.next() == 'H' assert next(itr) == 'E' assert list(itr) == list('LLO') """ after = """ class Upper: def __init__(self, iterable): self._iter = iter(iterable) def next(self): # note the Py2 interface return next(self._iter).upper() def __iter__(self): return self itr = Upper('hello') assert next(itr) == 'H' assert next(itr) == 'E' assert list(itr) == list('LLO') """ self.convert_check(before, after, stages=[1], run=PY2)
def test_next_2(self): """ This version of the above doesn't currently work: the self._iter.next() call in line 5 isn't converted to next(self._iter). """ before = """ class Upper: def __init__(self, iterable): self._iter = iter(iterable) def next(self): # note the Py2 interface return self._iter.next().upper() def __iter__(self): return self itr = Upper('hello') assert itr.next() == 'H' assert next(itr) == 'E' assert list(itr) == list('LLO') """ after = """ class Upper(object): def __init__(self, iterable): self._iter = iter(iterable) def next(self): # note the Py2 interface return next(self._iter).upper() def __iter__(self): return self itr = Upper('hello') assert next(itr) == 'H' assert next(itr) == 'E' assert list(itr) == list('LLO') """ self.convert_check(before, after, stages=[1], run=PY2)
def handle_nextarg(self, span, name): '''Called when parser encounters a nextarg directive. It is a dummy method and should be overriden for actual use. Args: span (tuple of int): Start and end line of the directive. name (str or None): Name of the argument following next or None if it should be the next positional argument. ''' self._log_event('nextarg', span, name=name)
def _postprocess_eval_line(self, evalline, fname, span): lines = evalline.split('\n') # If line ended on '\n', last element is ''. We remove it and # add the trailing newline later manually. trailing_newline = (lines[-1] == '') if trailing_newline: del lines[-1] lnum = linenumdir(span[0], fname) if self._linenums else '' clnum = lnum if self._contlinenums else '' linenumsep = '\n' + lnum clinenumsep = '\n' + clnum foldedlines = [self._foldline(line) for line in lines] outlines = [clinenumsep.join(lines) for lines in foldedlines] result = linenumsep.join(outlines) # Add missing trailing newline if trailing_newline: trailing = '\n' if self._linenums: # Last line was folded, but no linenums were generated for # the continuation lines -> current line position is not # in sync with the one calculated from the last line number unsync = ( len(foldedlines) and len(foldedlines[-1]) > 1 and not self._contlinenums) # Eval directive in source consists of more than one line multiline = span[1] - span[0] > 1 if unsync or multiline: # For inline eval directives span[0] == span[1] # -> next line is span[0] + 1 and not span[1] as for # line eval directives nextline = max(span[1], span[0] + 1) trailing += linenumdir(nextline, fname) else: trailing = '' return result + trailing
def getTrianglesCoordinates(self): """ A method to retrieve triplet of coordinates representing the triangles in lon,lat,height. """ triangles = [] self._computeVerticesCoordinates() indices = iter(self.indices) for i in xrange(0, len(self.indices) - 1, 3): vi1 = next(indices) vi2 = next(indices) vi3 = next(indices) triangle = ( (self._longs[vi1], self._lats[vi1], self._heights[vi1]), (self._longs[vi2], self._lats[vi2], self._heights[vi2]), (self._longs[vi3], self._lats[vi3], self._heights[vi3]) ) triangles.append(triangle) if len(list(indices)) > 0: raise Exception('Corrupted tile') return triangles
def next(itr, default=_undef): "compat wrapper for next()" if default is _undef: return itr.next() try: return itr.next() except StopIteration: return default
def has_error(self): """Returns whether there was a business logic error when fetching data for any components for this property. Returns: boolean """ return next( (True for cr in self.component_results if cr.has_error()), False )
def has_object_error(self): """Returns true if any requested object had a business logic error, otherwise returns false Returns: boolean """ if self._has_object_error is None: # scan the objects for any business error codes self._has_object_error = next( (True for o in self.objects() if o.has_error()), False) return self._has_object_error
def _run_estimated(self): '''Run-loop that continually estimates function mapping requested timeout to measured timeout using a least-squares linear regression. Suitable for oddball platforms (Windows). XXX: There is no real relation between the timeout given by self.idle(), and used to calculate the estimate, and the time actually spent waiting for events. I have seen this cause a negative gradient, showing a negative relation. Then CPU use runs out of control due to very small estimates. ''' platform_event_loop = app.platform_event_loop predictor = self._least_squares() gradient, offset = next(predictor) time = self.clock.time while not self.has_exit: timeout = self.idle() if timeout is None: estimate = None else: estimate = max(gradient * timeout + offset, 0.0) if False: print('Gradient = %f, Offset = %f' % (gradient, offset)) print('Timeout = %f, Estimate = %f' % (timeout, estimate)) t = time() if not platform_event_loop.step(estimate) and estimate != 0.0 and \ estimate is not None: dt = time() - t gradient, offset = predictor.send((dt, estimate))
def __init__(self, run_list): self._run_list_iter = iter(run_list) self.start, self.end, self.value = next(self)
def __next__(self): return next(self._run_list_iter)
def __getitem__(self, index): while index >= self.end and index > self.start: # condition has special case for 0-length run (fixes issue 471) self.start, self.end, self.value = next(self) return self.value
def ranges(self, start, end): iterators = [i.ranges(start, end) for i in self.range_iterators] starts, ends, values = zip(*[next(i) for i in iterators]) starts = list(starts) ends = list(ends) values = list(values) while start < end: min_end = min(ends) yield start, min_end, values start = min_end for i, iterator in enumerate(iterators): if ends[i] == min_end: starts[i], ends[i], values[i] = next(iterator)
def __init__(self, elements, length): self._run_list_iter = _iter_elements(elements, length) self.start, self.end, self.value = next(self)
def headers(self): try: headers = next(yaml.load_all(self.read(body=False))) except StopIteration as e: raise ValueError("YAML header is missing. Please ensure that the top of your post has a header of the following form:\n" + SAMPLE_HEADER) except yaml.YAMLError as e: raise ValueError( "YAML header is incorrectly formatted or missing. The following information may be useful:\n{}\nIf you continue to have difficulties, try pasting your YAML header into an online parser such as http://yaml-online-parser.appspot.com/.".format(str(e))) for key, value in headers.copy().items(): if isinstance(value, datetime.date): headers[key] = datetime.datetime.combine(value, datetime.time(0)) if key == 'tags' and isinstance(value, list): headers[key] = [str(v) if six.PY3 else unicode(v) for v in value] return headers
def load_data(path): """ This is a utility function to load a text categorization dataset. It assumes the data is organized in the folder supplied in the path argument with different folders for each class, where each folder contains individual text documents (.txt). Alternatively, unlabeled data can also just be in the current folder and will receive the class label '.'. The function returns two dictionaries, one with the raw texts, one with the corresponding classes (= subdirectory names). The document ids used to index both dictionaries and match raw texts with categories are constructed as classname + name of text file. Input: path: path to a folder with the data Returns: textdict: dict with {doc_id: text} doccats: dict with {doc_id: category} """ textdict = {} doccats = {} # if there are unlabeled documents in the current directory for fname in iglob(os.path.join(path, '*.txt')): # construct unique docid docid = os.path.splitext(os.path.basename(fname))[0] # save category + text doccats[docid] = '.' with open(fname) as f: textdict[docid] = f.read() # go through all category subdirectories for cat in next(os.walk(path))[1]: if not cat.startswith('.'): cat_path = os.path.join(path, cat) # go through all txt documents for fname in iglob(os.path.join(cat_path, '*.txt')): # construct unique docid docid = cat + ' ' + os.path.splitext(os.path.basename(fname))[0] # save category + text doccats[docid] = cat with open(fname) as f: textdict[docid] = f.read() return textdict, doccats