您的位置:首页 > Web前端 > JavaScript

logstash介绍 - 3.处理syslog json 格式的消息体

2018-02-09 14:30 302 查看

背景

     logstash对不同主机发过来的json格式的syslog进行区分设置,并且需要对该json消息的内容可能并不是我们需要的的,需要对内容进行转化。

syslog的消息体

    接收的syslog的消息体如下:
2018-01-24T05:27:49.303Z 192.168.6.66 <177>Jan 14 13:27:49 localhost4.localdomain4 syslog_say[12021]: {"datetime": "2018-01-24T13:27:48+08:00","dts_ip":"192.1686.6.65","dst_port":"8000","action":"pass","rule_id":"123456","host":""...}

logstash的配置

    使用的logstash的日志的配置文件如下:
# cat /etc/logstash/conf.d/001_logstash.conf
input {
syslog {
facility_labels => "local6"
port => 514
}
}
filter {
if [host] == "host_ip" {
json {
source => "message"
remove_field => ["message"]
# 对消息体中的内容增加自定义的属性
add_field => {"product_name" => "network-plugin"}
# 由于json消息体中包含host字段,这里重新设置host为另一个key
add_field => {"product_host" => "host_ip"}
}
}
mutate {
# 修改datetime的key为happentime
rename => { "datetime" => "happentime" }
}
translate {
override => true
# 可以对key action 的值进行转化
field => "action"
destination => "action"
dictionary => {
"pass" => "1"
}
}
# 如果字典太多,可以使用csv,yaml,json保存映射关系
# 需要用到 logstash-filter-translate,有以下两种加载方式,第二种需要先从已经装好的环境导出来
# 1,/usr/share/logstash/bin/logstash-plugin install logstash-filter-translate
# 2,/usr/share/logstash/bin/logstash-plugin install file:///elk/logstash-filter-translate-3.0.4.zip
translate {
override => true
field => "rule_id"
destination => "rule"
dictionary_path => "/etc/lyh/mapping.yaml"
# 这个第一个可以,如果是int,要强制转化为 string,否则会报错
# cat /etc/lyh/mapping.yaml
# '1101000': "test1"
# '1101001': "test2"
}
}

output {
# 直接输出到屏幕
stdout { }
# 对接到 elasticsearch
# 由于json消息体中包含host字段,这里重新设置host为另一个key,并使用新的key
# 对接到elasticsearch后,可以使用预先设置模板的方式
if [product_host] == "host_ip" {
elasticsearch {
hosts => "localhost:9200"
index => "lyh-index-host_ip-%{+YYYY.MM.dd}"
document_type => "logs"
}
}
}

创建elasticsearch模板

elasticsearch模板,该模板的含义即满足索引的格式为lyh-index-*消息使用该模板:

# curl -H 'Content-type: application/json'  localhost:9200/_template/ssp-attacklog-template -d @/etc/elasticsearch/mapping.json
# cat /etc/elasticsearch/mapping.json
{
"template":"lyh-index-*",
"settings":{
"number_of_shards":3, #主分片数
"number_of_replicas":1 #主分片备份个数
},
"mappings":{
"logs":{
"properties":{
"happentime":{
"type":"date"
},
"rule":{
"type":"keyword"
},
"dst_ip":{
"type":"ip"
},
"dst_port":{
"type":"integer"
},
"action":{
"type":"integer"
}
}
}
}
}

参数资料

https://www.elastic.co/guide/en/logstash/current/plugins-filters-translate.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: