我正在尝试在Apache Spark sql上运行查询。第一个查询工作正常,但是第二个查询也删除了空值。
代码 :
def main(args: Array[String]) { val sc = new SparkContext("local[*]", "Spark") val sqlContext = new SQLContext(sc) val pageViewsDF = getDataframe(sc, sqlContext) println("RUNNING SQL QUERIES ") sqlContext.sql("select name , count(*) from pageviews_by_second group by name").show(10) sqlContext.sql("select name , count(*) from pageviews_by_second where name not in (\"Rose\") group by name").show(10) } def getDataframe(sc: SparkContext, sqlContext: SQLContext): DataFrame = { Logger.getLogger("org").setLevel(Level.OFF); Logger.getLogger("akka").setLevel(Level.OFF); val dataArray = List(List("David", null), List("David", null), List("Charlie", "23"), List("Rose", null), List("Ben", null), List("Harry", "43"), List(null, "25"), List(null, "21"), List("David", "15"), List("Rose", null), List("Alan", "26")) val separator = "," // Create an RDD val dataRDD = sc.parallelize(dataArray) // The schema is encoded in a string val header = "name,age" // Import Spark SQL data types and Row. import org.apache.spark.sql._ // Generate the schema based on the string of schema val schema = StructType( header.split(separator).map { fieldName => StructField(fieldName, StringType, true) }) val rowRDD = dataRDD .map(p => Row(p(0), p(1))) // Apply the schema to the RDD. var df = sqlContext.createDataFrame(rowRDD, schema) df.registerTempTable("pageviews_by_second") df }
第一次查询的结果是:
+-------+---+ | name|_c1| +-------+---+ | Alan| 1| | Ben| 1| | David| 3| |Charlie| 1| | Rose| 2| | Harry| 1| | null| 2| +-------+---+
和第二个查询的输出:
+-------+---+ | name|_c1| +-------+---+ | Alan| 1| | Ben| 1| | David| 3| |Charlie| 1| | Harry| 1| +-------+---+
在第二个查询中,我仅排除“ Rose”,但也排除了“ null”。
如果我的查询有误,请帮助我进行正确的查询。
发生这种情况是因为NULL在SQL中等效于“未知”。这意味着与/NULL以外的任何比较均未定义并返回。IS NULL``IS NOT NULL``NULL
NULL
IS NULL``IS NOT NULL``NULL
case class Record(id: Integer, value: String) val df = sc.parallelize(Seq(Record(1, "foo"), Record(2, null))).toDF df.registerTempTable("df") sqlContext.sql("""SELECT value = "foo" FROM df""").show // +----+ // | _c0| // +----+ // |true| // |null| // +----+ sqlContext.sql("""SELECT value != "foo" FROM df""").show // +-----+ // | _c0| // +-----+ // |false| // | null| // +-----+
因此,IN/NOT IN也未定义:
IN
NOT IN
sqlContext.sql("""SELECT value IN ("foo", "bar") FROM df""").show // +----+ // | _c0| // +----+ // |true| // |null| // +----+
这是标准的SQL行为,正确实施SQL标准的系统应以相同的方式运行。如果要过滤并保留NULLs,则必须明确地进行过滤:
NULLs
sqlContext.sql( """SELECT value IN ("foo", "bar") OR value IS NULL FROM df""").show // +----+ // | _c0| // +----+ // |true| // |true| // +----+