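My problem: I have XML files containing events that I want to parse with Logstash and then query with Kibana. I want to keep all the information from the ROOT tag in each event.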
The input looks like this:

    <?xml version="1.0" encoding="UTF-8"?>
    <ROOT number="34">
      <EVENTLIST>
        <EVENT name="hey"/>
        <EVENT name="you"/>
      </EVENTLIST>
    </ROOT>
What I want is two documents like this:

    {
      "number": "34",
      "name": "hey"
    }
    {
      "number": "34",
      "name": "you"
    }
Logstash conf:

    input {
      stdin { }
    }
    filter {
      xml {
        store_xml => "false"
        source => "message"
        target => "EVENT"
        xpath => [
          "/ROOT/@number", "number",
          "/ROOT/EVENTLIST/EVENT/@name", "name"
        ]
      }
    }
    output {
      elasticsearch { host => localhost }
      stdout { codec => rubydebug }
    }
This did not work. What I get is a single document in which every field is an array:

    {
      "number": ["34"],
      "name": ["hey", "you"]
    }
I followed the solution from this post: https://serverfault.com/questions/615196/logstash-parsing-xml-document-contains-multiple-log-entries

But my problem remains: I lose the information from the root tag.
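One solution might be to handle this with a ruby filter, but I don't know Ruby. Another would be to use a Java program to convert the XML to JSON before sending it to Elasticsearch.

Any ideas on how to solve this, or do I have to learn Ruby?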
If your structure is as simple as that, you can use the memorize plugin that I wrote.
Your configuration would look like this:

    filter {
      if ([message] =~ /<ROOT/) {
        grok {
          match => [ "message", 'number="(?<number>\d+)" number2="(?<number1>\d+)"' ]
        }
      } else if ([message] =~ /<EVENT /) {
        grok {
          match => [ "message", 'name="(?<name>[^"]+)"' ]
        }
      }
      memorize {
        fields => ["number","number1"]
      }
      if ([message] !~ /<EVENT /) {
        drop {}
      } else {
        mutate { remove_field => ["message"] }
      }
    }
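To make the flow of that filter chain easier to follow, here is a rough plain-Ruby simulation of what happens to each input line. It is only a sketch, not Logstash code: `simulate` and `SAMPLE_LINES` are hypothetical names, events are modelled as plain hashes, and the sketch assumes the single `number` attribute from the question's sample rather than the two-attribute grok pattern above. It also assumes the XML arrives one line per event, as it would from the stdin input.

```ruby
# Plain-Ruby simulation of the filter chain; NOT Logstash code.
# Each input line plays the role of one event.
def simulate(lines, fields = ["number"])
  memorized = {} # last value seen for each memorized field
  events = []

  lines.each do |line|
    event = { "message" => line }

    # Stand-ins for the two grok filters.
    if line =~ /<ROOT/
      m = line.match(/number="(?<number>\d+)"/)
      event["number"] = m[:number] if m
    elsif line =~ /<EVENT /
      m = line.match(/name="(?<name>[^"]+)"/)
      event["name"] = m[:name] if m
    end

    # Stand-in for memorize: remember fields that are present,
    # and fill in the last remembered value for fields that are absent.
    fields.each do |field|
      if event[field].nil?
        event[field] = memorized[field] if memorized.key?(field)
      else
        memorized[field] = event[field]
      end
    end

    # Stand-in for drop / mutate: keep only EVENT lines, without "message".
    next unless line =~ /<EVENT /
    event.delete("message")
    events << event
  end

  events
end

# The question's sample input, one line per event.
SAMPLE_LINES = [
  '<?xml version="1.0" encoding="UTF-8"?>',
  '<ROOT number="34">',
  '  <EVENTLIST>',
  '    <EVENT name="hey"/>',
  '    <EVENT name="you"/>',
  '  </EVENTLIST>',
  '</ROOT>'
]
```

Calling `simulate(SAMPLE_LINES)` yields one hash per `<EVENT .../>` line, each carrying both its own `name` and the `number` remembered from the earlier `<ROOT ...>` line. The ROOT line itself is dropped after its value has been memorized, which is why the memorize step has to run before the drop.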
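My example shows looking for multiple things in the ROOT element, per the comment below. Here is the version of the plugin that supports memorizing multiple fields: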
    # encoding: utf-8
    require "logstash/filters/base"
    require "logstash/namespace"
    require "set"
    #
    # This filter will look for fields from an event and record the last value
    # of them. If any are not present, their last value will be added to the
    # event
    #
    # The config looks like this:
    #
    #     filter {
    #       memorize {
    #         fields => ["time"]
    #         default => { "time" => "00:00:00.000" }
    #       }
    #     }
    #
    # The `fields` is an array of the field NAMES that you want to memorize
    # The `default` is a map of field names to field values that you want
    # to use if the field isn't present and has no memorized value (optional)
    class LogStash::Filters::Memorize < LogStash::Filters::Base

      config_name "memorize"
      milestone 2

      # An array of the field names to memorize
      config :fields, :validate => :array, :required => true
      # a map for default values to use if it's not seen before we need it
      config :default, :validate => :hash, :required => false

      # The stream identity is how the filter determines which stream an
      # event belongs to. See the multiline plugin if you want more details on how
      # this might work
      config :stream_identity , :validate => :string, :default => "%{host}.%{path}.%{type}"

      public
      def initialize(config = {})
        super

        @threadsafe = false

        # This filter needs to keep state.
        @memorized = Hash.new
      end # def initialize

      public
      def register
        # nothing needed
      end # def register

      public
      def filter(event)
        return unless filter?(event)

        any = false
        @fields.each do |field|
          if event[field].nil?
            # Field is missing: fall back to the memorized value, then the default.
            map = @memorized[@stream_identity]
            val = map.nil? ? nil : map[field]
            if val.nil?
              val = @default.nil? ? nil : @default[field]
            end
            if !val.nil?
              event[field] = val
              any = true
            end
          else
            # Field is present: remember its value for later events.
            map = @memorized[@stream_identity]
            if map.nil?
              map = @memorized[@stream_identity] = Hash.new
            end
            val = event[field]
            map[field] = event[field]
          end #if
          if any
            filter_matched(event)
          end
        end #field.each
      end
    end
For Logstash 1.5 and later, this plugin can be installed with:

    bin/plugin install logstash-filter-memorize