我有一条流经多个系统的消息,每个系统都会记录消息的进入和退出以及时间戳和uuid messageId。我通过以下方式提取所有日志:
filebeat --> logstash --> elastic search --> kibana
结果,我现在有以下事件:
@timestamp messageId event May 19th 2016, 02:55:29.003 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter May 19th 2016, 02:55:29.200 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit May 19th 2016, 02:55:29.205 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter May 19th 2016, 02:55:29.453 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit
我想生成一个报告(最好是堆积的条或列),用于每个系统的时间:
messageId in1:1->2:in2 00e02f2f-32d5-9509-870a-f80e54dc8775 197:5:248
做这个的最好方式是什么?Logstash过滤器?kibana计算字段?
您只能使用Logstash aggregate过滤器来实现此目的,但是,您必须实质性地重新实现该elapsed过滤器已经完成的功能,这样会很丢人吧?
aggregate
elapsed
然后,让我们混合使用Logstash aggregate过滤器和elapsedfilter。后者用于测量每个阶段的时间,而前者用于将所有计时信息汇总到最后一个事件中。
旁注:您可能想重新考虑您的时间戳格式,使其更符合解析标准。我将它们转换为ISO 8601以使其更易于解析,但是可以随意滚动自己的正则表达式。
因此,我从以下日志开始:
2016-05-19T02:55:29.003 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter 2016-05-19T02:55:29.200 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit 2016-05-19T02:55:29.205 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter 2016-05-19T02:55:29.453 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit
首先,我使用三个elapsed过滤器(每个阶段一个in1,1->2然后使用in2),然后使用三个聚合过滤器以收集所有时序信息。看起来像这样:
in1
1->2
in2
filter { grok { match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{UUID:messageId} %{WORD:event}"] add_tag => [ "%{event}" ] } date { match => [ "timestamp", "ISO8601"] } # Measures the execution time of system1 elapsed { unique_id_field => "messageId" start_tag => "system1Enter" end_tag => "system1Exit" new_event_on_match => true add_tag => ["in1"] } # Measures the execution time of system2 elapsed { unique_id_field => "messageId" start_tag => "system2Enter" end_tag => "system2Exit" new_event_on_match => true add_tag => ["in2"] } # Measures the time between system1 and system2 elapsed { unique_id_field => "messageId" start_tag => "system1Exit" end_tag => "system2Enter" new_event_on_match => true add_tag => ["1->2"] } # Records the execution time of system1 if "in1" in [tags] and "elapsed" in [tags] { aggregate { task_id => "%{messageId}" code => "map['report'] = [(event['elapsed_time']*1000).to_i]" map_action => "create" } } # Records the time between system1 and system2 if "1->2" in [tags] and "elapsed" in [tags] { aggregate { task_id => "%{messageId}" code => "map['report'] << (event['elapsed_time']*1000).to_i" map_action => "update" } } # Records the execution time of system2 if "in2" in [tags] and "elapsed" in [tags] { aggregate { task_id => "%{messageId}" code => "map['report'] << (event['elapsed_time']*1000).to_i; event['report'] = map['report'].join(':')" map_action => "update" end_of_task => true } } }
在前两个事件之后,您将获得一个类似这样的新事件,该事件表明在system1中已经花费了197ms:
{ "@timestamp" => "2016-05-21T04:20:51.731Z", "tags" => [ "elapsed", "elapsed_match", "in1" ], "elapsed_time" => 0.197, "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", "elapsed_timestamp_start" => "2016-05-19T00:55:29.003Z" }
在第三个事件之后,您将获得一个类似这样的事件,该事件显示在system1和system2之间花费了多少时间,即5毫秒:
{ "@timestamp" => "2016-05-21T04:20:51.734Z", "tags" => [ "elapsed", "elapsed_match", "1->2" ], "elapsed_time" => 0.005, "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", "elapsed_timestamp_start" => "2016-05-19T00:55:29.200Z" }
在第四个事件之后,您将获得一个类似这样的新事件,该事件显示了system2中花费了多少时间,即248ms。该事件还包含一个report字段,其中包含消息的所有计时信息
report
{ "@timestamp" => "2016-05-21T04:20:51.736Z", "tags" => [ "elapsed", "elapsed_match", "in2" ], "elapsed_time" => 0.248, "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", "elapsed_timestamp_start" => "2016-05-19T00:55:29.205Z" "report" => "197:5:248" }