I have been searching and have not found a solution for how to query, using Spark SQL, on dates stored as UTC milliseconds since the epoch. The schema I have pulled in from my NoSQL data source (JSON from MongoDB) has the target date as:
 |-- dateCreated: struct (nullable = true)
 |    |-- $date: long (nullable = true)
The full schema is as follows:
scala> accEvt.printSchema
root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- appId: integer (nullable = true)
 |-- cId: long (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- expires: struct (nullable = true)
 |    |    |-- $date: long (nullable = true)
 |    |-- metadata: struct (nullable = true)
 |    |    |-- another key: string (nullable = true)
 |    |    |-- class: string (nullable = true)
 |    |    |-- field: string (nullable = true)
 |    |    |-- flavors: string (nullable = true)
 |    |    |-- foo: string (nullable = true)
 |    |    |-- location1: string (nullable = true)
 |    |    |-- location2: string (nullable = true)
 |    |    |-- test: string (nullable = true)
 |    |    |-- testKey: string (nullable = true)
 |    |    |-- testKey2: string (nullable = true)
 |-- dateCreated: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- originationDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- processedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- receivedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
My goal is to write queries along the lines of:
SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1]
My process so far has been:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@29200d25

scala> val accEvt = sqlContext.jsonFile("/home/bkarels/mongoexport/accomplishment_event.json")
...
14/10/29 15:03:38 INFO SparkContext: Job finished: reduce at JsonRDD.scala:46, took 4.668981083 s
accEvt: org.apache.spark.sql.SchemaRDD = SchemaRDD[6] at RDD at SchemaRDD.scala:103

scala> accEvt.registerAsTable("accomplishmentEvent")
(At this point, the following baseline query executes successfully:)
scala> sqlContext.sql("select count(*) from accomplishmentEvent").collect.foreach(println)
...
[74475]
Now, the voodoo I cannot get my head around is how to form my select statement to reason about the dates. For example, the following executes without error, but returns zero rather than the count of all records as it should (74475):
scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate >= '1970-01-01'").collect.foreach(println)
...
[0]
I have also tried some ugly things like:
scala> val now = new java.util.Date()
now: java.util.Date = Wed Oct 29 15:05:15 CDT 2014

scala> val today = now.getTime
today: Long = 1414613115743

scala> val thirtydaysago = today - (30 * 24 * 60 * 60 * 1000)
thirtydaysago: Long = 1416316083039

scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate <= %s and processedDate >= %s".format(today,thirtydaysago)).collect.foreach(println)
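(A side note on the arithmetic above: 30 * 24 * 60 * 60 * 1000 overflows Int, which is why thirtydaysago comes out later than today; a minimal sketch using a Long literal avoids that:)

val now = new java.util.Date()
val today = now.getTime
// 30L forces Long arithmetic, so the product does not wrap around Int.MaxValue
val thirtydaysago = today - (30L * 24 * 60 * 60 * 1000)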
As suggested, I selected a named field to make sure that works. So:
scala> sqlContext.sql("select receivedDate from accomplishmentEvent limit 10").collect.foreach(println)
returns:
[[1376318850033]]
[[1376319429590]]
[[1376320804289]]
[[1376320832835]]
[[1376320832960]]
[[1376320835554]]
[[1376320914480]]
[[1376321041899]]
[[1376321109341]]
[[1376321121469]]
Then, extending on that to try to get some kind of date query working, I tried:
scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.date > '1970-01-01' limit 5").collect.foreach(println)
which results in the error:
java.lang.RuntimeException: No such field date in StructType(ArrayBuffer(StructField($date,LongType,true))) ...
Prefixing the field name with $, as was also suggested, results in a different kind of error:
scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5").collect.foreach(println)
java.lang.RuntimeException: [1.69] failure: ``UNION'' expected but ErrorToken(illegal character) found

select actualConsumerId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5
Clearly I am not grasping how to select on dates stored this way; can anyone help me fill in this gap?

I am relatively new to both Scala and Spark, so forgive me if this is an elementary question, but my searches on the forums and in the Spark documentation have turned up empty.

Thank you.
Your JSON is not flat, so fields below the top level need to be addressed with qualified names, such as dateCreated.$date. Your specific date fields are all of long type, so you will need to do numerical comparisons on them, and it looks like you were on the right track there.
An additional problem is that your field names contain a "$" character, and Spark SQL will not let you query on them. One solution is that instead of reading the JSON directly as a SchemaRDD, you first read it as an RDD[String], use the map method to perform the Scala string manipulations of your choice, and then use SQLContext's jsonRDD method to create the SchemaRDD.
val lines = sc.textFile(...)
// you may want something less naive than global replacement of all "$" chars
val linesFixed = lines.map(s => s.replaceAllLiterally("$", ""))
val accEvt = sqlContext.jsonRDD(linesFixed)
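With the "$" characters stripped, the date structs can be addressed with qualified names such as dateCreated.date and compared numerically as epoch milliseconds. A rough sketch of the kind of query from the question, assuming the same table name as above and placeholder long bounds:

accEvt.registerAsTable("accomplishmentEvent")

// placeholder epoch-millisecond bounds standing in for dateStoredAsLong0 / dateStoredAsLong1
val lowerBound = 1375000000000L
val upperBound = 1415000000000L

sqlContext.sql(
  ("select count(*) from accomplishmentEvent " +
   "where dateCreated.date >= %s and dateCreated.date <= %s").format(lowerBound, upperBound)
).collect.foreach(println)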
I have tested this with Spark 1.1.0.

For reference, the lack of quoting capability in Spark SQL has been noted in this bug report and others, and it appears the fix was checked in recently, but it will take some time to make it into a release.