初学者ES问题在这里
将Spark数据框推送到Elastic Search的工作流程或步骤是什么?
通过研究,我相信我需要使用spark.newAPIHadoopFile()方法。
但是,在研究ElasticSearch文档和其他StackQ / A时,我仍然对参数需要采用的格式以及为什么使用它感到困惑
请注意,我正在使用pyspark,这是ES的新表(尚无索引),并且df是5列(2个字符串类型,2个长类型和1个整数列表),具有约350万行。
设法找到答案,所以我会分享。SparkDF(来自pyspark.sql)目前不支持该newAPIHadoopFile()方法;但是,这df.rdd.saveAsNewAPIHadoopFile()也给了我错误。诀窍是通过以下功能将df转换为字符串
newAPIHadoopFile()
df.rdd.saveAsNewAPIHadoopFile()
def transform(doc): import json import hashlib _json = json.dumps(doc) keys = doc.keys() for key in keys: if doc[key] == 'null' or doc[key] == 'None': del doc[key] if not doc.has_key('id'): id = hashlib.sha224(_json).hexdigest() doc['id'] = id else: id = doc['id'] _json = json.dumps(doc) return (id, _json)
所以我的JSON工作流程是:
1: df = spark.read.json('XXX.json')
df = spark.read.json('XXX.json')
2: rdd_mapped = df.rdd.map(lambda y: y.asDict())
rdd_mapped = df.rdd.map(lambda y: y.asDict())
3: final_rdd = rdd_mapped.map(transform)
final_rdd = rdd_mapped.map(transform)
4:
final_rdd.saveAsNewAPIHadoopFile( path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", keyClass="org.apache.hadoop.io.NullWritable", valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", conf={ "es.resource" : "<INDEX> / <INDEX>", "es.mapping.id":"id", "es.input.json": "true", "es.net.http.auth.user":"elastic", "es.write.operation":"index", "es.nodes.wan.only":"false", "es.net.http.auth.pass":"changeme", "es.nodes":"<NODE1>, <NODE2>, <NODE3>...", "es.port":"9200" })
有关ES参数的更多信息,请参见此处(滚动到“配置”)