您的位置:首页 > 其它

Logstash日志分析的配置和使用

2017-11-04 12:05 411 查看
https://www.elastic.co/guide/en/logstash/current/index.html

首先下载logstash,上传到服务器

logstash 日志分析的配置和使用

logstash是一个数据分析软件,主要目的是分析log日志;整套软件可以当做一个mvc模型.logstash是controller层,Elasticsearch是一个model层,kabana是view层

首先将数据传给logstash,它将数据进行过滤和格式化[转成JSON格式],然后传给Elasticsearch进行存储,建搜索的索引,kabana提供前端的页面再进行搜索个图表可视化,它是调用Elasticsearch的接口返回的数据进行可视化



Logstash工作的三个阶段:

第一阶段:

Input数据输入端,可以接收来自任何地方的源数据:

file:从文件中读取
syslog:监听在514端口的系统日志信息,并解析成RFC3164格式
redis:从redis-server  list   中获取
beat:接收来自Filebeat的事件


第二阶段:

Filter数据中转层,主要进行格式处理,数据类型转换,数据过滤,字段添加,修改等,常用的过滤器:

grok:通过正则解析和结构化任何文本;Grok目前是logstash最好的方式对非结构化日志数据解析成结构化和可查询化;logstash内置了120个匹配模式,满足大部分需求
mutate:在事件字段执行一般的转换;可以重命名,删除,替换和修改事件字段
drop:完全丢弃事件,如debug事件
clone:复制事件,可能添加或者删除字段
geoip:添加有关ip地址地理位置信息


第三阶段:

Output是logstash工作的最后一个阶段,负责将数据输出到指定位置,兼容大多数应用:

elasticsearch:发送事件数据到Elasticsearch,便于查询,分析,绘图
file:将事件数据写入到磁盘文件上
mongodb:将事件数据发送到高性能NoSQL  mongodb,便于永久存储,查询,分析,大数据分片
redis:将数据发送至redis-server,常用于中间层暂时缓存
graphite:发送事件数据到graphite
statsd:发送事件数据到statsd


解压:

tar -zxvf logstash-2.3.1.tar.gz

无需配置,直接启动:

bin/logstash -e ‘input { stdin {} } output { stdout{} }’

bin/logstash -e ‘input { stdin {} } output { kafka { topic_id => “test1” bootstrap_servers => “192.168.88.81:9092,192.168.88.82:9092,192.168.88.83:9092”} stdout{codec => rubydebug} }’



kafka消费者输出端:[以最后一行的输出为主]



以配置的形式:

vi logstash-kafka.conf

# logstash使用{}来定义配置区域,区域内又可以包含其插件的区域配置

#最基本的配置文件定义,必须包含input和output

input {

file {

path => “/home/person2.txt”

discover_interval => 5

start_position => “beginning”

}

}

#可以定义多个输出源与多个输出位置

output {

kafka {

topic_id => “test1”

codec => plain {

format => “%{message}”

charset =>”UTF-8”

}

bootstrap_servers => “minimaster:9092,miniSlave1:9092,miniSlave2:9092”

}

}

启动logstash:

bin/logstash -f conf/logstash-kafka.conf



修改 vi /home/person2.txt 保存后查看kafka消费端

结果看最后几行:





代码展示:

vi gs-kafka.conf

input {
file {
codec => plain {
charset => "GB2312"
}
path => "/root/loserver/basedir/*/*.txt"
discover_interval => 30
start_position => "beginning"
}
}

output {
kafka{
topic_id => "gamelog"
codec => plain {
format => "%{message}"
charset => "GB2312"
}
bootstr
aaa3
ap_servers => "minimaster:9092,miniSlave1:9092,miniSlave2:9092"
}
}


新建kafka新的topic,名为gamelog

./kafka-topics.sh --create --zookeeper minimaster:2181 --replication-factor 1 --partitions 1 --topic gamelog


启动kafka消费者[gamelog]:

./kafka-console-consumer.sh --zookeeper minimaster:2181 --from-beginning --topic gamelog


启动文件,即修改/root/loserver/basedir//.txt中的文件

看消费者是否打印出内容:



vi kafka-es.conf

input {
kafka {
type => "gamelog"
auto_offset_reset => "smallest"
codec => "plain"
group_id => "elas2"
topic_id => "gamelog"
zk_connect => "minimaster:2181,miniSlave1:2181,miniSlave2:2181"
}
}

filter {
if [type] == "gamelog" {
mutate {
split => { "message" => " " }
add_field => {
"event_type" => "%{message[3]}"
"current_map" => "%{message[4]}"
"current_X" => "%{message[5]}"
"current_y" => "%{message[6]}"
"user" => "%{message[7]}"
"item" => "%{message[8]}"
"item_id" => "%{message[9]}"
"current_time" => "%{message[12]}"
}
remove_field => [ "message" ]
}
}
}

output {
if [type] == "gamelog" {
elasticsearch {
index => "gamelogs"
codec => plain {
charset => "UTF-16BE"
}
hosts => ["minimaster:9200", "miniSlave1:9200", "miniSlave2:9200"]
}
}
}


查看网页http://192.168.222.156:9200/_plugin/head/

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: