我正在使用带有Elasticsearch的Apache Spark 1.5数据帧,我尝试从包含ID列表(数组)的列中过滤ID。
例如,elasticsearch列的映射如下所示:
{ "people":{ "properties":{ "artist":{ "properties":{ "id":{ "index":"not_analyzed", "type":"string" }, "name":{ "type":"string", "index":"not_analyzed", } } } } }
示例数据格式如下
{ "people": { "artist": { [ { "id": "153", "name": "Tom" }, { "id": "15389", "name": "Cok" } ] } } }, { "people": { "artist": { [ { "id": "369", "name": "Carl" }, { "id": "15389", "name": "Cok" }, { "id": "698", "name": "Sol" } ] } } }
在火花我尝试这样做:
val peopleId = 152 val dataFrame = sqlContext.read .format("org.elasticsearch.spark.sql") .load("index/type") dataFrame.filter(dataFrame("people.artist.id").contains(peopleId)) .select("people_sequence.artist.id")
我得到了包含152的所有id,例如1523,152978,但不仅id == 152
然后我尝试
dataFrame.filter(dataFrame("people.artist.id").equalTo(peopleId)) .select("people.artist.id")
我变得空虚,我明白为什么,这是因为我有很多人。artist.id
有人可以告诉我如何在有ID列表时进行过滤吗?
在Spark 1.5+中,您可以使用array_contains功能:
array_contains
df.where(array_contains($"people.artist.id", "153"))
如果您使用的是较早版本,则可以尝试这样的UDF:
val containsId = udf( (rs: Seq[Row], v: String) => rs.map(_.getAs[String]("id")).exists(_ == v)) df.where(containsId($"people.artist", lit("153")))