一尘不染

Feeding JSON input into Logstash - configuration issue?

elasticsearch

I have the following JSON input that I want to dump into Logstash (and eventually search/dashboard in Elasticsearch/Kibana).

{"vulnerabilities":[
    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},
    {"ip":"10.1.1.2","dns":"y.acme.com","vid":"12345"},
    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}
]}

I am using the following Logstash configuration:

input {
  file {
    path => "/tmp/logdump/*"
    type => "assets"
    codec => "json"
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch { host => localhost }
}

Output:

{
       "message" => "{\"vulnerabilities\":[\r",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.788Z",
          "type" => "assets",
          "host" => "av12612sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}
{
       "message" => "{\"ip\":\"10.1.1.30\",\"dns\":\"z.acme.com\",\"vid\":\"12345\"},\r",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.838Z",
          "type" => "assets",
          "host" => "av12612sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}
{
       "message" => "{\"ip\":\"10.1.1.31\",\"dns\":\"y.acme.com\",\"vid\":\"12345\"},\r",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.870Z",
          "type" => "shellshock",
          "host" => "av1261wag2sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}
{
            "ip" => "10.1.1.32",
           "dns" => "x.acme.com",
           "vid" => "12345",
      "@version" => "1",
    "@timestamp" => "2014-10-30T23:41:19.884Z",
          "type" => "assets",
          "host" => "av12612sn00-pn9",
          "path" => "/tmp/logdump/stack3.json"
}

Obviously Logstash is treating each line as an event, so it considers {"vulnerabilities":[ to be an event on its own, and I'm guessing the trailing commas on the two subsequent nodes mess up the parsing, while the last node (which has no trailing comma) appears correct. How do I tell Logstash to parse the events inside the vulnerabilities array and to ignore the trailing comma at the end of each line?
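To illustrate the per-line problem (a minimal Python sketch, not Logstash itself): each line reaches the json codec on its own, and only a line without a trailing comma is valid JSON by itself.

```python
import json

# each line of the file as the file input sees it (one event per line)
lines = [
    '{"vulnerabilities":[',
    '    {"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},',
    '    {"ip":"10.1.1.3","dns":"x.acme.com","vid":"12345"}',
]

def parses(s):
    try:
        json.loads(s)
        return True
    except ValueError:
        return False

results = [parses(l) for l in lines]
# only the last line (no trailing comma) is valid JSON on its own;
# stripping the comma fixes the middle line
fixed = parses(lines[1].strip().rstrip(','))
```

Here `results` is `[False, False, True]` and `fixed` is `True`, which matches the rubydebug output above: only the comma-free last element was parsed into fields.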

Update 2014-11-05: Following Magnus's suggestion, I added the json filter and it works great. However, unless start_position => "beginning" is specified in the file input block, the last line of the JSON is not parsed correctly. Any idea why not? I know it reads from the end of the file by default, but I would have expected mutate/gsub to handle that smoothly?

input {
  file {
    path => "/tmp/logdump/*"
    type => "assets"
    start_position => "beginning"
  }
}
filter {
  if [message] =~ /^\[?{"ip":/ {
    mutate {
      gsub => [
        "message", "^\[{", "{",
        "message", "},?\]?$", "}"
      ]
    }
    json {
      source => "message"
      remove_field => ["message"]
    }
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch { host => localhost }
}
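The two gsub pairs in the filter above can be sanity-checked outside Logstash. A Python sketch of the same substitutions (the patterns are copied from the config; `clean` is a hypothetical helper name, not part of the original post):

```python
import re

def clean(line):
    # mirror the gsub pairs from the filter:
    # "^\[{" -> "{"  strips a leading array bracket fused to the first element
    # "},?\]?$" -> "}"  strips a trailing comma and/or closing bracket
    line = re.sub(r'^\[{', '{', line)
    line = re.sub(r'},?\]?$', '}', line)
    return line

# an array element with a trailing comma becomes standalone JSON
print(clean('{"ip":"10.1.1.1","dns":"z.acme.com","vid":"12345"},'))
```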


1 Answer

一尘不染

You could skip the json codec and use a multiline filter to join the messages into a single string that you can then feed to the json filter.

filter {
  multiline {
    pattern => '^{"vulnerabilities":\['
    negate => true
    what => "previous"
  }
  json {
    source => "message"
  }
}

However, this produces the following undesirable result:

{
            "message" => "<omitted for brevity>",
           "@version" => "1",
         "@timestamp" => "2014-10-31T06:48:15.589Z",
               "host" => "name-of-your-host",
               "tags" => [
        [0] "multiline"
    ],
    "vulnerabilities" => [
        [0] {
             "ip" => "10.1.1.1",
            "dns" => "z.acme.com",
            "vid" => "12345"
        },
        [1] {
             "ip" => "10.1.1.2",
            "dns" => "y.acme.com",
            "vid" => "12345"
        },
        [2] {
             "ip" => "10.1.1.3",
            "dns" => "x.acme.com",
            "vid" => "12345"
        }
    ]
}

Unless there's a fixed number of elements in the vulnerabilities array, I don't think there's much we can do with this (without resorting to the ruby filter).

How about just applying the json filter to lines that look like what we want and dropping the rest? Your question doesn't make it clear whether all of the logs look like this, so this may not be so useful.

filter {
  if [message] =~ /^\s+{"ip":/ {
    # Remove trailing commas
    mutate {
      gsub => ["message", ",$", ""]
    }
    json {
      source => "message"
      remove_field => ["message"]
    }
  } else {
    drop {}
  }
}