您的位置:首页 > 其它

LogStash的Filter的使用

2015-08-27 10:35 330 查看
最近在项目中使用LogStash做日志的采集和过滤,感觉LogStash还是很强大的。

input {
file{
path => "/XXX/syslog.txt"
start_position => beginning
codec => multiline{
patterns_dir => ["/XX/logstash-1.5.3/patterns"]
pattern => "^%{MESSAGE}"
negate => true
what => "previous"
}
}
}
filter{
mutate{
split => ["message","|"]
add_field =>   {
"tmp" => "%{[message][0]}"
}
add_field =>   {
"DeviceProduct" => "%{[message][2]}"
}
add_field =>   {
"DeviceVersion" => "%{[message][3]}"
}
add_field =>   {
"Signature ID" => "%{[message][4]}"
}
add_field =>   {
"Name" => "%{[message][5]}"
}
}

mutate{
split => ["tmp",":"]
add_field =>   {
"tmp1" => "%{[tmp][1]}"
}
add_field =>   {
"Version" => "%{[tmp][2]}"
}
remove_field => [ "tmp" ]
}

grok{
patterns_dir => ["/XXX/logstash-1.5.3/patterns"]
match => {"tmp1" => "%{TYPE:type}"}
remove_field => [ "tmp1"]
}

kv{
include_keys => ["eventId", "msg", "end", "mrt", "modelConfidence", "severity", "relevance","assetCriticality","priority","art","rt","cs1","cs2","cs3","locality","cs2Label","cs3Label","cs4Label","flexString1Label","ahost","agt","av","atz","aid","at","dvc","deviceZoneID","deviceZoneURI","dtz","eventAnnotationStageUpdateTime","eventAnnotationModificationTime","eventAnnotationAuditTrail","eventAnnotationVersion","eventAnnotationFlags","eventAnnotationEndTime","eventAnnotationManagerReceiptTime","_cefVer","ad.arcSightEventPath"]
}
mutate{
split => ["ad.arcSightEventPath",","]
add_field =>   {
"arcSightEventPath" => "%{[ad.arcSightEventPath][0]}"
}
remove_field => [ "ad.arcSightEventPath" ]
remove_field => [ "message" ]
}

}
output{
kafka{
topic_id => "rawlog"
batch_num_messages => 20
broker_list => "10.3.162.193:39192,10.3.162.194:39192,10.3.162.195:39192"
codec => "json"
}
stdout{
codec => rubydebug
}


input:接入数据源

filter:对数据源进行过滤

output: 输出的

其中最重要的是filter的处理,目前我们的需求是需要对字符串进行key-value的提取

1、使用了mutate中的split,能通过分割符对分本处理。

2、通过grok使用正则对字符串进行截取处理。

3、使用kv 提取所有的key-value
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: