[20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats): [fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0] [fieldA: strvalue2| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0] [fieldA: strvalue3| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0] [fieldA: strvalue4| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0] [fieldA: strvalue5| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
编辑 :
# Sample input: a header line followed by a "[destName: ...]" stats line.
# [20170513 06:08:29.734 EDT (StatsCollector-1) deshaw.tools.jms.ActiveMQLoggingPlugin$ActiveMQDestinationStatsCollector#logPerDestinationStats INFO] ActiveMQ Destination Stats (97 destinations):
# [destName: topic://darts.metaDataChangeTopic | enqueueCount: 1 | dequeueCount: 1 | dispatchCount: 1 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 536870912 | avgEnqueueTimeMs: 0.0 | maxEnqueueTimeMs: 0 | minEnqueueTimeMs: 0 | currentConsumers: 1 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 2392 | maxMsgSize: 2392 | avgMsgSize: 2392.0 | totalMsgSize: 2392]

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log.1"

    # Testing aid: always re-read the same file from the top.
    # Remove both settings before going to production.
    start_position => "beginning"
    sincedb_path => "/dev/null"

    # Re-assemble multi-line entries: any line that starts with "[destName:"
    # is appended to the line before it. (Grok pattern names may be used in
    # the pattern as well.)
    codec => multiline {
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
  # Discard events whose message is empty or whitespace only.
  if ([message] =~ /^\s*$/ ) {
    drop {}
  }

  # Discard events whose message does not begin with "[".
  if ([message] =~ /^[^\[]/) {
    drop {}
  }

  # Discard events produced by the other stats methods.
  if ([message] =~ /logMemoryInfo|logProcessInfo|logSystemInfo|logThreadBreakdown|logBrokerStats/) {
    drop {}
  }

  if [message] =~ "logPerDestinationStats" {
    # Header line: date parts, time, zone, thread, class and method.
    # Fields captured here exist before the split, so each split event keeps them.
    grok {
      match => {
        "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
      }
    }

    # Break the combined message back into one event per line
    # (no terminator is given, so the plugin default applies).
    split {
      field => "message"
    }

    # One "[destName: ... | totalMsgSize: N]" line: the key names are matched
    # generically with DATA, the values are captured under explicit field names.
    grok {
      match => {
        "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$"
      }
    }

    # Build a single session timestamp from the captured parts, record the
    # event's @timestamp as the load time, then drop the temporary parts.
    mutate {
      convert => {
        "message" => "string"
      }
      add_field => {
        "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
        "load_timestamp" => "%{@timestamp}"
      }
      remove_field => ["yr","mnt", "daynum", "time", "timezone"]
    }
  }
}

output {
  # Print every event to the console in rubydebug form.
  stdout {
    codec => rubydebug
  }
}
input {
  file {
    path => "/var/log/someapp.log"

    # Any line that does NOT start with "[<date> <time>" is treated as a
    # continuation and appended to the previous line. Grok pattern names
    # are allowed inside the pattern.
    codec => multiline {
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => previous
    }
  }
}
这基本上表明任何不以YYYYMMDD HH:mi:ss.000开头的行都将与上一行合并
一旦您确认已从第一行获取了所需的全部数据,就可以按 \r 或 \n 进行拆分,并使用单个 grok 模式(基于上面给出的示例)提取各条统计数据。
更新2017-05-08 11:54:
完整的logstash conf可能看起来像这样,您将需要考虑更改grok模式以更好地满足您的要求(仅您知道数据)。
input {
  file {
    path => "/var/log/someapp.log"

    ### For testing and continual processing of the same file; remove these two settings before production
    start_position => "beginning"
    sincedb_path => "/dev/null"

    ### Read the logfile and recombine multi-line details:
    ### every line that does not start with "[<date> <time>" is appended to the previous line.
    codec => multiline {
      # Grok pattern names are valid here
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => previous
    }
  }
}

filter {
  ### Grab the high-level data before splitting (anything captured before the split is copied to every resulting event).
  ### The parentheses around the thread name are escaped so they match the literal "(" and ")" in the log
  ### and are not included in the captured thread_name value.
  grok {
    match => {
      "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*\(%{DATA:thread_name}\)\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]"
    }
  }

  ### Split the combined message back into single lines.
  ### NOTE(review): the right terminator may be \r or \n depending on how the lines were joined - test which one applies.
  split {
    "field" => "message"
    "terminator" => "\r"
  }

  ### The lines are now independent events; parse the "[fieldA: str | field2: 0 ...]" layout.
  ### DATA is used for every value as a quick catch-all; tighten the patterns to suit the real data.
  grok {
    break_on_match => false
    match => {
      "message" => "^\[%{DATA}:\s*%{DATA:fieldA}\|%{DATA}:\s*%{DATA:field2}\|%{DATA}:\s*%{DATA:field3}\|%{DATA}:\s*%{DATA:field4}\|%{DATA}:\s*%{DATA:field5}\|%{DATA}:\s*%{DATA:field6}\|%{DATA}:\s*%{DATA:field7}\]$"
    }
  }

  ### Assemble session_timestamp from the captured date parts, record @timestamp as load_timestamp,
  ### and remove the temporary date-part fields.
  mutate {
    convert => {
      "message" => "string"
    }
    add_field => {
      "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
      "load_timestamp" => "%{@timestamp}"
    }
    remove_field => ["yr","mnt", "daynum", "time", "timezone"]
  }
}

output {
  ### Print each event to the console for inspection.
  stdout {
    codec => rubydebug
  }
}
输入部分:将路径改为以 .log* 结尾;另外,按照您最初的示例,multiline 的模式不必匹配日期格式,只要能把所有相关内容合并到同一行即可。
input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log*"

    ### For testing and continual processing of the same file; remove these two settings before production
    start_position => "beginning"
    sincedb_path => "/dev/null"

    ### Read the logfile and recombine multi-line details:
    ### every line starting with "[destName:" is appended to the line before it.
    codec => multiline {
      # Grok pattern names are valid here
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
  if "logPerDestinationStats" in [message] {
    ### Header line: date parts, time, zone, thread, class and method.
    ### The parentheses around the thread name are escaped so they match the literal "(" and ")"
    ### and are not included in the captured thread_name value.
    grok {
      match => {
        "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*\(%{DATA:thread_name}\)\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*"
      }
    }

    ### Split the combined message back into one event per line.
    ### The closing quote must be a plain ASCII double quote; a typographic quote leaves the string unterminated.
    ### NOTE(review): the multiline codec may join lines with \n rather than \r - confirm the terminator against real input.
    split {
      field => "message"
      terminator => "\r"
    }

    ### One "[destName: ... | totalMsgSize: N]" line: key names are matched generically with DATA,
    ### values are captured under explicit field names.
    grok {
      match => {
        "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$"
      }
    }

    ### Assemble session_timestamp from the captured date parts, record @timestamp as load_timestamp,
    ### and remove the temporary date-part fields.
    mutate {
      convert => {
        "message" => "string"
      }
      add_field => {
        "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
        "load_timestamp" => "%{@timestamp}"
      }
      remove_field => ["yr","mnt", "daynum", "time", "timezone"]
    }
  } else {
    ### Anything that is not a per-destination stats entry is discarded.
    drop {}
  }
}