第三节 ElasticSearch数据导入之Logstash
2017-08-15 18:07
423 查看
一、简介
Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理、或一些具有一定格式的数据导入到ES的处理。
工作流程
Logstash 工作的三个阶段:
input 数据输入端,可以接收来自任何地方的源数据。
file:从文件中读取
syslog:监听在514端口的系统日志信息,并解析成RFC3164格式。
redis:从redis-server list 中获取
beat:接收来自Filebeat的事件
Filter 数据中转层,主要进行格式处理,数据类型转换、数据过滤、字段添加,修改等,常用的过滤器如下。
grok: 通过正则解析和结构化任何文本。Grok 目前是logstash最好的方式对非结构化日志数据解析成结构化和可查询化。logstash内置了120个匹配模式,满足大部分需求。
mutate: 在事件字段执行一般的转换。可以重命名、删除、替换和修改事件字段。
drop: 完全丢弃事件,如debug事件。
clone: 复制事件,可能添加或者删除字段。
geoip: 添加有关IP地址地理位置信息。
output 是logstash工作的最后一个阶段,负责将数据输出到指定位置,兼容大多数应用,常用的有:
elasticsearch: 发送事件数据到 Elasticsearch,便于查询,分析,绘图。
file: 将事件数据写入到磁盘文件上。
mongodb:将事件数据发送至高性能NoSQL mongodb,便于永久存储,查询,分析,大数据分片。
redis:将数据发送至redis-server,常用于中间层暂时缓存。
graphite: 发送事件数据到graphite。http://graphite.wikidot.com/
statsd: 发送事件数据到 statsd。
二、下载安装
1.下载地址:https://www.elastic.co/products/logstash
2.window下解压到目标文件夹即可
三、使用
1. 编写logstash启动的配置文件
2.编写对应的mapping template文件
第一种:普通有固定分隔符的文件
例如:2017-05-09 10:31:41,378 [INFO ] com.es.common.SystemLog.execute(SystemLog.java:44) - login|IP: 192.168.3.105|MAC: A1245C15-26C1-4263-8845-01CFCA6EC4FD|USERID: 89293|USERNM: sslvoe|
log.conf
input {
file {
type => "tradelog"
#需要读取的文件
path => "E:/home/elk/his/trade.log*"
discover_interval => 5
#从开始位置读取
start_position => "beginning"
#记录读取的位置
sincedb_path => "E:/home/elk/conf/sincedb_trade.txt"
sincedb_write_interval => 15
#文本类型
4000
codec => plain { charset => "GB2312" }
}
}
filter {
grok {
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|USERNM: %{WORD:usernm}\|" }
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|" }
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }
remove_field => "message"
}
date {
match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
}
}
output {
if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
stdout {codec => rubydebug}
elasticsearch {
index => "tradelog"
document_type => "t_log_type"
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "log4j-tradelog"
template => "E:/home/elk/conf/tradelog_template.json"
}
}
}
tradelog_template.json:
{
"template": "log4j-tradelog*",
"settings": {
"index.number_of_shards": 3,
"number_of_replicas": 0
},
"mappings": {
"tradelog": {
"_all": {
"enabled": false
},
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis",
"doc_values": true
},
"@version": {
"type": "string",
"index": "not_analyzed"
},
"exception": {
"type": "string",
"index": "analyzed"
},
"path": {
"type": "string",
"index": "not_analyzed"
},
"host": {
"type": "string",
"index": "not_analyzed"
},
"ip": {
"type": "ip",
"index": "not_analyzed"
},
"userid": {
"type": "integer",
"index": "not_analyzed"
},
"mac": {
"type": "string",
"index": "not_analyzed"
},
"usernm": {
"type": "string",
"index": "not_analyzed"
},
"operMethod": {
"type": "string",
"index": "not_analyzed"
},
"type": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
执行命令:
logstash -f log.conf
第二种 从数据库中读取数据并插入到es中
input {
jdbc {
jdbc_driver_library => "E:/tools/mvn_repository/mysql/mysql-connector-java/5.1.36/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://172.16.10.3:3306/b2bdev?characterEncoding=UTF-8&useSSL=false"
jdbc_user => "devmanager"
jdbc_password => "yyh123"
statement => "SELECT MERCHANDISE_ID AS goodsid,MERCHANDISE_CD AS goodscd,ASSISTANT_CODE AS asscode,BUYER AS buyer,MERCHANDISE_NM AS goodsnm,MANUFACTURER_ID AS manuid,created_date as createddate
FROM t_merchandise_info WHERE DELETE_FLAG = 0 LIMIT 0, 200"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
stdout {codec => rubydebug}
elasticsearch {
index => "goods300"
document_id => "%{goodscd}"
document_type => "goods_info"
doc_as_upsert => true
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "goodsInfo"
template => "E:/home/elk/conf/goods_template.json"
}
}
第三种 JSON格式的文本
input {
file {
path => "E:/home/elk/his/jsontest.json"
start_position => "beginning"
codec => json {
charset => "UTF-8"
}
}
}
filter {
json {
source => "message"
remove_field => "message"
}
}
output {
stdout {codec => rubydebug}
#如果不是JSON则不记录到elasticsearch
if "_jsonparsefailure" not in [tags] {
elasticsearch {
index => "jsontest1"
document_type => "jst"
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "jsonInfo"
template => "E:/home/elk/conf/jsontest_template.json"
}
}
}
参考网址:http://tchuairen.blog.51cto.com/3848118/1840596/
http://www.2cto.com/kf/201610/560348.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html
Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理、或一些具有一定格式的数据导入到ES的处理。
工作流程
Logstash 工作的三个阶段:
input 数据输入端,可以接收来自任何地方的源数据。
file:从文件中读取
syslog:监听在514端口的系统日志信息,并解析成RFC3164格式。
redis:从redis-server list 中获取
beat:接收来自Filebeat的事件
Filter 数据中转层,主要进行格式处理,数据类型转换、数据过滤、字段添加,修改等,常用的过滤器如下。
grok: 通过正则解析和结构化任何文本。Grok 目前是logstash最好的方式对非结构化日志数据解析成结构化和可查询化。logstash内置了120个匹配模式,满足大部分需求。
mutate: 在事件字段执行一般的转换。可以重命名、删除、替换和修改事件字段。
drop: 完全丢弃事件,如debug事件。
clone: 复制事件,可能添加或者删除字段。
geoip: 添加有关IP地址地理位置信息。
output 是logstash工作的最后一个阶段,负责将数据输出到指定位置,兼容大多数应用,常用的有:
elasticsearch: 发送事件数据到 Elasticsearch,便于查询,分析,绘图。
file: 将事件数据写入到磁盘文件上。
mongodb:将事件数据发送至高性能NoSQL mongodb,便于永久存储,查询,分析,大数据分片。
redis:将数据发送至redis-server,常用于中间层暂时缓存。
graphite: 发送事件数据到graphite。http://graphite.wikidot.com/
statsd: 发送事件数据到 statsd。
二、下载安装
1.下载地址:https://www.elastic.co/products/logstash
2.window下解压到目标文件夹即可
三、使用
1. 编写logstash启动的配置文件
2.编写对应的mapping template文件
第一种:普通有固定分隔符的文件
例如:2017-05-09 10:31:41,378 [INFO ] com.es.common.SystemLog.execute(SystemLog.java:44) - login|IP: 192.168.3.105|MAC: A1245C15-26C1-4263-8845-01CFCA6EC4FD|USERID: 89293|USERNM: sslvoe|
log.conf
input {
file {
type => "tradelog"
#需要读取的文件
path => "E:/home/elk/his/trade.log*"
discover_interval => 5
#从开始位置读取
start_position => "beginning"
#记录读取的位置
sincedb_path => "E:/home/elk/conf/sincedb_trade.txt"
sincedb_write_interval => 15
#文本类型
4000
codec => plain { charset => "GB2312" }
}
}
filter {
grok {
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|USERNM: %{WORD:usernm}\|" }
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|" }
match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }
remove_field => "message"
}
date {
match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]
}
}
output {
if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
stdout {codec => rubydebug}
elasticsearch {
index => "tradelog"
document_type => "t_log_type"
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "log4j-tradelog"
template => "E:/home/elk/conf/tradelog_template.json"
}
}
}
tradelog_template.json:
{
"template": "log4j-tradelog*",
"settings": {
"index.number_of_shards": 3,
"number_of_replicas": 0
},
"mappings": {
"tradelog": {
"_all": {
"enabled": false
},
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis",
"doc_values": true
},
"@version": {
"type": "string",
"index": "not_analyzed"
},
"exception": {
"type": "string",
"index": "analyzed"
},
"path": {
"type": "string",
"index": "not_analyzed"
},
"host": {
"type": "string",
"index": "not_analyzed"
},
"ip": {
"type": "ip",
"index": "not_analyzed"
},
"userid": {
"type": "integer",
"index": "not_analyzed"
},
"mac": {
"type": "string",
"index": "not_analyzed"
},
"usernm": {
"type": "string",
"index": "not_analyzed"
},
"operMethod": {
"type": "string",
"index": "not_analyzed"
},
"type": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
执行命令:
logstash -f log.conf
第二种 从数据库中读取数据并插入到es中
input {
jdbc {
jdbc_driver_library => "E:/tools/mvn_repository/mysql/mysql-connector-java/5.1.36/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://172.16.10.3:3306/b2bdev?characterEncoding=UTF-8&useSSL=false"
jdbc_user => "devmanager"
jdbc_password => "yyh123"
statement => "SELECT MERCHANDISE_ID AS goodsid,MERCHANDISE_CD AS goodscd,ASSISTANT_CODE AS asscode,BUYER AS buyer,MERCHANDISE_NM AS goodsnm,MANUFACTURER_ID AS manuid,created_date as createddate
FROM t_merchandise_info WHERE DELETE_FLAG = 0 LIMIT 0, 200"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
stdout {codec => rubydebug}
elasticsearch {
index => "goods300"
document_id => "%{goodscd}"
document_type => "goods_info"
doc_as_upsert => true
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "goodsInfo"
template => "E:/home/elk/conf/goods_template.json"
}
}
第三种 JSON格式的文本
input {
file {
path => "E:/home/elk/his/jsontest.json"
start_position => "beginning"
codec => json {
charset => "UTF-8"
}
}
}
filter {
json {
source => "message"
remove_field => "message"
}
}
output {
stdout {codec => rubydebug}
#如果不是JSON则不记录到elasticsearch
if "_jsonparsefailure" not in [tags] {
elasticsearch {
index => "jsontest1"
document_type => "jst"
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "jsonInfo"
template => "E:/home/elk/conf/jsontest_template.json"
}
}
}
参考网址:http://tchuairen.blog.51cto.com/3848118/1840596/
http://www.2cto.com/kf/201610/560348.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html
相关文章推荐
- [大数据]-Logstash-5.3.1的安装导入数据到Elasticsearch5.3.1并配置同义词过滤
- Logstash 导入数据到Elasticsearch
- logstash 安装导入mysql数据至ElasticSearch
- logstash5.0版本之后导入es数据指定ik分词器详解
- elasticsearch数据导入导出
- 用 Spark 为 Elasticsearch 导入搜索数据
- logstash向elasticsearch写入数据,如何指定多个数据template
- elasticsearch从mysql导入数据
- Elasticsearch的脚本化数据导入导出
- Elasticsearch系列(九)----使用Logstash-input-jdbc同步数据库中的数据到ES
- Windows下使用curl命令向elasticsearch导入示例数据出错问题
- flume采集数据导入elasticsearch 配置
- elasticsearch(5)hive 数据导入Elasticsearch
- 将Mysql数据导入到ElasticSearch集群
- MongoDB数据导入Elasticsearch
- elasticsearch从mysql导入数据
- 2-Elasticsearch集群数据批量导入
- 利用Logstash插件进行Elasticsearch与Mysql的数据
- 利用logstash-output-jdbc从mysql导入数据到es中,如何构建多级节点的JSON
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步