我有一个logstash输入设置为
input { kafka { bootstrap_servers => "zookeper_address" topics => ["topic1","topic2"] } }
我需要将主题提供给Elasticsearch中的两个不同的索引。任何人都可以帮助我如何为此类任务设置输出。目前,我只能设置
output { elasticsearch { hosts => ["localhost:9200"] index => "my_index" codec => "json" document_id => "%{id}" } }
我需要在同一elasticsearch例如两个指标说index1和index2,这将在未来对信息供给topic1和topic2
index1
index2
topic1
topic2
首先,您需要添加decorate_events到kafka输入中才能知道消息来自哪个主题
decorate_events
kafka
input { kafka { bootstrap_servers => "zookeper_address" topics => ["topic1","topic2"] decorate_events => true } }
然后,您有两个选择,都涉及条件逻辑。首先是通过引入一个过滤器来根据主题名称添加正确的索引名称。为此,您需要添加
filter { if [kafka][topic] == "topic1" { mutate { add_field => {"[@metadata][index]" => "index1"} } } else { mutate { add_field => {"[@metadata][index]" => "index2"} } } # remove the field containing the decorations, unless you want them to land into ES mutate { remove_field => ["kafka"] } } output { elasticsearch { hosts => ["localhost:9200"] index => "%{[@metadata][index]}" codec => "json" document_id => "%{id}" } }
然后第二个选择是直接在输出部分执行if / else,就像这样(但是其他kafka字段将落入ES中):
output { if [@metadata][kafka][topic] == "topic1" { elasticsearch { hosts => ["localhost:9200"] index => "index1" codec => "json" document_id => "%{id}" } } else { elasticsearch { hosts => ["localhost:9200"] index => "index2" codec => "json" document_id => "%{id}" } } }