我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用bisect.insort()。
def update(self,record,**kw): """Update the record with new keys and values and update indices""" # update indices _id = record["__id__"] for indx in self.indices.keys(): if indx in kw.keys(): if record[indx] == kw[indx]: continue # remove id for the old value old_pos = bisect.bisect(self.indices[indx][record[indx]],_id)-1 del self.indices[indx][record[indx]][old_pos] if not self.indices[indx][record[indx]]: del self.indices[indx][record[indx]] # insert new value bisect.insort(self.indices[indx].setdefault(kw[indx],[]),_id) # update record values record.update(kw) # increment version number record["__version__"] += 1
def parse_runner_book(book): back_levels = [] lay_levels = [] order_book = {'batb': [], 'batl': []} for level in book: for side, order in level.items(): if order: side = price_side_map.get(side) if side == 'back': bisect.insort(back_levels, floatify(order.get('Price'))) order_book['batb'].append([floatify(order.get('Price')), floatify(order.get('Stake'))]) elif side == 'lay': bisect.insort_right(lay_levels, floatify(order.get('Price'))) order_book['batl'].append([floatify(order.get('Price')), floatify(order.get('Stake'))]) back_levels.reverse() order_book['batb'] = [[back_levels.index(x[0]), x[0], x[1]] for x in order_book['batb']] order_book['batl'] = [[lay_levels.index(x[0]), x[0], x[1]] for x in order_book['batl']] return order_book
def maxSumSubmatrix(self, matrix, k): """ :type matrix: List[List[int]] :type k: int :rtype: int """ ans = float("-inf") dp = [[0] * len(matrix[0]) for _ in range(len(matrix))] for i in range(0, len(matrix)): for j in range(0, len(matrix[0])): dp[i][j] = dp[i][j - 1] + matrix[i][j] for start in range(0, len(matrix[0])): for end in range(start, len(matrix[0])): sums = [0] subsum = 0 for i in range(0, len(matrix)): if start > 0: subsum += dp[i][end] - dp[i][start - 1] else: subsum += dp[i][end] idx = bisect.bisect_left(sums, subsum - k) if idx < len(sums): ans = max(ans, subsum - sums[idx]) bisect.insort(sums, subsum) return ans
def containsNearbyAlmostDuplicate(self, nums, k, t): """ :type nums: List[int] :type k: int :type t: int :rtype: bool """ if k == 0: return False bst = [] if k < 0 or t < 0: return False for i, num in enumerate(nums): idx = bisect.bisect_left(bst, num) if idx < len(bst) and abs(bst[idx] - num) <= t: return True if idx > 0 and abs(bst[idx - 1] - num) <= t: return True if len(bst) >= k: del bst[bisect.bisect_left(bst, nums[i - k])] bisect.insort(bst, num) return False
def updateData(): global stops, trips _updateStopData() stopTimesRaw = request("http://api.jxeeno.com/tfnsw/static/schedule/sydneytrains/latest/stop_times.txt") stopTimesReader = csv.DictReader(stopTimesRaw.split("\n"), delimiter=',', quotechar='"') newTripsDict = {} for row in stopTimesReader: if doCareAboutStop(row['stop_id']): entry = StopEntry(row['stop_id'], row['arrival_time'], row['trip_id'], row['departure_time']) if row['trip_id'] in newTripsDict: bisect.insort(newTripsDict[row['trip_id']], entry) else: newTripsDict[row['trip_id']] = [entry] addToLookup(entry) trips = merge_two_dicts(trips, newTripsDict) deleteOldFromTrips() deleteOldFromHomeEntries()
def maxSumSubmatrix(self, matrix, k): """ :type matrix: List[List[int]] :type k: int :rtype: int """ m = len(matrix) n = len(matrix[0]) if m else 0 M = max(m, n) N = min(m, n) ans = None for x in range(N): sums = [0] * M for y in range(x, N): slist, num = [], 0 for z in range(M): sums[z] += matrix[z][y] if m > n else matrix[y][z] num += sums[z] if num <= k: ans = max(ans, num) i = bisect.bisect_left(slist, num - k) if i != len(slist): ans = max(ans, num - slist[i]) bisect.insort(slist, num) return ans or 0
def getSortedGaps(self): """ Returns dictionary of gaps partitioned by scaffold and sorted by location """ ret = defaultdict(list) for key in self: if self[key].start == 'na': continue #print str(self[key]) #print ret[self[key].scaffold] bisect.insort(ret[self[key].scaffoldId], self[key]) #print ret[self[key].scaffold] #raw_input() return dict(ret)
def _release(self, offset, width): # Coalesce while True: f,b,ndx = self._buddy(offset, width) if ndx is None: break offset &= b width += 1 del f[ndx] # Add to the list bisect.insort(f, offset) # Mark as dirty self._dirty = True
def completion_add(toot): """Add usernames (original author, mentions, booster) co completion_list""" if toot['reblog']: username = '@' + toot['reblog']['account']['acct'] if username not in completion_list: bisect.insort(completion_list, username) username = '@' + toot['account']['acct'] if username not in completion_list: bisect.insort(completion_list, username) for user in ['@' + user['acct'] for user in toot['mentions']]: if user not in completion_list: bisect.insort(completion_list, username) ##################################### ######## CONFIG FUNCTIONS ######## #####################################
def _free(self, block): # free location and try to merge with neighbours (arena, start, stop) = block try: prev_block = self._stop_to_block[(arena, start)] except KeyError: pass else: start, _ = self._absorb(prev_block) try: next_block = self._start_to_block[(arena, stop)] except KeyError: pass else: _, stop = self._absorb(next_block) block = (arena, start, stop) length = stop - start try: self._len_to_seq[length].append(block) except KeyError: self._len_to_seq[length] = [block] bisect.insort(self._lengths, length) self._start_to_block[(arena, start)] = block self._stop_to_block[(arena, stop)] = block
def nsmallest(n, iterable): """Find the n smallest elements in a dataset. Equivalent to: sorted(iterable)[:n] """ if hasattr(iterable, '__len__') and n * 10 <= len(iterable): # For smaller values of n, the bisect method is faster than a minheap. # It is also memory efficient, consuming only n elements of space. it = iter(iterable) result = sorted(islice(it, 0, n)) if not result: return result insort = bisect.insort pop = result.pop los = result[-1] # los --> Largest of the nsmallest for elem in it: if cmp_lt(elem, los): insort(result, elem) pop() los = result[-1] return result # An alternative approach manifests the whole iterable in memory but # saves comparisons by heapifying all at once. Also, saves time # over bisect.insort() which has O(n) data movement time for every # insertion. Finding the n smallest of an m length iterable requires # O(m) + O(n log m) comparisons. h = list(iterable) heapify(h) return map(heappop, repeat(h, min(n, len(h)))) # 'heap' is a heap at all indices >= startpos, except possibly for pos. pos # is the index of a leaf with a possibly out-of-order value. Restore the # heap invariant.
def append(self, item): bisect.insort(self.A, (self.f(item), item))
def create_index(self,*fields): """Create an index on the specified field names An index on a field is a mapping between the values taken by the field and the sorted list of the ids of the records whose field is equal to this value For each indexed field, an attribute of self is created, an instance of the class Index (see above). Its name it the field name, with the prefix _ to avoid name conflicts """ reset = False for f in fields: if not f in self.fields: raise NameError,"%s is not a field name" %f # initialize the indices if self.mode == "open" and f in self.indices: continue reset = True self.indices[f] = {} for _id,record in self.records.iteritems(): # use bisect to quickly insert the id in the list bisect.insort(self.indices[f].setdefault(record[f],[]), _id) # create a new attribute of self, used to find the records # by this index setattr(self,'_'+f,Index(self,f)) if reset: self.commit()
def insert(self,*args,**kw): """Insert a record in the database Parameters can be positional or keyword arguments. If positional they must be in the same order as in the create() method If some of the fields are missing the value is set to None Returns the record identifier """ if args: kw = dict([(f,arg) for f,arg in zip(self.fields,args)]) # initialize all fields to None record = dict([(f,None) for f in self.fields]) # set keys and values for (k,v) in kw.iteritems(): record[k]=v # add the key __id__ : record identifier record['__id__'] = self.next_id # add the key __version__ : version number record['__version__'] = 0 # create an entry in the dictionary self.records, indexed by __id__ self.records[self.next_id] = record # update index for ix in self.indices.keys(): bisect.insort(self.indices[ix].setdefault(record[ix],[]), self.next_id) # increment the next __id__ to attribute self.next_id += 1 return record['__id__']
def append(self,data,priority): """append a new element to the queue according to its priority""" bisect.insort(self.queue,(priority,data))
def add(self, item, priority): idx = next(self._counter) if self._q and priority > self._q[-1][0]: # optimization for use case where new item has largest priority self._q.append((priority, idx)) else: bisect.insort(self._q, (priority, idx)) self._items[idx] = item
def _play(self, note): if note.duration == float('inf'): self.infinite.append((self.frame, note)) else: end = self.frame + int(note.duration) bisect.insort(self.playing, (-end, self.frame, note))
def queue(self, when, func, relative=True): """ Queue an event some number of frames in the future. :param when: The number of frames in the future to perform the event :param func: The thing to do in the future :param relative: Optional. If set to false, `when` serves as an absolute timestamp since play started instead of a relative count. If func is a callable object, it will be called. If func is a tuple, `self.play()` will be called with the tuple contents as args Other wise, `self.play()` will be called with func as an argument. """ if relative: when += self.frame bisect.insort(self.callbacks, (-when, func))
def add_events(self, events: List[Union[Event, TIME_FORMAT]]): for event in events: if not isinstance(event, Event): event = Event(event) bisect.insort(self, event)
def callLater(self, delay, callback): bisect.insort(self.queue, (self.time + delay, callback))
def reversePairs(self, nums): """ :type nums: List[int] :rtype: int """ n = len(nums) ans = 0 bst = [] for num in nums: right = 2 * num idx = bisect.bisect_right(bst, right) ans += len(bst) - idx bisect.insort(bst, num) return ans
def countSmaller(self, nums): """ :type nums: List[int] :rtype: List[int] """ ans = [] bst = [] for num in reversed(nums): idx = bisect.bisect_left(bst, num) ans.append(idx) bisect.insort(bst, num) return ans[::-1]
def nbest_ascending(self, docids, limit, from_, until, raise_unsortable=False): if limit is None: #pragma NO COVERAGE raise RuntimeError('n-best used without limit') # lifted from heapq.nsmallest h = nsort(docids, self._rev_index, ASC, from_, until) it = iter(h) result = sorted(islice(it, 0, limit)) if not result: #pragma NO COVERAGE raise StopIteration insort = bisect.insort pop = result.pop los = result[-1] # los --> Largest of the nsmallest for elem in it: if los <= elem: continue insort(result, elem) pop() los = result[-1] missing_docids = [] for value, docid in result: if value is ASC: missing_docids.append(docid) else: yield docid if raise_unsortable and missing_docids: raise Unsortable(missing_docids)
def sched (self, interval, func, args = None): now = time.time () self.q.append ((now + interval, interval, func, args)) self.q.sort (key = lambda x: x [0]) #bisect.insort (self.q, (now + interval, interval, func, args))
def __call__ (self, now): excutes = 0 for exetime, interval, func, args in self.q: if exetime > now: break excutes += 1 if args: func (now, *args) else: func (now) for i in range (excutes): exetime, interval, func, args = self.q.pop (0) #bisect.insort (self.q, (now + interval, interval, func, args)) self.q.append ((now + interval, interval, func, args)) self.q.sort (key = lambda x: x [0])
def insertChild(self, child): child.parent = self bisect.insort(self.children, (child.orderKey(), child))
def add(self, item, priority): idx = self._idx self._idx += 1 bisect.insort(self._q, (-priority, idx)) self._items[idx] = item
def update(self): """Removes unused old versions of mods from the cache folder.""" all_mods = set([(m.name, m.required_version) for mp in self.mod_manager.modpacks for m in mp.contents]) mods = {} for cmod in self.mods: if not cmod.name in mods: mods[cmod.name] = [] # keep the version list sorted bisect.insort(mods[cmod.name], parse_version(cmod.version)) for name, versions in mods.items(): newest = "{}.{}.{}".format(*versions[0]) all_versions = {"{}.{}.{}".format(*v) for v in versions} candidates = {v for n, v in all_mods if n == name and v} # preserve all mods that are a part of avalable modpack preserve = {v for v in all_versions if v in candidates} # preserve newest version of each mod preserve.add(newest) for version in (all_versions - preserve): # NOTE: Far too lazy to figure out what the actual issue is, so I'll assume the CachedMod constructor wants a filename # cmod = CachedMod(name, version) cmod = CachedMod(name + "_" + version + ".zip") assert os.path.exists(cmod.path) os.remove(cmod.path)
def add_time (self, t): bisect.insort (self.times, t)
def add (self, k, evt): if k in self.k2e: self.k2e[k].append (evt) else: self.k2e[k] = [evt] bisect.insort (self.k, k)
def _validate(self): super(ComputableTerm, self)._validate() if self.inputs is NotSpecified: raise TermInputsNotSpecified(termname=type(self).__name__) if self.outputs is NotSpecified: pass elif not self.outputs: raise TermOutputsEmpty(termname=type(self).__name__) else: # Raise an exception if there are any naming conflicts between the # term's output names and certain attributes. disallowed_names = [ attr for attr in dir(ComputableTerm) if not attr.startswith('_') ] # The name 'compute' is an added special case that is disallowed. # Use insort to add it to the list in alphabetical order. insort(disallowed_names, 'compute') for output in self.outputs: if output.startswith('_') or output in disallowed_names: raise InvalidOutputName( output_name=output, termname=type(self).__name__, disallowed_names=disallowed_names, ) if self.window_length is NotSpecified: raise WindowLengthNotSpecified(termname=type(self).__name__) if self.mask is NotSpecified: # This isn't user error, this is a bug in our code. raise AssertionError("{term} has no mask".format(term=self)) if self.window_length: for child in self.inputs: if not child.window_safe: raise NonWindowSafeInput(parent=self, child=child)
def add(self, item): bisect.insort(self._items, item)
def build(target, s_ref): func_shadows = target.shadow_chains[s_ref.func_name] shadow = ScriptShadow(s_ref.script_func, s_ref.build_args['priority']) bisect.insort(func_shadows, shadow)
def add_variable(self, var): self.variables[var.conc_addr] = var bisect.insort(self.addr_list, var.conc_addr)
def nsmallest(n, iterable): """Find the n smallest elements in a dataset. Equivalent to: sorted(iterable)[:n] """ if n < 0: return [] if hasattr(iterable, '__len__') and n * 10 <= len(iterable): # For smaller values of n, the bisect method is faster than a minheap. # It is also memory efficient, consuming only n elements of space. it = iter(iterable) result = sorted(islice(it, 0, n)) if not result: return result insort = bisect.insort pop = result.pop los = result[-1] # los --> Largest of the nsmallest for elem in it: if elem < los: insort(result, elem) pop() los = result[-1] return result # An alternative approach manifests the whole iterable in memory but # saves comparisons by heapifying all at once. Also, saves time # over bisect.insort() which has O(n) data movement time for every # insertion. Finding the n smallest of an m length iterable requires # O(m) + O(n log m) comparisons. h = list(iterable) heapify(h) return list(map(heappop, repeat(h, min(n, len(h))))) # 'heap' is a heap at all indices >= startpos, except possibly for pos. pos # is the index of a leaf with a possibly out-of-order value. Restore the # heap invariant.
def queue_command(self, after, cmd, persists=False, unique=True): log.debug('Queueing command "%s" to execute in %s second(s)', cmd.__name__, after) timestamp = datetime.datetime.now() + datetime.timedelta(seconds=after) queued_command = QueuedCommand(after, timestamp, cmd, persists) if not unique or queued_command not in self.queue: bisect.insort(self.queue, queued_command) else: log.warning('Failed to queue command "%s" because it\'s already queued.', cmd.__name__)
def addIntervalBlock(self, intervalBlock): assert self.isParentOf(intervalBlock) bisect.insort(self.intervalBlocks, intervalBlock) intervalBlock.meterReading = self
def _freeLine(self, offset): self._file.seek(offset) self._file.write(b'#') self._file.flush() line = self._file.readline() size = len(line) + 1 # one character was written beforehand if size > 5: bisect.insort(self._free_lines, (len(line)+1, offset) )
def add_row(self, member): if not self.have_member(member): bisect.insort(self.items, member) self.refresh_collisions() self.modelReset.emit()
def _queue_request(self, n): bisect.insort(self._queue, n) if not self._in_flight: self._consume_queue()
def containsNearbyAlmostDuplicate(self, nums, k, t): """ :type nums: List[int] :type k: int :type t: int :rtype: bool """ import bisect l = len(nums) if k < 1 or t < 0 or not nums or l < 2: return False k+=1 win=nums[:k] win.sort() for i in range(len(win)-1): if win[i+1]-win[i]<=t: return True for x in range(l-k): win.remove(nums[x]) bisect.insort(win,nums[k+x]) pos=bisect.bisect_left(win,nums[k+x]) if pos>0 and win[pos]-win[pos-1]<=t: return True l=len(win) if pos<l-1 and win[pos+1]-win[pos]<=t: return True return False
def bisect_demo(): """ ?????????????? :return: """ import bisect demo_list = [2, 6, 1, 8, 4, 9, 0] demo_list.sort() bisect.insort(demo_list, 5) print demo_list print bisect.bisect(demo_list, 3)
def addToLookup(entry): global allHomeEntriesByTime, trips if entry.tripId in trips: return if home_stop.upper() in stops[entry.stopId]['stop_name'].upper(): testExistsIndex = bisect.bisect_left(allHomeEntriesByTime, entry) if testExistsIndex < len(allHomeEntriesByTime) and allHomeEntriesByTime[testExistsIndex].tripId == entry.tripId: return bisect.insort(allHomeEntriesByTime, entry)