一尘不染

PySpark DataFrames-枚举而不转换为熊猫的方法?

python

我有一个很大的 pyspark.sql.dataframe.DataFrame 名为df。我需要某种枚举记录的方式-
因此,能够访问具有特定索引的记录。(或选择具有索引范围的记录组)

在大熊猫中,我可以

indexes=[2,3,6,7] 
df[indexes]

在这里我想要类似的东西 (并且不将数据框转换为熊猫)

我最接近的是:

  • 通过以下方式枚举原始数据框中的所有对象:
        indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
* 使用where()函数搜索所需的值。

问题:

  1. 为什么它不起作用以及如何使其起作用?如何在数据框中添加一行?
  2. 以后可以做类似的事情吗:
         indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
  1. 有没有更快,更简单的处理方法?

阅读 134

收藏
2020-12-20

共1个答案

一尘不染

它不起作用,因为:

  1. 的第二个参数withColumn应该Column不是一个集合。np.array在这里不会工作
  2. 当您将"index in indexes"SQL表达式传递给时where indexes超出范围,并且不能将其解析为有效标识符

PySpark > = 1.4.0

您可以使用相应的窗口函数添加行号,并使用Column.isin方法或格式正确的查询字符串进行查询:

    from pyspark.sql.functions import col, rowNumber
    from pyspark.sql.window import Window

    w = Window.orderBy()
    indexed = df.withColumn("index", rowNumber().over(w))

    # Using DSL
    indexed.where(col("index").isin(set(indexes)))

    # Using SQL expression
    indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

看起来调用无PARTITION BY子句的窗口函数会将所有数据移动到单个分区,因此上述毕竟不是最佳解决方案。

有没有更快,更简单的处理方法?

并不是的。Spark DataFrames不支持随机行访问。

PairedRDD``lookup如果使用进行分区,则可以使用相对较快的方法进行访问HashPartitioner。还有一个index-
rdd
项目,它支持有效的查找。

编辑

与PySpark版本无关,您可以尝试执行以下操作:

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, LongType

    row = Row("char")
    row_with_index = Row("char", "index")

    df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
    df.show(5)

    ## +----+
    ## |char|
    ## +----+
    ## |   a|
    ## |   b|
    ## |   c|
    ## |   d|
    ## |   e|
    ## +----+
    ## only showing top 5 rows

    # This part is not tested but should work and save some work later
    schema  = StructType(
        df.schema.fields[:] + [StructField("index", LongType(), False)])

    indexed = (df.rdd # Extract rdd
        .zipWithIndex() # Add index
        .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
        .toDF(schema)) # It will work without schema but will be more expensive

    # inSet in Spark < 1.3
    indexed.where(col("index").isin(indexes))
2020-12-20