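First off, I have worked through as many of the examples on here as I could, and still nothing works. I'm not sure whether that is down to the complexity of the JSON in the log file.
I'm looking to take the example log entry, have Logstash read it, and send the JSON as JSON to ElasticSearch.
Here is what the (shortened) example looks like: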
[0m[0m16:02:08,685 INFO [org.jboss.as.server] (ServerService Thread Pool -- 28) JBAS018559: { "appName": "SomeApp", "freeMemReqStartBytes": 544577648, "freeMemReqEndBytes": 513355408, "totalMem": 839385088, "maxMem": 1864368128, "anonymousUser": false, "sessionId": "zz90g0dFQkACVao4ZZL34uAb", "swAction": { "clock": 0, "clockStart": 1437766438950, "name": "General", "trackingMemory": false, "trackingMemoryGcFirst": true, "memLast": 0, "memOrig": 0 }, "remoteHost": "127.0.0.1", "remoteAddr": "127.0.0.1", "requestMethod": "GET", "mapLocalObjectCount": { "FinanceEmployee": { "x": 1, "singleton": false }, "QuoteProcessPolicyRef": { "x": 10, "singleton": false }, "LocationRef": { "x": 2, "singleton": false } }, "theSqlStats": { "lstStat": [ { "sql": "select * FROM DUAL", "truncated": false, "truncatedSize": -1, "recordCount": 1, "foundInCache": false, "putInCache": false, "isUpdate": false, "sqlFrom": "DUAL", "usingPreparedStatement": true, "isLoad": false, "sw": { "clock": 104, "clockStart": 1437766438970, "name": "General", "trackingMemory": false, "trackingMemoryGcFirst": true, "memLast": 0, "memOrig": 0 }, "count": 0 }, { "sql": "select * FROM DUAL2", "truncated": false, "truncatedSize": -1, "recordCount": 0, "foundInCache": false, "putInCache": false, "isUpdate": false, "sqlFrom": "DUAL2", "usingPreparedStatement": true, "isLoad": false, "sw": { "clock": 93, "clockStart": 1437766439111, "name": "General", "trackingMemory": false, "trackingMemoryGcFirst": true, "memLast": 0, "memOrig": 0 }, "count": 0 } ] } }
None of the Logstash configs I have tried has worked. The closest so far is:
    input {
      file {
        codec => multiline {
          pattern => '\{(.*)\}'
          negate => true
          what => previous
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    filter {
      json {
        source => message
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
      }
    }
I have also tried:
    input {
      file {
        type => "json"
        path => "/var/log/logstash.log"
        codec => json #also tried json_lines
      }
    }
    filter {
      json {
        source => "message"
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        cluster => "logstash"
        codec => "json" #also tried json_lines
        index => "logstashjson"
      }
    }
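I just want to take the JSON posted above and send it "as-is" to ElasticSearch, just as if I had done a cURL PUT with that file. I appreciate any help, thank you!
UPDATE
With help from Leonid, here is the config I have right now: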
    input {
      file {
        codec => multiline {
          pattern => "^\["
          negate => true
          what => previous
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    filter {
      grok {
        match => { "message" => "^(?<rubbish>.*?)(?<logged_json>{.*)" }
      }
      json {
        source => "logged_json"
        target => "parsed_json"
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
      }
    }
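To make it easier to see what the grok and json filters in the config above are meant to do, here is a rough Python sketch of the same two steps. It is only an illustration, not Logstash itself: the function name `split_event` and the shortened sample line are made up for this example, Python spells named groups `(?P<name>...)` where grok uses `(?<name>...)`, and `re.DOTALL` is an assumption so that `.` also spans newlines in a joined multiline event (grok may need an explicit flag such as `(?m)` for that).

```python
import json
import re

# Same idea as the grok pattern: a lazy prefix ("rubbish") followed by
# everything from the first "{" onwards ("logged_json").
# re.DOTALL is an assumption so the capture can span several lines.
PATTERN = re.compile(r"^(?P<rubbish>.*?)(?P<logged_json>\{.*)", re.DOTALL)


def split_event(message):
    """Hypothetical helper: split a log message into prefix and parsed JSON."""
    match = PATTERN.match(message)
    if match is None:
        return None  # no "{" found, so there is nothing to parse
    return {
        "rubbish": match.group("rubbish"),
        # Equivalent of: json { source => "logged_json" target => "parsed_json" }
        "parsed_json": json.loads(match.group("logged_json")),
    }


# Shortened version of the sample entry from the question.
line = (
    "16:02:08,685 INFO [org.jboss.as.server] "
    "(ServerService Thread Pool -- 28) JBAS018559: "
    '{ "appName": "SomeApp", "swAction": { "clock": 0 } }'
)

event = split_event(line)
print(event["rubbish"])
print(event["parsed_json"]["swAction"]["clock"])
```

Because the prefix is matched lazily, the capture starts at the first `{` in the message, which only works as long as the log prefix itself never contains a `{`.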
Sorry, I can't comment yet, so I'm posting this as an answer. You are missing a document_type in the elasticsearch config; how would it be deduced otherwise?
Alright, after looking into the Logstash reference and working closely with @Ascalonian, we came up with the following config:
    input {
      file {
        # In the input you need to configure the multiline codec properly.
        # You need to match the line that has the timestamp at the start,
        # and then say 'everything that is NOT this line should go to the previous line'.
        # The pattern may be improved to handle the case when a JSON array starts at the
        # first char of the line, but it is sufficient currently.
        codec => multiline {
          pattern => "^\["
          negate => true
          what => previous
          max_lines => 2000
        }
        path => [ '/var/log/logstash.log' ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
    filter {
      # Extract the JSON part of the message string into a separate field.
      grok {
        match => { "message" => "^.*?(?<logged_json>{.*)" }
      }
      # Replace newlines in the JSON string, since the json filter below
      # cannot deal with those. This is also the time to delete unwanted fields.
      mutate {
        gsub => [ 'logged_json', '\n', '' ]
        remove_field => [ "message", "@timestamp", "host", "path", "@version", "tags" ]
      }
      # Parse the JSON and remove the string field upon success.
      json {
        source => "logged_json"
        remove_field => [ "logged_json" ]
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        cluster => "logstash"
        index => "logstashjson"
      }
    }
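For anyone who wants to check the logic of the config above outside Logstash, the three stages (multiline grouping, extracting the JSON part, stripping newlines and parsing) can be sketched in Python roughly like this. This is only an approximation under assumptions: `group_events`, `parse_event` and the sample lines are invented for the illustration, `re.DOTALL` is assumed so the capture spans lines, and `max_lines` is not modelled.

```python
import json
import re

# A line starting with "[" begins a new event (pattern => "^\[").
EVENT_START = re.compile(r"^\[")
# Lazy prefix, then everything from the first "{" (re.DOTALL is an assumption).
JSON_PART = re.compile(r"^.*?(?P<logged_json>\{.*)", re.DOTALL)


def group_events(lines):
    """Mimic negate => true, what => previous: any line that does NOT
    match the pattern is appended to the previous event."""
    events, current = [], []
    for line in lines:
        if EVENT_START.match(line) and current:
            events.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        events.append("\n".join(current))
    return events


def parse_event(message):
    """Extract the JSON text, drop newlines (like the mutate gsub), parse it."""
    match = JSON_PART.match(message)
    if match is None:
        return None
    logged_json = match.group("logged_json").replace("\n", "")
    return json.loads(logged_json)


# Made-up sample: one entry spread over several lines, one on a single line.
sample_lines = [
    '[0m[0m16:02:08,685 INFO [org.jboss.as.server] JBAS018559: {',
    '  "appName": "SomeApp",',
    '  "mapLocalObjectCount": { "LocationRef": { "x": 2, "singleton": false } }',
    '}',
    '[0m[0m16:02:09,001 INFO [org.jboss.as.server] JBAS018559: { "appName": "OtherApp" }',
]

docs = [parse_event(event) for event in group_events(sample_lines)]
for doc in docs:
    print(doc["appName"])
```

Each element of `docs` corresponds to what would be sent to ElasticSearch as one document, with only the fields from the logged JSON left in it.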