我想知道如何在Spark(Pyspark)中实现以下目标
初始数据框:
+--+---+ |id|num| +--+---+ |4 |9.0| +--+---+ |3 |7.0| +--+---+ |2 |3.0| +--+---+ |1 |5.0| +--+---+
结果数据框:
+--+---+-------+ |id|num|new_Col| +--+---+-------+ |4 |9.0| 7.0 | +--+---+-------+ |3 |7.0| 3.0 | +--+---+-------+ |2 |3.0| 5.0 | +--+---+-------+
我通常使用以下方法设法将新列“追加”到数据框: df.withColumn("new_Col", df.num * 10)
df.withColumn("new_Col", df.num * 10)
但是,我不知道如何为新列实现这种“行移位”,以便新列具有上一行的字段值(如示例中所示)。我也无法在API文档中找到有关如何通过索引访问DF中特定行的任何内容。
任何帮助,将不胜感激。
您可以lag如下使用窗口功能
lag
from pyspark.sql.functions import lag, col from pyspark.sql.window import Window df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"]) w = Window().partitionBy().orderBy(col("id")) df.select("*", lag("num").over(w).alias("new_col")).na.drop().show() ## +---+---+-------+ ## | id|num|new_col| ## +---+---+-------| ## | 2|3.0| 5.0| ## | 3|7.0| 3.0| ## | 4|9.0| 7.0| ## +---+---+-------+
但是有一些重要的问题:
尽管第二个问题几乎从来都不是问题,但第一个问题可以成为破坏交易的方法。如果是这种情况,您应该简单地将其转换DataFrame为RDD并lag手动进行计算。
DataFrame