您的位置:首页 > 其它

第三节 ElasticSearch数据导入之Logstash

2017-08-15 18:07 423 查看
一、简介

Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,还有强大的插件功能,常用于日志处理、或一些具有一定格式的数据导入到ES的处理。

工作流程

Logstash 工作的三个阶段:

input 数据输入端,可以接收来自任何地方的源数据。

file:从文件中读取

syslog:监听在514端口的系统日志信息,并解析成RFC3164格式。

redis:从redis-server list 中获取

beat:接收来自Filebeat的事件

Filter 数据中转层,主要进行格式处理,数据类型转换、数据过滤、字段添加,修改等,常用的过滤器如下。

grok: 通过正则解析和结构化任何文本。Grok 目前是logstash最好的方式对非结构化日志数据解析成结构化和可查询化。logstash内置了120个匹配模式,满足大部分需求。

mutate: 在事件字段执行一般的转换。可以重命名、删除、替换和修改事件字段。

drop: 完全丢弃事件,如debug事件。

clone: 复制事件,可能添加或者删除字段。

geoip: 添加有关IP地址地理位置信息。

output 是logstash工作的最后一个阶段,负责将数据输出到指定位置,兼容大多数应用,常用的有:

elasticsearch: 发送事件数据到 Elasticsearch,便于查询,分析,绘图。

file: 将事件数据写入到磁盘文件上。

mongodb:将事件数据发送至高性能NoSQL mongodb,便于永久存储,查询,分析,大数据分片。

redis:将数据发送至redis-server,常用于中间层暂时缓存。

graphite: 发送事件数据到graphite。http://graphite.wikidot.com/

statsd: 发送事件数据到 statsd。

二、下载安装

1.下载地址:https://www.elastic.co/products/logstash

2.window下解压到目标文件夹即可
三、使用

1. 编写logstash启动的配置文件

2.编写对应的mapping template文件
第一种:普通有固定分隔符的文件

例如:2017-05-09 10:31:41,378 [INFO ] com.es.common.SystemLog.execute(SystemLog.java:44) - login|IP: 192.168.3.105|MAC: A1245C15-26C1-4263-8845-01CFCA6EC4FD|USERID: 89293|USERNM: sslvoe|

log.conf

input {

file {

type => "tradelog"

#需要读取的文件

path => "E:/home/elk/his/trade.log*"

discover_interval => 5

#从开始位置读取

start_position => "beginning"

#记录读取的位置

sincedb_path => "E:/home/elk/conf/sincedb_trade.txt"

sincedb_write_interval => 15

#文本类型
4000

codec => plain { charset => "GB2312" }

}

}

filter {

grok {

match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|USERNM: %{WORD:usernm}\|" }

match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|USERID: %{WORD:userid}\|" }

match => { "message" => "%{DATESTAMP_CN:[@metadata][logdate]} .* - %{WORD:operMethod}\|IP: %{IP:ip}\|MAC: %{GREEDYDATA:mac}\|" }

remove_field => "message"

}

date {

match => ["[@metadata][logdate]", "YYYY-MM-dd HH:mm:ss,SSS"]

}

}

output {

if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {

stdout {codec => rubydebug}

elasticsearch {

index => "tradelog"

document_type => "t_log_type"

hosts => ["127.0.0.1:9200"]

manage_template => true

template_overwrite => true

template_name => "log4j-tradelog"

template => "E:/home/elk/conf/tradelog_template.json"

}

}

}

tradelog_template.json:

{

"template": "log4j-tradelog*",

"settings": {

"index.number_of_shards": 3,

"number_of_replicas": 0

},

"mappings": {

"tradelog": {

"_all": {

"enabled": false

},

"properties": {

"@timestamp": {

"type": "date",

"format": "strict_date_optional_time||epoch_millis",

"doc_values": true

},

"@version": {

"type": "string",

"index": "not_analyzed"

},

"exception": {

"type": "string",

"index": "analyzed"

},

"path": {

"type": "string",

"index": "not_analyzed"

},

"host": {

"type": "string",

"index": "not_analyzed"

},

"ip": {

"type": "ip",

"index": "not_analyzed"

},

"userid": {

"type": "integer",

"index": "not_analyzed"

},

"mac": {

"type": "string",

"index": "not_analyzed"

},

"usernm": {

"type": "string",

"index": "not_analyzed"

},

"operMethod": {

"type": "string",

"index": "not_analyzed"

},

"type": {

"type": "string",

"index": "not_analyzed"

}

}

}

}

}

执行命令:

logstash -f log.conf

第二种 从数据库中读取数据并插入到es中
input {
jdbc {
jdbc_driver_library => "E:/tools/mvn_repository/mysql/mysql-connector-java/5.1.36/mysql-connector-java-5.1.36.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://172.16.10.3:3306/b2bdev?characterEncoding=UTF-8&useSSL=false"
jdbc_user => "devmanager"
jdbc_password => "yyh123"
statement => "SELECT MERCHANDISE_ID AS goodsid,MERCHANDISE_CD AS goodscd,ASSISTANT_CODE AS asscode,BUYER AS buyer,MERCHANDISE_NM AS goodsnm,MANUFACTURER_ID AS manuid,created_date as createddate
FROM t_merchandise_info WHERE DELETE_FLAG = 0 LIMIT 0, 200"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
}
}

filter {
json {
source => "message"
remove_field => ["message"]
}
}

output {
stdout {codec => rubydebug}

elasticsearch {
index => "goods300"
document_id => "%{goodscd}"
document_type => "goods_info"
doc_as_upsert => true
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "goodsInfo"
template => "E:/home/elk/conf/goods_template.json"
}

}

第三种 JSON格式的文本
input {
file {
path => "E:/home/elk/his/jsontest.json"
start_position => "beginning"
codec => json {

charset => "UTF-8"

}
}
}

filter {
json {
source => "message"
remove_field => "message"
}

}

output {
stdout {codec => rubydebug}
#如果不是JSON则不记录到elasticsearch

if "_jsonparsefailure" not in [tags] {
elasticsearch {
index => "jsontest1"
document_type => "jst"
hosts => ["127.0.0.1:9200"]
manage_template => true
template_overwrite => true
template_name => "jsonInfo"
template => "E:/home/elk/conf/jsontest_template.json"
}
}
}

参考网址:http://tchuairen.blog.51cto.com/3848118/1840596/

http://www.2cto.com/kf/201610/560348.html

https://www.elastic.co/guide/en/logstash/current/plugins-filters-json.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/array.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息