
Integrating Logstash with Elasticsearch: A Worked Example

2015-11-24 13:47
Let's skip the preamble and get straight to the steps.

Environment Setup

Everything here runs on Docker; my Docker host's IP is 192.168.4.99.


Zookeeper

docker run -d --name monitor-zk -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper
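To confirm ZooKeeper is actually answering before moving on, a quick sanity check (assuming nc is available on the host) is to send the built-in ruok four-letter command:

# ZooKeeper replies "imok" if it is up and serving
echo ruok | nc 192.168.4.99 2181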

Kafka

A quick gripe: Docker Hub has plenty of Kafka images, but very few of them work well.

docker run -d \
  --name monitor-kafka \
  -p 9092:9092 \
  -e KAFKA_ADVERTISED_HOST_NAME=192.168.4.99 \
  -e ZOOKEEPER_IP=192.168.4.99 \
  ches/kafka

# Kafka test (optional); producer and consumer must use the same topic
docker run --rm --interactive ches/kafka kafka-console-producer.sh --topic test --broker-list 192.168.4.99:9092
docker run --rm ches/kafka kafka-console-consumer.sh --topic test --from-beginning --zookeeper 192.168.4.99:2181
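The pipeline below consumes from a topic named format_key_trace_topic, so it is worth creating it up front rather than relying on auto-creation. A minimal sketch using the kafka-topics.sh script shipped in the ches/kafka image (the partition and replication values are just examples):

docker run --rm ches/kafka kafka-topics.sh --create \
  --topic format_key_trace_topic \
  --partitions 1 --replication-factor 1 \
  --zookeeper 192.168.4.99:2181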

References:

Docker image: https://hub.docker.com/r/ches/kafka/

HTTP plugin: https://github.com/confluentinc/kafka-rest

SSL encryption: https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka

Elasticsearch

This uses the 2.x release.

docker run -d \
  --name monitor-es \
  -p 9200:9200 -p 9300:9300 \
  -v /opt/monitor/es/data:/usr/share/elasticsearch/data \
  -v /opt/monitor/es/plugins:/usr/share/elasticsearch/plugins \
  elasticsearch:2.0.0 \
  -Des.node.name="Node01" \
  -Des.network.host=::0
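A quick way to confirm the node is up is to hit the REST root, which returns the node name and version:

# Should return a JSON blob including "name" : "Node01" and the ES version
curl http://192.168.4.99:9200/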

References:

SQL query plugin: https://github.com/NLPchina/elasticsearch-sql/wiki

Logstash

Also the 2.x release.

# /opt/monitor/logstash/input_test is mounted to test the file input;
# config.conf is the pipeline configuration described below.
docker run -d \
  --name monitor-logstash \
  -p 7001-7005:7001-7005 \
  -v /opt/monitor/logstash/config/:/config-dir \
  -v /opt/monitor/logstash/input_test/:/input_test \
  logstash:2.0.0 \
  logstash -f /config-dir/config.conf
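Before (re)starting the container it can help to validate the pipeline syntax; Logstash 2.x supports a --configtest flag:

# Prints "Configuration OK" if the config parses cleanly
docker run --rm \
  -v /opt/monitor/logstash/config/:/config-dir \
  logstash:2.0.0 \
  logstash -f /config-dir/config.conf --configtest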

References:

Kafka input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

CSV filter plugin: https://www.elastic.co/guide/en/logstash/current/plugins-filters-csv.html

ES output plugin: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

Logback input plugin: https://github.com/logstash/logstash-logback-encoder

Log4j input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html

WebSocket output plugin: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-websocket.html

Logstash Configuration

In the last step of the environment setup above we started Logstash, which references a configuration file named config.conf. Its format is as follows:

input {
  kafka { # Kafka input
    type => 'key_trace_log'
    topic_id => 'format_key_trace_topic'
    zk_connect => '192.168.4.99:2181'
  }
  log4j { # Log4j input
    type => 'tp_log_test1'
    mode => 'server'
    host => '0.0.0.0'
    port => 7001
  }
  tcp { # Logback input
    type => 'tp_log_test2'
    host => '0.0.0.0'
    port => 7002
    codec => 'json_lines'
  }
  file { # File input
    type => 'tp_log_test3'
    path => ['/input_test/logs/*.log'] # directory holding the log files
    start_position => 'beginning'
  }
}
filter {
  if [type] == 'key_trace_log' {
    csv { # messages from the Kafka input are assumed to be tab-separated, with the columns below
      separator => '	' # a literal tab character
      columns => ['level','time','job_code','task_code','clue_value','node_code','stage','message']
    }
  }
  if [type] == 'tp_log_test3' {
    grok { # lines from the file input need regex parsing; a sample line: 2015-11-24 11:28:58  [ main:4234 ] - [ DEBUG ]  logstash slf4j test loop 109
      match => { 'message' => '(?<datetime>.{19}) *\[ (?<class>.*):(?<line>\d*) \] - \[ (?<level>.*) \] (?<msg>.*)' }
    }
  }
}
output {
  if [type] == 'key_trace_log' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'monitor_key_trace_log'
      document_type => 'default'
    }
    websocket {
      host => '0.0.0.0'
      port => 7005
    }
  }
  if [type] == 'tp_log_test1' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test1'
    }
  }
  if [type] == 'tp_log_test2' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test2'
    }
  }
  if [type] == 'tp_log_test3' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test3'
    }
  }
  stdout { codec => rubydebug }
}
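To smoke-test the Kafka and file paths end to end, you can push one record into each and then query Elasticsearch. The field values below are made up, but they match the csv columns and the grok pattern:

# 1) A tab-separated record into the Kafka topic (8 fields, matching the csv columns)
printf 'INFO\t2015-11-24 13:47:00\tjob01\ttask01\tclue01\tnode01\tstage1\thello from kafka\n' | \
docker run --rm --interactive ches/kafka kafka-console-producer.sh \
  --topic format_key_trace_topic --broker-list 192.168.4.99:9092

# 2) A line matching the grok pattern into the watched directory
echo '2015-11-24 11:28:58  [ main:4234 ] - [ DEBUG ]  logstash file input test' \
  >> /opt/monitor/logstash/input_test/logs/app.log

# 3) Check that the documents arrived
curl 'http://192.168.4.99:9200/monitor_key_trace_log/_search?pretty'
curl 'http://192.168.4.99:9200/tp_log/test3/_search?pretty'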

A few notes:

Logback needs a dedicated encoder plugin.

Add the dependency to the target system:

<dependency>
  <groupId>net.logstash.logback</groupId>
  <artifactId>logstash-logback-encoder</artifactId>
  <version>4.5.1</version>
</dependency>

Then edit logback.xml and add:

<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
  <destination>192.168.4.99:7002</destination>
  <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
  <keepAliveDuration>5 minutes</keepAliveDuration>
</appender>

<root level="TRACE">
  <appender-ref ref="logstash"/>
</root>
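Independently of Logback, the tcp/json_lines input can be verified directly with nc; the JSON fields here are arbitrary:

# json_lines expects newline-delimited JSON; echo supplies the trailing newline
echo '{"message":"hello from nc","level":"INFO"}' | nc 192.168.4.99 7002
# The event should show up in the tp_log index under type test2
curl 'http://192.168.4.99:9200/tp_log/test2/_search?pretty'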

Log4j needs a SocketAppender configured:

log4j.appender.SOCKET=org.apache.log4j.net.SocketAppender
log4j.appender.SOCKET.port=7001
log4j.appender.SOCKET.remoteHost=192.168.4.99
# the appender must also be attached to a logger, e.g.:
log4j.rootLogger=INFO, SOCKET

Grok testing

Grok patterns can be tested interactively at http://grokdebug.herokuapp.com/
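If you would rather test locally, the same pattern can be run through a throwaway pipeline with logstash -e; the sample line is the one from the grok comment above:

echo '2015-11-24 11:28:58  [ main:4234 ] - [ DEBUG ]  logstash slf4j test loop 109' | \
docker run --rm -i logstash:2.0.0 logstash -e "
  input  { stdin {} }
  filter {
    grok { match => { 'message' => '(?<datetime>.{19}) *\[ (?<class>.*):(?<line>\d*) \] - \[ (?<level>.*) \] (?<msg>.*)' } }
  }
  output { stdout { codec => rubydebug } }"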