我有一个将其写入ES的DataFrame
在写ES之前,我将EVTExit列转换为EPOCH中的Date。
EVTExit
workset = workset.withColumn("EVTExit", to_date(from_unixtime($"EVTExit".divide(1000)))) workset.select("EVTExit").show(10) +----------+ | EVTExit| +----------+ |2014-06-03| |null | |2012-10-23| |2014-06-03| |2015-11-05|
如我所见,这EVTExit已转换为日期。
workset.write.format("org.elasticsearch.spark.sql").save("workset/workset1")
但是在将其写入ES之后,我仍然可以使用EPOC格式。
"EVTExit" : 1401778800000
任何人都可以知道这里出了什么问题。
谢谢
让我们考虑一下DataFrame您的问题中的示例:
DataFrame
scala> val df = workset.select("EVTExit") // df: org.apache.spark.sql.DataFrame = [EVTExit: date] scala> df.printSchema // root // |-- EVTExit: date (nullable = true)
您将需要列铸造成一个字符串,并禁用es.mapping.date.rich这是true默认。
es.mapping.date.rich
true
该参数定义是为Elasticsearch中的Date字段创建类似Rich Date的对象还是将其作为原语(字符串或long)返回。实际的对象类型基于所使用的库。值得注意的 异常是Map / Reduce,它不提供内置的Date对象,因此无论此设置如何,都会返回LongWritable和Text。
我同意,这是违反直觉的,但是如果您希望elasticsearch不将其转换为long格式,则它是目前唯一的解决方案。这实际上是很痛苦的。
elasticsearch
long
scala> val df2 = df.withColumn("EVTExit_1", $"EVTExit".cast("string")) // df2: org.apache.spark.sql.DataFrame = [EVTExit: date, EVTExit_1: string] scala> df2.show // +----------+----------+ // | EVTExit| EVTExit_1| // +----------+----------+ // |2014-06-03|2014-06-03| // | null| null| // |2012-10-23|2012-10-23| // |2014-06-03|2014-06-03| // |2015-11-05|2015-11-05| // +----------+----------+
现在您可以将数据写入elasticsearch:
scala> df2.write.format("org.elasticsearch.spark.sql").option("es.mapping.date.rich", "false").save("workset/workset1")
现在,让我们检查一下ES上的内容。首先让我们看一下映射:
$ curl -XGET localhost:9200/workset?pretty=true { "workset" : { "aliases" : { }, "mappings" : { "workset1" : { "properties" : { "EVTExit" : { "type" : "long" }, "EVTExit_1" : { "type" : "date", "format" : "strict_date_optional_time||epoch_millis" } } } }, "settings" : { "index" : { "creation_date" : "1475063310916", "number_of_shards" : "5", "number_of_replicas" : "1", "uuid" : "i3Rb014sSziCmYm9LyIc5A", "version" : { "created" : "2040099" } } }, "warmers" : { } } }
看来我们有约会了。现在让我们检查一下内容:
$ curl -XGET localhost:9200/workset/_search?pretty=true -d '{ "size" : 1 }' { "took" : 2, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 5, "max_score" : 1.0, "hits" : [ { "_index" : "workset", "_type" : "workset1", "_id" : "AVdwn-vFWzMbysX5OjMA", "_score" : 1.0, "_source" : { "EVTExit" : 1401746400000, "EVTExit_1" : "2014-06-03" } } ] } }
注意1: 我将两个字段都保留用于演示目的,但我认为您明白了。
注意2:在内部 对Elasticsearch 2.4,Spark 1.6.2,scala 2.10和elasticsearch-spark 2.3.2进行了测试spark-shell
spark-shell
$ spark-shell --master local[*] --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.2
注3: 与相同的解决方案pyspark:
pyspark
from pyspark.sql.functions import col df2 = df.withColumn("EVTExit_1",col("EVTExit").cast("string")) df2.write.format("org.elasticsearch.spark.sql") \ .option("es.mapping.date.rich", "false").save("workset/workset1")