一尘不染

Spark使用上一行的值将新列添加到数据框

python

我想知道如何在Spark(Pyspark)中实现以下目标

初始数据框:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+

结果数据框:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+

我通常使用以下方法设法将新列“追加”到数据框: df.withColumn("new_Col", df.num * 10)

但是,我不知道如何为新列实现这种“行移位”,以便新列具有上一行的字段值(如示例中所示)。我也无法在API文档中找到有关如何通过索引访问DF中特定行的任何内容。

任何帮助,将不胜感激。


阅读 126

收藏
2020-12-20

共1个答案

一尘不染

您可以lag如下使用窗口功能

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

但是有一些重要的问题:

  1. 如果您需要全局操作(不被其他一个或多个其他列分区),则效率极低。
  2. 您需要一种自然的方式来订购数据。

尽管第二个问题几乎从来都不是问题,但第一个问题可以成为破坏交易的方法。如果是这种情况,您应该简单地将其转换DataFrame为RDD并lag手动进行计算。

2020-12-20