我使用SUM聚合计算服务过程的持续时间。执行过程的每一步都将保存在Elasticsearch中的调用ID下。
这是我监视的内容:
Duration of Request-Processing for ID #123 (calling service #1) Duration of Server-Response for ID #123 (calling service #1) **Complete Duration for ID #123** Duration of Request-Processing for ID #124 (calling service #1) Duration of Server-Response for ID #124 (calling service #1) **Complete duration for ID #124**
过滤:
{ "from" : 0, "size" :0, "query" : { "filtered" : { "query" : { "match_all" : {}}, "filter" : { "term" : { "callingId" : "123", } } } }, "aggs" : { "total_duration" : { "sum" : { "field" : "duration" } }, "max_duration":{"max": {"field":"duration"}}, "min_duration":{"min":{"field":"duration"}} } } }
这将返回该过程的完整持续时间,并且还告诉我该过程的哪一部分是最快的,而哪一部分是最慢的。
接下来,我要通过serviceId 计算 所有已完成过程 的平均 持续时间 。在这种情况下,我只关心每项服务的总时长,因此我可以提供帮助。
如何从total_durations中创建平均值,最小值和最大值?
编辑:我添加了一些示例数据,希望您可以使用它。
通话1:
{ "callerId":"U1", "operation":"Initialize", "status":"INITIALIZED", "duration":1, "serviceId":"1" } { "callerId":"U1", "operation":"Calculate", "status":"STARTED", "duration":1, "serviceId":"1" } { "callerId":"U1", "operation":"Finish", "status":"FINISHED", "duration":1200, "serviceId":"1" } sum: 1202
通话2:
{ "callerId":"U2", "operation":"Initialize", "status":"INITIALIZED", "duration":2, "serviceId":"1" } { "callerId":"U2", "operation":"Calculate", "status":"STARTED", "duration":1, "serviceId":"1" } { "callerId":"U2", "operation":"Finish", "status":"FINISHED", "duration":1030, "serviceId":"1" } sum: 1033
服务ID#1的所有服务呼叫的汇总 这是我要计算的:
Max: 1202 Min: 1033 AVG: 1116
稍微复杂一点,但是在这里(由于这种类型的聚合,仅在1.4 中使用):
{ "query": { "filtered": { "query": { "match_all": {} }, "filter": { "term": { "serviceId": 1 } } } }, "aggs": { "executionTimes": { "scripted_metric": { "init_script": "_agg['values'] = new java.util.HashMap();", "map_script": "if (_agg.values[doc['callerId'].value]==null) {_agg.values[doc['callerId'].value]=doc['duration'].value;} else {_agg.values[doc['callerId'].value].add(doc['duration'].value);}", "combine_script":"someHashMap = new java.util.HashMap();for(x in _agg.values.keySet()) {value=_agg.values[x]; sum=0; for(y in value) {sum+=y}; someHashMap.put(x,sum)}; return someHashMap;", "reduce_script": "finalArray = []; finalMap = new java.util.HashMap(); for(map in _aggs){for(x in map.keySet()){if(finalMap.containsKey(x)){value=finalMap.get(x);finalMap.put(x,value+map.get(x));} else {finalMap.put(x,map.get(x))}}}; finalAvgValue=0; finalMaxValue=-1; finalMinValue=-1; for(key in finalMap.keySet()){currentValue=finalMap.get(key);finalAvgValue+=currentValue; if(finalMinValue<0){finalMinValue=currentValue} else if(finalMinValue>currentValue){finalMinValue=currentValue}; if(currentValue>finalMaxValue) {finalMaxValue=currentValue}}; finalArray.add(finalMaxValue); finalArray.add(finalMinValue); finalArray.add(finalAvgValue/finalMap.size()); return finalArray", "lang": "groovy" } } } }
另外,我并不是说这是最好的方法,但我只能找到一种方法。另外,我并不是说解决方案处于最佳状态。可能会对其进行清理和改进。不过,我想证明这是可能的。但是请记住,它在1.4中可用。
该方法的基本思想是使用脚本来构建应包含所需信息的数据结构,并根据脚本化的度量汇总以不同的步骤进行计算。而且,仅对一个执行聚合serviceId。如果要对所有serviceId执行此操作,我想您可能需要重新考虑一下脚本中的数据结构。
serviceId
对于上面的查询以及您提供的确切数据,输出为:
{ "took": 3, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 6, "max_score": 0, "hits": [] }, "aggregations": { "executionTimes": { "value": [ 1202, 1033, "1117.5" ] } } }
根据中value的脚本,数组中值的顺序为[max,min,avg] reduce_script。
value
reduce_script