logstash介绍 - 3.处理syslog json 格式的消息体
2018-02-09 14:30
302 查看
背景
logstash对不同主机发过来的json格式的syslog进行区分设置,并且需要对该json消息的内容可能并不是我们需要的的,需要对内容进行转化。syslog的消息体
接收的syslog的消息体如下:2018-01-24T05:27:49.303Z 192.168.6.66 <177>Jan 14 13:27:49 localhost4.localdomain4 syslog_say[12021]: {"datetime": "2018-01-24T13:27:48+08:00","dts_ip":"192.1686.6.65","dst_port":"8000","action":"pass","rule_id":"123456","host":""...}
logstash的配置
使用的logstash的日志的配置文件如下:# cat /etc/logstash/conf.d/001_logstash.conf input { syslog { facility_labels => "local6" port => 514 } } filter { if [host] == "host_ip" { json { source => "message" remove_field => ["message"] # 对消息体中的内容增加自定义的属性 add_field => {"product_name" => "network-plugin"} # 由于json消息体中包含host字段,这里重新设置host为另一个key add_field => {"product_host" => "host_ip"} } } mutate { # 修改datetime的key为happentime rename => { "datetime" => "happentime" } } translate { override => true # 可以对key action 的值进行转化 field => "action" destination => "action" dictionary => { "pass" => "1" } } # 如果字典太多,可以使用csv,yaml,json保存映射关系 # 需要用到 logstash-filter-translate,有以下两种加载方式,第二种需要先从已经装好的环境导出来 # 1,/usr/share/logstash/bin/logstash-plugin install logstash-filter-translate # 2,/usr/share/logstash/bin/logstash-plugin install file:///elk/logstash-filter-translate-3.0.4.zip translate { override => true field => "rule_id" destination => "rule" dictionary_path => "/etc/lyh/mapping.yaml" # 这个第一个可以,如果是int,要强制转化为 string,否则会报错 # cat /etc/lyh/mapping.yaml # '1101000': "test1" # '1101001': "test2" } } output { # 直接输出到屏幕 stdout { } # 对接到 elasticsearch # 由于json消息体中包含host字段,这里重新设置host为另一个key,并使用新的key # 对接到elasticsearch后,可以使用预先设置模板的方式 if [product_host] == "host_ip" { elasticsearch { hosts => "localhost:9200" index => "lyh-index-host_ip-%{+YYYY.MM.dd}" document_type => "logs" } } }
创建elasticsearch模板
elasticsearch模板,该模板的含义即满足索引的格式为lyh-index-*消息使用该模板:# curl -H 'Content-type: application/json' localhost:9200/_template/ssp-attacklog-template -d @/etc/elasticsearch/mapping.json # cat /etc/elasticsearch/mapping.json { "template":"lyh-index-*", "settings":{ "number_of_shards":3, #主分片数 "number_of_replicas":1 #主分片备份个数 }, "mappings":{ "logs":{ "properties":{ "happentime":{ "type":"date" }, "rule":{ "type":"keyword" }, "dst_ip":{ "type":"ip" }, "dst_port":{ "type":"integer" }, "action":{ "type":"integer" } } } } }
参数资料
https://www.elastic.co/guide/en/logstash/current/plugins-filters-translate.html相关文章推荐
- APNS IOS 消息推送JSON格式介绍
- Logstash处理json格式日志文件的三种方法
- Logstash处理json格式日志文件的三种方法
- Logstash处理json格式日志文件的三种方法
- Logstash处理json格式日志文件的三种方法
- 将apache日志输出为json格式并发送给logstash处理
- Android学习笔记_14_对JSON格式数据的处理
- javascript通过json2.js处理json格式数据
- python处理JSON格式数据并利用pygal绘制世界地图
- 在Spring MVC中,如何处理JSON对象,并对Spring MVC框架中的数据转换与数据格式化进行介绍(SSMchapter12)
- php用xml和json处理数据格式。
- C++MFC编程笔记day01 MFC介绍、创建MFC程序和重写消息处理
- 阿里fastjson怎么处理时间格式
- Android学习笔记:Android消息处理机制之Handler介绍
- CXF拦截器Interceptor实现自定义消息,并以json格式返回
- 05.Java后台如何接收并处理前台传过来的json格式的数组参数
- DuiLib介绍及其消息处理剖析
- android Handler及消息处理机制的简单介绍
- 一个java处理JSON格式数据的通用类
- C++MFC编程笔记day01 MFC介绍、创建MFC程序和重写消息处理