如何解决使用 logstash 过滤和解析非结构化日志
我对 logstash 还很陌生,但遇到了一个问题。
我想从与其他日志不同的日志中过滤一行,然后使用 grok 进行一些操作。
这是我拥有的日志文件:
Date: 3/1/2021 -- 05:08:14 (uptime: 2d,22h 36m 18s)
------------------------------------------------------------------------------------
Counter | TM Name | Value
------------------------------------------------------------------------------------
capture.kernel_packets | Total | 433066
capture.kernel_drops | Total | 18183
decoder.pkts | Total | 414883
decoder.bytes | Total | 453509832
decoder.ipv4 | Total | 413834
decoder.ipv6 | Total | 778
decoder.ethernet | Total | 414883
decoder.tcp | Total | 409208
decoder.udp | Total | 5266
decoder.icmpv6 | Total | 56
decoder.avg_pkt_size | Total | 1093
decoder.max_pkt_size | Total | 1514
flow.tcp | Total | 1273
flow.udp | Total | 1336
flow.icmpv6 | Total | 26
flow.wrk.spare_sync_avg | Total | 100
flow.wrk.spare_sync | Total | 18
decoder.event.ipv4.opt_pad_required | Total | 82
decoder.event.ipv6.zero_len_padn | Total | 24
flow.wrk.flows_evicted_needs_work | Total | 535
flow.wrk.flows_evicted_pkt_inject | Total | 579
flow.wrk.flows_evicted | Total | 406
flow.wrk.flows_injected | Total | 535
tcp.sessions | Total | 667
tcp.syn | Total | 669
tcp.synack | Total | 668
tcp.rst | Total | 407
tcp.stream_depth_reached | Total | 14
tcp.reassembly_gap | Total | 8
tcp.overlap | Total | 27
detect.alert | Total | 1106
app_layer.flow.http | Total | 41
app_layer.tx.http | Total | 126
app_layer.flow.tls | Total | 611
app_layer.flow.ntp | Total | 15
app_layer.tx.ntp | Total | 15
app_layer.flow.dhcp | Total | 4
app_layer.tx.dhcp | Total | 6
app_layer.flow.dns_udp | Total | 964
app_layer.tx.dns_udp | Total | 1934
app_layer.flow.failed_udp | Total | 353
flow.mgr.full_hash_pass | Total | 35
flow.spare | Total | 9856
flow.mgr.rows_maxlen | Total | 2
flow.mgr.flows_checked | Total | 3998
flow.mgr.flows_notimeout | Total | 1808
从日期开始重复。我只需要日期字符串,仅此而已,然后我想对数据进行一些操作,将它们作为 json 发送。有办法吗?
解决方法
首先,您必须管理多行。所以我想你有输入日志文件(你可以根据需要进行更改,方法是相同的)。
input {
file {
path => ["inputlogs/*"]
codec => multiline {
pattern => "^Date: %{DATE:date} -- %{TIME:time}"
negate => true
what => previous
}
}
}
在此之后,您必须过滤每一行(我想 csv 过滤器在这种情况下必须更有用,但我们也可以按照您的要求使用 grok 处理此问题):
grok {
match => ["message","^%{NOTSPACE:fieldname}\s*\| %{WORD:field}\s*\| %{INT:value}"]
}
所有与此模式不匹配的行(标题和 -- 行)都被标记为 _grokparsefailure
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。