一尘不染

重塑/旋转Spark RDD和/或Spark DataFrames中的数据

python

我有以下格式的数据(RDD或Spark DataFrame):

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

 rdd = sc.parallelize([('X01',41,'US',3),
                       ('X01',41,'UK',1),
                       ('X01',41,'CA',2),
                       ('X02',72,'US',4),
                       ('X02',72,'UK',6),
                       ('X02',72,'CA',7),
                       ('X02',72,'XX',8)])

# convert to a Spark DataFrame                    
schema = StructType([StructField('ID', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Country', StringType(), True),
                     StructField('Score', IntegerType(), True)])

df = sqlContext.createDataFrame(rdd, schema)

我想做的是“重塑”数据,将“国家/地区”中的某些行(特别是美国,英国和加拿大)转换为列:

ID    Age  US  UK  CA  
'X01'  41  3   1   2  
'X02'  72  4   6   7

本质上,我需要一些与Pythonpivot工作流程类似的东西:

categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index = 'ID', 
                                                  columns = 'Country',
                                                  values = 'Score')

我的数据集很大,因此我无法真正collect()将数据吸收到内存中以在Python本身中进行重塑。有没有办法.pivot()在映射RDD或Spark
DataFrame时将Python转换为可调用函数?任何帮助,将不胜感激!


阅读 135

收藏
2020-12-20

共1个答案

一尘不染

从Spark
1.6开始,您可以使用pivotfunctionGroupedData并提供聚合表达式。

pivoted = (df
    .groupBy("ID", "Age")
    .pivot(
        "Country",
        ['US', 'UK', 'CA'])  # Optional list of levels
    .sum("Score"))  # alternatively you can use .agg(expr))
pivoted.show()

## +---+---+---+---+---+
## | ID|Age| US| UK| CA|
## +---+---+---+---+---+
## |X01| 41|  3|  1|  2|
## |X02| 72|  4|  6|  7|
## +---+---+---+---+---+

电平可以省略,但如果提供,则可以提高性能并用作内部滤波器。

该方法仍然相对较慢,但肯定胜过了在JVM和Python之间手动传递数据。

2020-12-20