我目前正在尝试从MongoDB中提取数据库,并使用Spark来将其提取到ElasticSearch中geo_points。
geo_points
Mongo数据库具有纬度和经度值,但是ElasticSearch要求将它们强制转换为geo_point类型。
geo_point
Spark中是否可以将latand lon列复制到arrayor 的新列struct?
lat
lon
array
struct
任何帮助表示赞赏!
我假设您从某种平面模式开始,如下所示:
root |-- lat: double (nullable = false) |-- long: double (nullable = false) |-- key: string (nullable = false)
首先让我们创建示例数据:
import org.apache.spark.sql.Row import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types._ val rdd = sc.parallelize( Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil) val schema = StructType( StructField("lat", DoubleType, false) :: StructField("long", DoubleType, false) :: StructField("key", StringType, false) ::Nil) val df = sqlContext.createDataFrame(rdd, schema)
一种简单的方法是使用udf和case类:
case class Location(lat: Double, long: Double) val makeLocation = udf((lat: Double, long: Double) => Location(lat, long)) val dfRes = df. withColumn("location", makeLocation(col("lat"), col("long"))). drop("lat"). drop("long") dfRes.printSchema
我们得到
root |-- key: string (nullable = false) |-- location: struct (nullable = true) | |-- lat: double (nullable = false) | |-- long: double (nullable = false)
一种困难的方法是转换数据并随后应用模式:
val rddRes = df. map{case Row(lat, long, key) => Row(key, Row(lat, long))} val schemaRes = StructType( StructField("key", StringType, false) :: StructField("location", StructType( StructField("lat", DoubleType, false) :: StructField("long", DoubleType, false) :: Nil ), true) :: Nil ) sqlContext.createDataFrame(rddRes, schemaRes).show
我们得到了预期的输出
+------+-------------+ | key| location| +------+-------------+ |Warsaw|[52.23,21.01]| | Corte| [42.3,9.15]| +------+-------------+
从头开始创建嵌套模式可能很繁琐,因此,如果可以的话,我建议您采用第一种方法。如果需要更复杂的结构,可以轻松扩展它:
case class Pin(location: Location) val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)) df. withColumn("pin", makePin(col("lat"), col("long"))). drop("lat"). drop("long"). printSchema
我们得到预期的输出:
root |-- key: string (nullable = false) |-- pin: struct (nullable = true) | |-- location: struct (nullable = true) | | |-- lat: double (nullable = false) | | |-- long: double (nullable = false)
不幸的是,您无法控制nullable字段,因此如果对您的项目很重要,则必须指定架构。
nullable
最后,您可以使用struct1.4中引入的功能:
import org.apache.spark.sql.functions.struct df.select($"key", struct($"lat", $"long").alias("location"))