我想从dnsmasq收集和处理日志,因此决定使用ELK。Dnsmasq用作DHCP服务器和DNS解析器,因此它为这两种服务创建日志条目。
我的目标是将所有包含请求者IP,请求者主机名(如果有)和请求者mac地址的DNS查询发送到Elasticsearch。这样一来,无论设备IP是否更改,我都可以按mac地址对请求进行分组,并显示主机名。
我想做的是以下几点:
1)阅读以下条目:
Mar 30 21:55:34 dnsmasq-dhcp[346]: 3806132383 DHCPACK(eth0) 192.168.0.80 04:0c:ce:d1:af:18 air
2)临时存储关系:
192.168.0.80 => 04:0c:ce:d1:af:18
192.168.0.80 => 空气
3)充实如下所示的条目,添加mac地址和主机名。如果主机名是空的,我将添加mac地址。
Mar 30 22:13:05 dnsmasq[346]: query[A] imap.gmail.com from 192.168.0.80
我找到了一个名为“记忆”的模块,该模块可以让我存储它们,但不幸的是不适用于最新版本的Logstash
我使用的版本:
ElastiSearch 2.3.0 Kibana 4.4.2 Logstash 2.2.2
还有logstash过滤器(这是我第一次尝试使用logstash,因此我敢肯定可以改进配置文件)
input { file { path => "/var/log/dnsmasq.log" start_position => "beginning" type => "dnsmasq" } } filter { if [type] == "dnsmasq" { grok { match => [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{NOTSPACE:action} %{IP:clientip} %{MAC:clientmac} ?(%{HOSTNAME:clientname})?"] match => [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: ?(%{NONNEGINT:num} )?%{USER:action}?(\[%{USER:subaction}\])? %{NOTSPACE:domain} %{NOTSPACE:function} %{IP:clientip}"] match => [ "message", "%{SYSLOGTIMESTAMP:reqtimestamp} %{USER:program}\[%{NONNEGINT:pid}\]\: %{NOTSPACE:action} %{DATA:data}"] } if [action] =~ "DHCPACK" { }else if [action] == "query" { }else { drop{} } } } output { elasticsearch { hosts => ["localhost:9200"] } stdout { codec => rubydebug } }
问题:
1)是否可以使用最新的logstash版本替代“ memorize”插件?另一个插件或其他过程。
2)我可以将logstash降级到2之前的版本吗(我认为以前是1.5.4)?如果是这样,elasticsearch 2.2.1是否存在任何已知的服务器问题或不兼容?
3)还是应该修改允许“ logstash 2.x”使用的“ memorize”插件(如果可以,那么我将不胜感激如何入门)?
memorize我认为不需要为此重新打包插件。您可以使用aggregate过滤器实现所需的功能。
memorize
aggregate
... # record host/mac in temporary map if [action] =~ "DHCPACK" { aggregate { task_id => "%{clientip}" code => "map['clientmac'] = event['clientmac']; map['clientname'] = event['clientname'];" map_action => "create_or_update" # timeout set to 48h timeout => 172800 } } # add host/mac where/when needed else if [action] == "query" { aggregate { task_id => "%{clientip}" code => "event['clientmac'] = map['clientmac']; event['clientname'] = map['clientname']" map_action => "update" } }