我正在写一个小poc,试图将用python编写的一段逻辑重写到pyspark,其中我一一处理存储在sqlite中的日志:
logs = [...] processed_logs = [] previous_log = EmptyDecoratedLog() #empty for log in logs: processed_log = with_outlet_value_closed(log, previous_log) previous_log = processed_log processed_logs.append(processed_log)
和
def with_outlet_value_closed(current_entry: DecoratedLog, previous_entry: DecoratedLog): if current_entry.sourceName == "GS2": self.outletValveClosed = current_entry.eventData else: self.outletValveClosed = previous_entry.outletValveClosed
我想在 pyspark api 中表示为:
import pyspark.sql.functions as f window = W.orderBy("ID") #where ID is unique id on those logs df.withColumn("testValveOpened", f.when((f.col("sourceName") == "GS2"), f.col("eventData")) .otherwise(f.lag("testValveOpened").over(window)), )
但这会导致 AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION]outletValveClosed无法解析具有名称的列或函数参数。
outletValveClosed
所以我的问题是:是否可以表示这样的代码,其中当前行的值取决于同一列的前一行(我知道这将导致所有记录在单个线程上处理,但这很好)
我尝试添加列的初始化
df = df.withColumn("testValveOpened", f.lit(0)) df.withColumn("testValveOpened", f.when((f.col("sourceName") == "GS2"), f.col("eventData")) .otherwise(f.lag("testValveOpened").over(window)), )
但后来我得到了
ID |sourceName|eventData|testValveOpened 1 |GS3 |1 |0 2 |GS2 |1 |1 3 |GS2 |1 |1 4 |GS1 |1 |0 5 |GS1 |1 |0 6 |ABC |0 |0 7 |B123 |0 |0 8 |B423 |0 |0 9 |PTSD |168 |0 10 |XCD |0 |0
我想得到
ID |sourceName|eventData|testValveOpened 1 |GS3 |1 |0 2 |GS2 |1 |1 3 |GS2 |1 |1 4 |GS1 |1 |1 5 |GS1 |1 |1 6 |ABC |0 |1 7 |B123 |0 |1 8 |B423 |0 |1 9 |PTSD |168 |1 10 |XCD |0 |1
因此,当有 GS2 时,取 eventData 的值,否则取之前 testValueOpened 中的 cary 值
在PySpark中,您可以使用lag()函数来访问同一列的前一行值,并且在DataFrame的转换过程中实现类似于您描述的逻辑。但是,PySpark中的DataFrame API不直接支持在同一列上迭代地进行转换。不过,您可以通过使用窗口函数结合lag()函数来模拟这种行为。
lag()
下面是一个示例代码,演示了如何在PySpark中实现您的逻辑:
from pyspark.sql import SparkSession from pyspark.sql import Window import pyspark.sql.functions as F # 创建 SparkSession spark = SparkSession.builder \ .appName("PySpark Example") \ .getOrCreate() # 模拟日志数据 data = [ (1, "GS3", 1), (2, "GS2", 1), (3, "GS2", 1), (4, "GS1", 1), (5, "GS1", 1), (6, "ABC", 0), (7, "B123", 0), (8, "B423", 0), (9, "PTSD", 168), (10, "XCD", 0) ] # 创建DataFrame df = spark.createDataFrame(data, ["ID", "sourceName", "eventData"]) # 定义窗口规范 windowSpec = Window.orderBy("ID") # 使用lag函数和窗口规范来模拟迭代地处理同一列的逻辑 processed_df = df.withColumn("testValveOpened", F.when((F.col("sourceName") == "GS2"), F.col("eventData")) .otherwise(F.lag("testValveOpened").over(windowSpec)) .otherwise(F.lit(0)) # 设置默认值 ) # 显示结果 processed_df.show()
在这个示例中,我们首先创建了一个SparkSession,并模拟了日志数据。然后,我们创建了一个窗口规范,并使用lag()函数结合窗口规范来模拟迭代地处理同一列的逻辑。最后,我们将结果DataFrame显示出来。
请注意,如果想在同一列上迭代地进行转换,可能需要使用递归或者其他方法,但这样会比较复杂。上面给出的方法是一个简单的解决方案,可能并不是在所有情况下都适用。您需要根据具体的情况进行调整和优化。