一尘不染

如何避免在heapq中使用_siftup或_siftdown

python

我不知道如何在不使用_siftup或的情况下有效解决以下问题_siftdown

当一个元素发生故障时,如何恢复堆不变式?

换句话说,更新old_valueheapnew_value,并保持heap工作。您可以假设old_value堆中只有一个。功能定义如下:

def update_value_in_heap(heap, old_value, new_value):

这是我的真实情况,如果您有兴趣请阅读。

  • 您可以想象它是一个小的自动完成系统。我需要计算单词的频率,并保持前k个最大数量的单词,这些单词随时准备输出。所以我用heap在这里。当一个单词计数++时,如果它在堆中,我需要对其进行更新。

  • 所有的单词和计数都存储在trie-tree的叶子中,而堆
    存储在trie-tree的中间节点中。如果您在意
    堆外的单词,请放心,我可以从trie-tree的叶子节点获取它。

  • 当用户键入一个单词时,它将首先从堆中读取,然后对其进行更新
    。为了获得更好的性能,我们可以考虑通过批量更新来减少更新频率。

那么,当一个特定的字数增加时,如何更新堆呢?

这是_siftup或_siftdown版本的简单示例(不是我的情况):

>>> from heapq import _siftup, _siftdown, heapify, heappop

>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]
>>> heapify(data)
>>> old, new = 8, 22              # increase the 8 to 22
>>> i = data.index(old)
>>> data[i] = new
>>> _siftup(data, i)
>>> [heappop(data) for i in range(len(data))]
[1, 2, 3, 5, 7, 10, 18, 19, 22, 37]

>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]
>>> heapify(data)
>>> old, new = 8, 4              # decrease the 8 to 4
>>> i = data.index(old)
>>> data[i] = new
>>> _siftdown(data, 0, i)
>>> [heappop(data) for i in range(len(data))]
[1, 2, 3, 4, 5, 7, 10, 18, 19, 37]

它花费O(n)进行索引和O(logn)进行更新。heapify是另一种解决方案,但效率不如_siftup_siftdown

但是_siftup_siftdown是heapq中的受保护成员,因此不建议它们从外部访问。

那么,有没有更好,更有效的方法来解决这个问题呢?这种情况的最佳做法?

感谢您的阅读,非常感谢您的帮助。:)


阅读 212

收藏
2021-01-20

共1个答案

一尘不染

TL; DR 使用heapify

您必须牢记的重要一件事是,理论复杂性和性能是两个不同的事物(即使它们是相关的)。换句话说,实现也很重要。渐近复杂度为您提供了一些 下限
,您可以将其视为保证,例如O(n)中的算法可确保在最坏的情况下,您将执行许多与输入大小成线性关系的指令。这里有两件重要的事情:

  1. 常量被忽略,但是常量在现实生活中很重要;
  2. 最坏的情况取决于您考虑的算法,而不仅取决于输入。

根据要考虑的主题/问题,第一点可能非常重要。在某些域中,隐藏在渐近复杂性中的常量是如此之大,以至于您甚至无法建立比常量大的输入(或者认为输入不现实)。这里不是这种情况,但是您始终必须记住这一点。

给出这两个结论,您不能真正说出: 实现B比A快,因为A源自O(n)算法,而B源自O(log n)算法
。即使从总体上来说这是一个很好的论据,但它并不总是足够的。当所有输入均等可能发生时,理论上的复杂性对于比较算法特别有用。换句话说,您的算法非常通用。

如果您知道用例和输入将是什么,则可以直接测试性能。同时使用测试和渐进复杂度,可以使您很好地了解算法的性能(在极端情况和任意实际情况下)。

话虽如此,让我们在下面的类上运行一些性能测试,这些测试将实现三种不同的策略(这里实际上有四种策略,但是
Invalidate and Reinsert 在您的情况下似乎 不合适,
因为您将使每一项无效的时间为您会看到一个给定的单词)。我将包括我的大部分代码,以便您可以仔细检查我是否搞砸了(甚至可以检查完整的笔记本):

from heapq import _siftup, _siftdown, heapify, heappop

class Heap(list):
  def __init__(self, values, sort=False, heap=False):
    super().__init__(values)
    heapify(self)
    self._broken = False
    self.sort = sort
    self.heap = heap or not sort

  # Solution 1) repair using the knowledge we have after every update:        
  def update(self, key, value):
    old, self[key] = self[key], value
    if value > old:
        _siftup(self, key)
    else:
        _siftdown(self, 0, key)

  # Solution 2 and 3) repair using sort/heapify in a lazzy way:
  def __setitem__(self, key, value):
    super().__setitem__(key, value)
    self._broken = True

  def __getitem__(self, key):
    if self._broken:
        self._repair()
        self._broken = False
    return super().__getitem__(key)

  def _repair(self):  
    if self.sort:
        self.sort()
    elif self.heap:
        heapify(self)

  # … you'll also need to delegate all other heap functions, for example:
  def pop(self):
    self._repair()
    return heappop(self)

我们首先可以检查所有三种方法是否都有效:

data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]

heap = Heap(data[:])
heap.update(8, 22)
heap.update(7, 4)
print(heap)

heap = Heap(data[:], sort_fix=True)
heap[8] = 22
heap[7] = 4
print(heap)

heap = Heap(data[:], heap_fix=True)
heap[8] = 22
heap[7] = 4
print(heap)

然后,我们可以使用以下功能运行一些性能测试:

import time
import random

def rand_update(heap, lazzy_fix=False, **kwargs):
    index = random.randint(0, len(heap)-1)
    new_value = random.randint(max_int+1, max_int*2)
    if lazzy_fix:
        heap[index] = new_value
    else:
        heap.update(index, new_value)

def rand_updates(n, heap, lazzy_fix=False, **kwargs):
    for _ in range(n):
        rand_update(heap, lazzy_fix)

def run_perf_test(n, data, **kwargs):
    test_heap = Heap(data[:], **kwargs)
    t0 = time.time()
    rand_updates(n, test_heap, **kwargs)
    test_heap[0]
    return (time.time() - t0)*1e3

results = []
max_int = 500
nb_updates = 1

for i in range(3, 7):
    test_size = 10**i
    test_data = [random.randint(0, max_int) for _ in range(test_size)]

    perf = run_perf_test(nb_updates, test_data)
    results.append((test_size, "update", perf))

    perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, heap_fix=True)
    results.append((test_size, "heapify", perf))

    perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, sort_fix=True)
    results.append((test_size, "sort", perf))

结果如下:

import pandas as pd
import seaborn as sns

dtf = pd.DataFrame(results, columns=["heap size", "method", "duration (ms)"])
print(dtf)

sns.lineplot(
    data=dtf, 
    x="heap size", 
    y="duration (ms)", 
    hue="method",
)


从这些测试中我们可以看出,这heapify似乎是最合理的选择,但在最坏的情况下它具有相当好的复杂度:O(n),并且在实践中表现更好。另一方面,研究其他选择(例如具有专用于该特定问题的数据结构,例如,使用bin将单词放入其中,然后将它们从bin移至下一个看起来像是一条可能的轨道),可能是个好主意。调查)。

重要说明:这种情况(更新与阅读比例为1:1)对heapifysort解决方案均不利。所以,如果你管理有AK:1分的比例,这一结论将更加清晰(你可以替换nb_updates = 1nb_updates = k上面的代码)。

数据框详细信息:

    heap size   method  duration in ms
0        1000   update        0.435114
1        1000  heapify        0.073195
2        1000     sort        0.101089
3       10000   update        1.668930
4       10000  heapify        0.480175
5       10000     sort        1.151085
6      100000   update       13.194084
7      100000  heapify        4.875898
8      100000     sort       11.922121
9     1000000   update      153.587103
10    1000000  heapify       51.237106
11    1000000     sort      145.306110
2021-01-20