一尘不染

如何将Spark数据框推送到elasticsearch(Pyspark)

elasticsearch

初学者ES问题在这里

将Spark数据框推送到Elastic Search的工作流程或步骤是什么?

通过研究,我相信我需要使用spark.newAPIHadoopFile()方法。

但是,在研究ElasticSearch文档和其他StackQ / A时,我仍然对参数需要采用的格式以及为什么使用它感到困惑

请注意,我正在使用pyspark,这是ES的新表(尚无索引),并且df是5列(2个字符串类型,2个长类型和1个整数列表),具有约350万行。


阅读 294

收藏
2020-06-22

共1个答案

一尘不染

设法找到答案,所以我会分享。SparkDF(来自pyspark.sql)目前不支持该newAPIHadoopFile()方法;但是,这df.rdd.saveAsNewAPIHadoopFile()也给了我错误。诀窍是通过以下功能将df转换为字符串

def transform(doc):
    import json
    import hashlib

    _json = json.dumps(doc)
    keys = doc.keys()
    for key in keys:
        if doc[key] == 'null' or doc[key] == 'None':
            del doc[key]
    if not doc.has_key('id'):
        id = hashlib.sha224(_json).hexdigest()
        doc['id'] = id
    else:
        id = doc['id']
    _json = json.dumps(doc)
    return (id, _json)

所以我的JSON工作流程是:

1: df = spark.read.json('XXX.json')

2: rdd_mapped = df.rdd.map(lambda y: y.asDict())

3: final_rdd = rdd_mapped.map(transform)

4:

final_rdd.saveAsNewAPIHadoopFile(
     path='-', 
     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
     keyClass="org.apache.hadoop.io.NullWritable",  
     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
     conf={ "es.resource" : "<INDEX> / <INDEX>", "es.mapping.id":"id", 
         "es.input.json": "true", "es.net.http.auth.user":"elastic",
         "es.write.operation":"index", "es.nodes.wan.only":"false",
         "es.net.http.auth.pass":"changeme", "es.nodes":"<NODE1>, <NODE2>, <NODE3>...",
         "es.port":"9200" })

有关ES参数的更多信息,请参见此处(滚动到“配置”)

2020-06-22