一尘不染

使用Python计算Spark中成对(K,V)RDD中每个KEY的平均值

python

我想与Python解决方案共享这个特定的Apache Spark,因为它的文档非常有限。

我想通过KEY计算K / V对的平均值(存储在Pairwise RDD中)。示例数据如下所示:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]

现在,以下代码序列 并不是达到最佳效果的 方法,但它确实有效。这是我在寻找更好的解决方案之前所做的事情。这并不可怕,但是-如您将在答案部分中看到的-
有一种更简洁,有效的方法。

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

阅读 276

收藏
2020-12-20

共1个答案

一尘不染

现在,更好的方法是使用该rdd.aggregateByKey()方法。因为该方法在Apache Spark和Python文档中的记录非常少-
这就是我编写此问与答的原因 -直到最近我一直在使用上述代码序列。但是同样,它的效率较低,因此除非必要,否则 避免 这样做。

这是使用rdd.aggregateByKey()方法( 推荐 )进行相同操作的方法…

通过KEY,同时计算SUM(我们要计算的平均值的分子)和COUNT(我们要计算的平均值的分母):

>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

关于上面每个ab对的含义,以下内容是正确的(因此您可以直观地看到正在发生的事情):

   First lambda expression for Within-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

   Second lambda expression for Cross-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

最后,计算每个KEY的平均值,并收集结果。

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]

我希望这个问题和答案aggregateByKey()会有所帮助。

2020-12-20