In our application we use Spark SQL to extract field values as columns. I am trying to figure out how to put those column values into a nested JSON object and push it to Elasticsearch. Is there also a way to parameterize the value passed to the regular expression in selectExpr?
We are currently using the Spark Java API.
Dataset<Row> data = rowExtracted.selectExpr(
        "split(value,\"[|]\")[0] as channelId",
        "split(value,\"[|]\")[1] as country",
        "split(value,\"[|]\")[2] as product",
        "split(value,\"[|]\")[3] as sourceId",
        "split(value,\"[|]\")[4] as systemId",
        "split(value,\"[|]\")[5] as destinationId",
        "split(value,\"[|]\")[6] as batchId",
        "split(value,\"[|]\")[7] as orgId",
        "split(value,\"[|]\")[8] as businessId",
        "split(value,\"[|]\")[9] as orgAccountId",
        "split(value,\"[|]\")[10] as orgBankCode",
        "split(value,\"[|]\")[11] as beneAccountId",
        "split(value,\"[|]\")[12] as beneBankId",
        "split(value,\"[|]\")[13] as currencyCode",
        "split(value,\"[|]\")[14] as amount",
        "split(value,\"[|]\")[15] as processingDate",
        "split(value,\"[|]\")[16] as status",
        "split(value,\"[|]\")[17] as rejectCode",
        "split(value,\"[|]\")[18] as stageId",
        "split(value,\"[|]\")[19] as stageStatus",
        "split(value,\"[|]\")[20] as stageUpdatedTime",
        "split(value,\"[|]\")[21] as receivedTime",
        "split(value,\"[|]\")[22] as sendTime");

StreamingQuery query = data.writeStream()
        .outputMode(OutputMode.Append())
        .format("es")
        .option("checkpointLocation", "C:\\checkpoint")
        .start("spark_index/doc");
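On the parameterization part of the question, one possible approach (a minimal sketch, not from the original post, assuming the field order above stays fixed) is to build the selectExpr strings in a loop so the split regex is written only once:

// Hypothetical sketch: generate the selectExpr strings programmatically.
// The field names are the ones listed in the question.
import java.util.stream.IntStream;

String[] fields = {
    "channelId", "country", "product", "sourceId", "systemId", "destinationId",
    "batchId", "orgId", "businessId", "orgAccountId", "orgBankCode",
    "beneAccountId", "beneBankId", "currencyCode", "amount", "processingDate",
    "status", "rejectCode", "stageId", "stageStatus", "stageUpdatedTime",
    "receivedTime", "sendTime"
};

// Each entry becomes e.g. "split(value, '[|]')[0] as channelId".
String[] exprs = IntStream.range(0, fields.length)
    .mapToObj(i -> String.format("split(value, '[|]')[%d] as %s", i, fields[i]))
    .toArray(String[]::new);

Dataset<Row> data = rowExtracted.selectExpr(exprs);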
Actual output:
{ "_index": "spark_index", "_type": "doc", "_id": "test123", "_version": 1, "_score": 1, "_source": { "channelId": "test", "country": "SG", "product": "test", "sourceId": "", "systemId": "test123", "destinationId": "", "batchId": "", "orgId": "test", "businessId": "test", "orgAccountId": "test", "orgBankCode": "", "beneAccountId": "test", "beneBankId": "test", "currencyCode": "SGD", "amount": "53.0000", "processingDate": "", "status": "Pending", "rejectCode": "test", "stageId": "123", "stageStatus": "Comment", "stageUpdatedTime": "2019-08-05 18:11:05.999000", "receivedTime": "2019-08-05 18:10:12.701000", "sendTime": "2019-08-05 18:11:06.003000" } }
We need the columns above nested under a "txn_summary" node, as in the following JSON:
Expected output:
{ "_index": "spark_index", "_type": "doc", "_id": "test123", "_version": 1, "_score": 1, "_source": { "txn_summary": { "channelId": "test", "country": "SG", "product": "test", "sourceId": "", "systemId": "test123", "destinationId": "", "batchId": "", "orgId": "test", "businessId": "test", "orgAccountId": "test", "orgBankCode": "", "beneAccountId": "test", "beneBankId": "test", "currencyCode": "SGD", "amount": "53.0000", "processingDate": "", "status": "Pending", "rejectCode": "test", "stageId": "123", "stageStatus": "Comment", "stageUpdatedTime": "2019-08-05 18:11:05.999000", "receivedTime": "2019-08-05 18:10:12.701000", "sendTime": "2019-08-05 18:11:06.003000" } } }
Wrapping all of the columns in a single top-level struct should give the expected output. In Scala:
import org.apache.spark.sql.functions._

data.select(struct(data.columns.map(col): _*).as("txn_summary"))
In Java it should look roughly like this (columns() returns String[], so the names have to be wrapped into Column objects first):
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;

Column[] cols = Arrays.stream(data.columns()).map(c -> col(c)).toArray(Column[]::new);
data.select(struct(cols).as("txn_summary"));
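As a usage sketch under the same assumptions (the data Dataset, sink, and checkpoint location from the question), the struct select then feeds the same Elasticsearch stream, so each document's _source carries the nested txn_summary object:

import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

// Reuses "data" and "cols" from the snippet above.
Dataset<Row> nested = data.select(struct(cols).as("txn_summary"));

// Same sink as in the question; only the selected columns change.
StreamingQuery query = nested.writeStream()
        .outputMode(OutputMode.Append())
        .format("es")
        .option("checkpointLocation", "C:\\checkpoint")
        .start("spark_index/doc");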