There is a list of conversations, and each conversation has a list of messages. Each message has various fields plus an action field. Consider that the first message of a conversation uses action A, a few messages later action A.1 is used, some time after that A.1.1, and so on (there is a list of chatbot intents):
action
A
A.1
A.1.1
Grouping the message actions of a conversation would look something like: A > A > A > A.1 > A > A.1 > A.1.1 ...
The problem:
I need to create a report with Elasticsearch that returns the actions group of each conversation; next, I need to group the similar actions groups together and add a count; the end result would be a Map<actionsGroup, count>, e.g. 'A > A.1 > A > A.1 > A.1.1', 3.
When building the actions group I need to eliminate consecutive duplicates within each group: instead of A > A > A > A.1 > A > A.1 > A.1.1 I need to end up with A > A.1 > A > A.1 > A.1.1.
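The consecutive-duplicate elimination described above can be sketched client-side. This is a hypothetical TypeScript helper (the name buildActionsGroup is my own, not part of any Elasticsearch API):

```typescript
// Collapse consecutive duplicate actions into an actions-group string,
// e.g. ["A","A","A","A.1","A","A.1","A.1.1"] -> "A > A.1 > A > A.1 > A.1.1".
// Note that non-consecutive repeats (the second "A", "A.1") are kept.
function buildActionsGroup(actions: string[]): string {
  const collapsed: string[] = [];
  for (const action of actions) {
    // Only skip an action when it equals the one immediately before it.
    if (collapsed[collapsed.length - 1] !== action) {
      collapsed.push(action);
    }
  }
  return collapsed.join(" > ");
}
```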
The steps I started with:
{
  "collapse": {
    "field": "context.conversationId",
    "inner_hits": {
      "name": "logs",
      "size": 10000,
      "sort": [
        { "@timestamp": "asc" }
      ]
    }
  },
  "aggs": {}
}
What do I need next:
- some aggr that builds the actions group for each collapsed conversation
Or:
- another way of grouping the messages by conversationId
UPDATE 2: I managed to get a partial result, but there is still an issue; check below for what I still need to solve.
UPDATE 1: adding some more details.
The mapping:
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "epoch_millis"
      },
      "context": {
        "properties": {
          "action": { "type": "keyword" },
          "conversationId": { "type": "keyword" }
        }
      }
    }
  }
}
Sample documents for the conversations:
Conversation 1.
{ "@timestamp": 1579632745000, "context": { "action": "A", "conversationId": "conv_id1" } },
{ "@timestamp": 1579632745001, "context": { "action": "A.1", "conversationId": "conv_id1" } },
{ "@timestamp": 1579632745002, "context": { "action": "A.1.1", "conversationId": "conv_id1" } }

Conversation 2.
{ "@timestamp": 1579632745000, "context": { "action": "A", "conversationId": "conv_id2" } },
{ "@timestamp": 1579632745001, "context": { "action": "A.1", "conversationId": "conv_id2" } },
{ "@timestamp": 1579632745002, "context": { "action": "A.1.1", "conversationId": "conv_id2" } }

Conversation 3.
{ "@timestamp": 1579632745000, "context": { "action": "B", "conversationId": "conv_id3" } },
{ "@timestamp": 1579632745001, "context": { "action": "B.1", "conversationId": "conv_id3" } }
Expected result:
{ "A -> A.1 -> A.1.1": 2, "B -> B.1": 1 }
Something similar to this, in this or any other format.
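As a sanity check for the expected result, the whole grouping can be sketched client-side over the sample documents. This is a hypothetical TypeScript helper (countActionGroups and the Message shape are my own names, not part of any query):

```typescript
// A message as indexed above: conversation id, action, and epoch-millis timestamp.
interface Message {
  conversationId: string;
  action: string;
  timestamp: number;
}

// Group messages by conversationId, build each conversation's actions group
// (collapsing consecutive duplicates), then count identical groups.
function countActionGroups(messages: Message[]): Record<string, number> {
  const byConversation = new Map<string, Message[]>();
  for (const m of messages) {
    const list = byConversation.get(m.conversationId) ?? [];
    list.push(m);
    byConversation.set(m.conversationId, list);
  }
  const counts: Record<string, number> = {};
  for (const list of byConversation.values()) {
    // Order by timestamp so the actions group follows the conversation flow.
    list.sort((a, b) => a.timestamp - b.timestamp);
    const parts: string[] = [];
    for (const m of list) {
      // Skip only consecutive duplicates.
      if (parts[parts.length - 1] !== m.action) parts.push(m.action);
    }
    const group = parts.join(" -> ");
    counts[group] = (counts[group] ?? 0) + 1;
  }
  return counts;
}
```

Fed the three sample conversations above, it yields { "A -> A.1 -> A.1.1": 2, "B -> B.1": 1 }.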
Since I'm new to Elasticsearch, every hint is welcome.
I solved it with Elastic's scripted_metric. Note also that the index structure was changed from the initial state shown above.
The script:
{
  "size": 0,
  "aggs": {
    "intentPathsCountAgg": {
      "scripted_metric": {
        "init_script": "state.messagesList = new ArrayList();",
        "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",
        "combine_script": "return state",
        "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList); } messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if (existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else { intentsMap.put(conversation.getValue().intentsPath, 1L); } }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet())"
      }
    }
  }
}
The script, formatted for better readability (taken from my .ts source):
scripted_metric: {
  init_script: 'state.messagesList = new ArrayList();',
  map_script: `
    long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
    Map currentMessage = [
      'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
      'time': currentMessageTime,
      'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
    ];
    state.messagesList.add(currentMessage);`,
  combine_script: 'return state',
  reduce_script: `
    List messages = new ArrayList();
    Map conversationsMap = new HashMap();
    Map intentsMap = new HashMap();
    boolean[] ifElseWorkaround = new boolean[1];
    for (state in states) {
      messages.addAll(state.messagesList);
    }
    messages.stream().forEach(message -> {
      Map existingMessage = conversationsMap.get(message.conversationId);
      if (existingMessage == null || message.time > existingMessage.time) {
        conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
      } else {
        ifElseWorkaround[0] = true;
      }
    });
    conversationsMap.entrySet().forEach(conversation -> {
      if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
        long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
        intentsMap.put(conversation.getValue().intentsPath, intentsCount);
      } else {
        intentsMap.put(conversation.getValue().intentsPath, 1L);
      }
    });
    return intentsMap.entrySet().stream().map(intentPath -> [
      'path': intentPath.getKey().toString(),
      'count': intentPath.getValue()
    ]).collect(Collectors.toSet())`
}
The response:
{
  "took": 2,
  "timed_out": false,
  "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 },
  "hits": {
    "total": { "value": 11, "relation": "eq" },
    "max_score": null,
    "hits": []
  },
  "aggregations": {
    "intentPathsCountAgg": {
      "value": [
        { "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2 },
        { "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3 -> smallTalk.greet4": 1 },
        { "smallTalk.greet -> smallTalk.greet2": 1 }
      ]
    }
  }
}
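Since the scripted_metric returns its value as a collection of single-entry maps rather than one flat map, a small client-side step can merge them. This is a hypothetical TypeScript helper (flattenIntentPaths is my own name, not part of the Elasticsearch client):

```typescript
// Merge an array of single-entry maps, as returned in
// aggregations.intentPathsCountAgg.value, into one path -> count record.
function flattenIntentPaths(
  value: Array<Record<string, number>>
): Record<string, number> {
  const result: Record<string, number> = {};
  for (const entry of value) {
    for (const [path, count] of Object.entries(entry)) {
      result[path] = count;
    }
  }
  return result;
}
```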