一尘不染

用新值更新数据框列

python

df1具有字段id和json;df2具有字段id和json

df1.count()=> 1200;df2.count()=> 20

df1具有所有行。df2的增量更新只有20行。

我的目标是使用中的值更新df1 df2。的所有IDdf2在df1中。但是df2json为这些相同的ID更新了值(在该字段中)。

结果df应该具有的所有值df1和的更新的值df2。

做这个的最好方式是什么?-具有最少数量的联接和过滤器。

谢谢!


阅读 262

收藏
2021-01-20

共1个答案

一尘不染

您可以使用一个左连接来实现。

创建示例数据框

使用@Shankar Koirala在其答案中提供的样本数据。

data1 = [
  (1, "a"),
  (2, "b"),
  (3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])

data2 = [
  (1, "x"), 
  (2, "y")
]

df2 = sqlCtx.createDataFrame(data2, ["id", "value"])

左加入

使用id列上的左联接将两个DataFrame联接起来。这会将所有行保留在左侧的DataFrame中。对于右侧DataFrame中没有匹配项的行id,其值为null。

import pyspark.sql.functions as f
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
         f.col('l.value').alias('left_value'),
         f.col('r.value').alias('right_value')
    )\
    .show()
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#|  1|         a|          x|
#|  3|         c|       null|
#|  2|         b|          y|
#+---+----------+-----------+

选择所需的数据

我们将利用不匹配的idsnull选择最后一列的事实。使用pyspark.sql.functions.when()使用权价值,如果它不为空,否则保持左值。

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.when(
            ~f.isnull(f.col('r.value')),
            f.col('r.value')
        ).otherwise(f.col('l.value')).alias('value')
    )\
    .show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+

您可以id按顺序排序此输出。

使用pyspark-sql

您可以使用pyspark-sql查询执行相同的操作:

df1.registerTempTable('df1')
df2.registerTempTable('df2')

query = """SELECT l.id, 
CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value 
FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
sqlCtx.sql(query.replace("\n", "")).show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+
2021-01-20