I have the following JSON input that I want to dump into Logstash (and eventually search and build dashboards on in Elasticsearch/Kibana):
    {"vulnerabilities":[
        {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},
        {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},
        {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}
    ]}
I am using the following Logstash configuration:
    input {
        file {
            path => "/tmp/logdump/*"
            type => "assets"
            codec => "json"
        }
    }
    output {
        stdout { codec => rubydebug }
        elasticsearch { host => localhost }
    }
Output:
    {
        "message" => "{\"vulnerabilities\":[\r",
        "@version" => "1",
        "@timestamp" => "2014-10-30T23:41:19.788Z",
        "type" => "assets",
        "host" => "av12612sn00-pn9",
        "path" => "/tmp/logdump/stack3.json"
    }
    {
        "message" => "{\"ip\":\"10.1.1.30\",\"dns\":\"z.acme.com\",\"vid\":\"12345\"},\r",
        "@version" => "1",
        "@timestamp" => "2014-10-30T23:41:19.838Z",
        "type" => "assets",
        "host" => "av12612sn00-pn9",
        "path" => "/tmp/logdump/stack3.json"
    }
    {
        "message" => "{\"ip\":\"10.1.1.31\",\"dns\":\"y.acme.com\",\"vid\":\"12345\"},\r",
        "@version" => "1",
        "@timestamp" => "2014-10-30T23:41:19.870Z",
        "type" => "shellshock",
        "host" => "av1261wag2sn00-pn9",
        "path" => "/tmp/logdump/stack3.json"
    }
    {
        "ip" => "10.1.1.32",
        "dns" => "x.acme.com",
        "vid" => "12345",
        "@version" => "1",
        "@timestamp" => "2014-10-30T23:41:19.884Z",
        "type" => "assets",
        "host" => "av12612sn00-pn9",
        "path" => "/tmp/logdump/stack3.json"
    }
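Logstash is evidently treating each line as an event: it thinks {"vulnerabilities":[ is an event, and I am guessing that the trailing commas on the two subsequent nodes break the parsing, while the last node appears to be parsed correctly. How do I tell Logstash to parse the events inside the vulnerabilities array and to ignore the commas at the end of the line?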
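The guess about the trailing commas can be checked outside Logstash. The following is a minimal Python sketch, not Logstash itself, that mimics a line-oriented JSON codec: each line is decoded on its own, and a line that fails to decode is kept as a raw message. The sample lines are modelled on the output above, and the helper name `decode_line` is hypothetical.

```python
import json

# Sample lines modelled on the output in the question (assumption: one
# array element per line, each line still carrying its "\r").
lines = [
    '{"vulnerabilities":[\r',
    '{"ip":"10.1.1.30","dns":"z.acme.com","vid":"12345"},\r',
    '{"ip":"10.1.1.31","dns":"y.acme.com","vid":"12345"},\r',
    '{"ip":"10.1.1.32","dns":"x.acme.com","vid":"12345"}\r',
]


def decode_line(line):
    """Decode one line as JSON; fall back to a raw 'message' on failure."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return {"message": line}


events = [decode_line(line) for line in lines]
for event in events:
    print(event)
```

In this sketch only the last line decodes into fields. The opening line is incomplete JSON, and the two lines ending in a comma fail because of the extra data after the closing brace.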
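Updated 2014-11-05: Following Magnus' recommendations, I added the json filter and it works well. However, the last line of the JSON is not parsed correctly unless I specify start_position => "beginning" in the file input block. Any ideas why? I know it parses bottom-up by default, but I would have expected mutate/gsub to handle this smoothly.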
    input {
        file {
            path => "/tmp/logdump/*"
            type => "assets"
            start_position => "beginning"
        }
    }
    filter {
        if [message] =~ /^\[?{"ip":/ {
            mutate {
                gsub => [
                    "message", "^\[{", "{",
                    "message", "},?\]?$", "}"
                ]
            }
            json {
                source => "message"
                remove_field => ["message"]
            }
        }
    }
    output {
        stdout { codec => rubydebug }
        elasticsearch { host => localhost }
    }
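To make the effect of the two gsub substitutions concrete, here is a small Python sketch that applies the same patterns with `re.sub` and then parses the result. It only illustrates the regular expressions and is not how Logstash runs them. The input lines are hypothetical, assuming the array brackets share a line with the first and last elements, and `clean` is an illustrative helper name.

```python
import json
import re

# Hypothetical input lines: "[" and "]" share a line with an element,
# which is the layout the gsub patterns appear to target.
lines = [
    '[{"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '{"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},',
    '{"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}]',
]


def clean(message):
    """Apply the two substitutions from the mutate/gsub block, in order."""
    message = re.sub(r'^\[\{', '{', message)     # strip a leading "["
    message = re.sub(r'\},?\]?$', '}', message)  # strip a trailing "," and/or "]"
    return message


# Same guard as the conditional: only lines that start an "ip" object.
records = [
    json.loads(clean(line))
    for line in lines
    if re.search(r'^\[?\{"ip":', line)
]
for record in records:
    print(record)
```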
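You could skip the json codec and use a multiline filter to join the lines of the message into a single string that you can then feed to the json filter.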
    filter {
        multiline {
            pattern => '^{"vulnerabilities":\['
            negate => true
            what => "previous"
        }
        json {
            source => "message"
        }
    }
However, this produces the following undesirable result:
    {
        "message" => "<omitted for brevity>",
        "@version" => "1",
        "@timestamp" => "2014-10-31T06:48:15.589Z",
        "host" => "name-of-your-host",
        "tags" => [
            [0] "multiline"
        ],
        "vulnerabilities" => [
            [0] {
                "ip" => "10.1.1.1",
                "dns" => "z.acme.com",
                "vid" => "12345"
            },
            [1] {
                "ip" => "10.1.1.2",
                "dns" => "y.acme.com",
                "vid" => "12345"
            },
            [2] {
                "ip" => "10.1.1.3",
                "dns" => "x.acme.com",
                "vid" => "12345"
            }
        ]
    }
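Unless there is a fixed number of elements in the vulnerabilities array, I don't think there is much we can do with this (without resorting to a ruby filter).
How about applying the json filter only to lines that look like what we want and dropping the rest? Your question doesn't make it clear whether all of the log looks like this, so this may not be that useful.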
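What the question is after is one event per array element rather than a single event holding the whole array. As a rough illustration of that step, which is the kind of work a ruby filter would have to do, here is a Python sketch. The function `split_events` is a hypothetical helper and not a Logstash API.

```python
import json

# The joined document, as the multiline filter would hand it over.
document = """{"vulnerabilities":[
{"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},
{"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},
{"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}
]}"""


def split_events(text, key="vulnerabilities"):
    """Parse the whole document and yield one dict per array element."""
    for element in json.loads(text)[key]:
        yield element


events = list(split_events(document))
for event in events:
    print(json.dumps(event))
```

This works for any array length, because the split happens after the document has been parsed as a whole.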
    filter {
        if [message] =~ /^\s+{"ip":/ {
            # Remove trailing commas
            mutate {
                gsub => ["message", ",$", ""]
            }
            json {
                source => "message"
                remove_field => ["message"]
            }
        } else {
            drop {}
        }
    }
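The logic of this filter can be traced with a short Python sketch: test the line against the pattern, strip a trailing comma, parse the remainder, and drop everything else. The sample lines are assumptions (indented elements, no carriage returns), and `filter_line` is a hypothetical name used only for illustration.

```python
import json
import re

# Same test as the Logstash conditional; the brace is escaped for clarity.
IP_LINE = re.compile(r'^\s+\{"ip":')


def filter_line(message):
    """Return a parsed record, or None when the line would be dropped."""
    if not IP_LINE.search(message):
        return None                       # corresponds to drop {}
    message = re.sub(r',$', '', message)  # remove a trailing comma
    return json.loads(message)


# Assumed sample input: element lines are indented by two spaces.
lines = [
    '{"vulnerabilities":[',
    '  {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '  {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"}',
    ']}',
]
records = [r for r in map(filter_line, lines) if r is not None]
for record in records:
    print(record)
```

Note that `\s+` requires at least one leading whitespace character, so with this pattern element lines that start directly with `{` would be dropped as well.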