我想与Python解决方案共享这个特定的Apache Spark,因为它的文档非常有限。
我想通过KEY计算K / V对的平均值(存储在Pairwise RDD中)。示例数据如下所示:
>>> rdd1.take(10) # Show a small sample. [(u'2013-10-09', 7.60117302052786), (u'2013-10-10', 9.322709163346612), (u'2013-10-10', 28.264462809917358), (u'2013-10-07', 9.664429530201343), (u'2013-10-07', 12.461538461538463), (u'2013-10-09', 20.76923076923077), (u'2013-10-08', 11.842105263157894), (u'2013-10-13', 32.32514177693762), (u'2013-10-13', 26.249999999999996), (u'2013-10-13', 10.693069306930692)]
现在,以下代码序列 并不是达到最佳效果的 方法,但它确实有效。这是我在寻找更好的解决方案之前所做的事情。这并不可怕,但是-如您将在答案部分中看到的- 有一种更简洁,有效的方法。
>>> import operator >>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...} >>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs). >>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT) >>> print(rdd1.collect()) [(u'2013-10-09', 11.235365503035176), (u'2013-10-07', 23.39500642456595), ... snip ... ]
现在,更好的方法是使用该rdd.aggregateByKey()方法。因为该方法在Apache Spark和Python文档中的记录非常少- 这就是我编写此问与答的原因 -直到最近我一直在使用上述代码序列。但是同样,它的效率较低,因此除非必要,否则 避免 这样做。
rdd.aggregateByKey()
这是使用rdd.aggregateByKey()方法( 推荐 )进行相同操作的方法…
通过KEY,同时计算SUM(我们要计算的平均值的分子)和COUNT(我们要计算的平均值的分母):
>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function. >>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))
关于上面每个a和b对的含义,以下内容是正确的(因此您可以直观地看到正在发生的事情):
a
b
First lambda expression for Within-Partition Reduction Step:: a: is a TUPLE that holds: (runningSum, runningCount). b: is a SCALAR that holds the next Value Second lambda expression for Cross-Partition Reduction Step:: a: is a TUPLE that holds: (runningSum, runningCount). b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
最后,计算每个KEY的平均值,并收集结果。
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect() >>> print(finalResult) [(u'2013-09-09', 11.235365503035176), (u'2013-09-01', 23.39500642456595), (u'2013-09-03', 13.53240060820617), (u'2013-09-05', 13.141148418977687), ... snip ... ]
我希望这个问题和答案aggregateByKey()会有所帮助。
aggregateByKey()