Short question: if I have an aggregation with top_hits per bucket, how can I sum a specific value across the result structures?
Details:
I have a number of records, each of which contains a quantity for a store. I want to get the sum of the latest quantities over all stores.
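For concreteness, the examples below assume documents of roughly this shape (the index name, type, and field names are taken from the aggregations and error output further down; the two older quantities, 5 and 9, and their timestamps are made up for illustration):

    POST inventory-local/doc/_bulk
    { "index": {} }
    { "store": "01", "datetime": "2018-07-01T00:00:00Z", "quantity": 5 }
    { "index": {} }
    { "store": "01", "datetime": "2018-07-25T00:00:00Z", "quantity": 6 }
    { "index": {} }
    { "store": "02", "datetime": "2018-07-01T00:00:00Z", "quantity": 9 }
    { "index": {} }
    { "store": "02", "datetime": "2018-07-25T00:00:00Z", "quantity": 11 }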
To get the latest record per store, I created the following aggregation:
"latest_quantity_per_store": { "aggs": { "latest_quantity": { "top_hits": { "sort": [ { "datetime": "desc" }, { "quantity": "asc" } ], "_source": { "includes": [ "quantity" ] }, "size": 1 } } }, "terms": { "field": "store", "size": 10000 } }
Suppose I have two stores, each with two quantities at two different timestamps. This is the result of that aggregation:
"latest_quantity_per_store": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "01", "doc_count": 2, "latest_quantity": { "hits": { "total": 2, "max_score": null, "hits": [ { "_index": "inventory-local", "_type": "doc", "_id": "O6wFD2UBG8e7nvSU8dYg", "_score": null, "_source": { "quantity": 6 }, "sort": [ 1532476800000, 6 ] } ] } } }, { "key": "02", "doc_count": 2, "latest_quantity": { "hits": { "total": 2, "max_score": null, "hits": [ { "_index": "inventory-local", "_type": "doc", "_id": "pLUFD2UBHBuSGcoH0ZT4", "_score": null, "_source": { "quantity": 11 }, "sort": [ 1532476800000, 11 ] } ] } } } ] }
I now want an aggregation in Elasticsearch that sums over these buckets. In the example data, that would be the sum of 6 and 11. I tried the following aggregation:
"latest_quantity": { "sum_bucket": { "buckets_path": "latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity" } }
But that resulted in this error:
{ "error": { "root_cause": [ { "type": "illegal_argument_exception", "reason": "No aggregation [hits] found for path [latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity]" } ], "type": "search_phase_execution_exception", "reason": "all shards failed", "phase": "query", "grouped": true, "failed_shards": [ { "shard": 0, "index": "inventory-local", "node": "3z5CqmmAQ-yT2sUCb69DzA", "reason": { "type": "illegal_argument_exception", "reason": "No aggregation [hits] found for path [latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity]" } } ] }, "status": 400 }
What is the correct aggregation to somehow get the number 17 out of Elasticsearch?
I did something similar for another aggregation, an average instead of a top_hits aggregation:
"average_quantity": { "sum_bucket": { "buckets_path": "average_quantity_per_store>average_quantity" } }, "average_quantity_per_store": { "aggs": { "average_quantity": { "avg": { "field": "quantity" } } }, "terms": { "field": "store", "size": 10000 } }
That works as expected; this is the result:
"average_quantity_per_store": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "01", "doc_count": 2, "average_quantity": { "value": 6 } }, { "key": "02", "doc_count": 2, "average_quantity": { "value": 11.5 } } ] }, "average_quantity": { "value": 17.5 }
There is a way to solve this using a combination of the scripted_metric aggregation and the sum_bucket pipeline aggregation. The scripted metric aggregation is a bit involved, but the main idea is that it allows you to provide your own bucketing algorithm and emit a single metric figure out of it.
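As a minimal illustration of the mechanics (the field name my_field is hypothetical), here is a scripted_metric that re-implements a simple sum on Elasticsearch 6.x:

    "aggs": {
      "my_sum": {
        "scripted_metric": {
          "init_script": "params._agg.total = 0",
          "map_script": "params._agg.total += doc.my_field.value",
          "combine_script": "return params._agg.total",
          "reduce_script": "double total = 0; for (t in params._aggs) { total += t } return total;"
        }
      }
    }

Each shard builds up its own partial state (init/map), returns it to the coordinating node (combine), which then folds the per-shard results into a single figure (reduce). The solution below follows exactly the same four-phase shape.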
In your case, what you want to do is figure out the latest quantity for each store and then sum those per-store quantities. The solution looks like this; I'll explain some of the details below:
    POST inventory-local/_search
    {
      "size": 0,
      "aggs": {
        "bystore": {
          "terms": {
            "field": "store.keyword",
            "size": 10000
          },
          "aggs": {
            "latest_quantity": {
              "scripted_metric": {
                "init_script": "params._agg.quantities = new TreeMap()",
                "map_script": "params._agg.quantities.put(doc.datetime.date, [doc.datetime.date.millis, doc.quantity.value])",
                "combine_script": "return params._agg.quantities.lastEntry().getValue()",
                "reduce_script": "def maxkey = 0; def qty = 0; for (a in params._aggs) {def currentKey = a[0]; if (currentKey > maxkey) {maxkey = currentKey; qty = a[1]} } return qty;"
              }
            }
          }
        },
        "sum_latest_quantities": {
          "sum_bucket": {
            "buckets_path": "bystore>latest_quantity.value"
          }
        }
      }
    }
Note that in order for this to work, you need to set script.painless.regex.enabled: true in your elasticsearch.yml configuration file.
The init_script creates a TreeMap on each shard. The map_script fills that TreeMap on each shard, keyed by date; the value we put into the map carries both the timestamp and the quantity as a [timestamp, quantity] pair, because we will need the timestamp later in the reduce_script. The combine_script simply takes the last value of the TreeMap, since that is the latest quantity for the given shard. The bulk of the work lies in the reduce_script: we iterate over the latest quantities from each shard and return the most recent one.
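Spelled out with comments, the four scripts from the query above do the following (Painless; the logic is identical to the one-liners in the request):

    // init_script — runs once per shard: create an empty, sorted map
    params._agg.quantities = new TreeMap();

    // map_script — runs for every document on the shard: keying by date
    // keeps the TreeMap in chronological order; the timestamp (millis)
    // is stored alongside the quantity for use in the reduce phase
    params._agg.quantities.put(doc.datetime.date, [doc.datetime.date.millis, doc.quantity.value]);

    // combine_script — runs once per shard: the last entry of the sorted
    // map is that shard's most recent [millis, quantity] pair
    return params._agg.quantities.lastEntry().getValue();

    // reduce_script — runs once on the coordinating node: among the
    // per-shard winners, keep the quantity with the greatest timestamp
    def maxkey = 0;
    def qty = 0;
    for (a in params._aggs) {
      def currentKey = a[0];
      if (currentKey > maxkey) {
        maxkey = currentKey;
        qty = a[1];
      }
    }
    return qty;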
At this point, we have the latest quantity for each store. All that remains to be done is to sum those per-store quantities using the sum_bucket pipeline aggregation. And there you have your result of 17.
The response looks like this:
"aggregations": { "bystore": { "doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [ { "key": "01", "doc_count": 2, "latest_quantity": { "value": 6 } }, { "key": "02", "doc_count": 2, "latest_quantity": { "value": 11 } } ] }, "sum_latest_quantities": { "value": 17 } }