Logstash and Elasticsearch Integration Example
2015-11-24 13:47
No more preamble; straight to the steps.
Environment Setup
Everything here runs on Docker; my Docker host's IP is 192.168.4.99.
Zookeeper
docker run -d --name monitor-zk -p 2181:2181 -p 2888:2888 -p 3888:3888 jplock/zookeeper
Kafka
A quick gripe: Docker Hub has plenty of Kafka images, but very few of them actually work well.

docker run -d \
  --name monitor-kafka \
  -p 9092:9092 \
  -e KAFKA_ADVERTISED_HOST_NAME=192.168.4.99 \
  -e ZOOKEEPER_IP=192.168.4.99 \
  ches/kafka

# Kafka smoke test (optional; producer and consumer must use the same topic)
docker run --rm --interactive ches/kafka \
  kafka-console-producer.sh --topic test --broker-list 192.168.4.99:9092
docker run --rm ches/kafka \
  kafka-console-consumer.sh --topic test --from-beginning --zookeeper 192.168.4.99:2181
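The Logstash config shown later consumes tab-separated messages from the format_key_trace_topic topic. As a minimal sketch of how an application could publish such a message (assuming the kafka-clients library is on the classpath; the class name and field values are made up for illustration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyTraceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.4.99:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Fields follow the csv filter columns in the Logstash config:
        // level, time, job_code, task_code, clue_value, node_code, stage, message
        String msg = String.join("\t",
                "INFO", "2015-11-24 13:47:00", "job01", "task01",
                "clue123", "node01", "start", "hypothetical test message");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("format_key_trace_topic", msg));
        }
    }
}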
References:
Docker image: https://hub.docker.com/r/ches/kafka/
HTTP plugin: https://github.com/confluentinc/kafka-rest
SSL encryption: https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
Elasticsearch
Using the 2.x series here.

docker run -d \
  --name monitor-es \
  -p 9200:9200 -p 9300:9300 \
  -v /opt/monitor/es/data:/usr/share/elasticsearch/data \
  -v /opt/monitor/es/plugins:/usr/share/elasticsearch/plugins \
  elasticsearch:2.0.0 \
  -Des.node.name="Node01" \
  -Des.network.host=::0
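To confirm the node is up, hitting the REST root endpoint is enough; it returns the cluster name and version as JSON. A minimal check in plain Java (no extra dependencies; the class name is illustrative):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class EsHealthCheck {
    public static void main(String[] args) throws Exception {
        // GET / on the HTTP port; a 200 response with JSON means the node is alive
        URL url = new URL("http://192.168.4.99:9200/");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}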
References:
SQL query plugin: https://github.com/NLPchina/elasticsearch-sql/wiki
Logstash
Again the 2.x series.

docker run -d \
  --name monitor-logstash \
  -p 7001-7005:7001-7005 \
  -v /opt/monitor/logstash/config/:/config-dir \
  -v /opt/monitor/logstash/input_test/:/input_test \
  logstash:2.0.0 \
  logstash -f /config-dir/config.conf

The /input_test mount is the directory mapping used to test the File input, and config.conf is the configuration file, described below. (The comments are kept out of the command itself, since a # after a \ continuation would break it.)
References:
Kafka input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
CSV filter plugin: https://www.elastic.co/guide/en/logstash/current/plugins-filters-csv.html
ES output plugin: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
Logback encoder: https://github.com/logstash/logstash-logback-encoder
Log4j input plugin: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html
Websocket output plugin: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-websocket.html
Logstash Configuration
In the last step of the environment setup we started Logstash, referencing a config.conf configuration file. Its format is as follows:
input {
  kafka {              # Kafka input
    type => 'key_trace_log'
    topic_id => 'format_key_trace_topic'
    zk_connect => '192.168.4.99:2181'
  }
  log4j {              # Log4j input
    type => 'tp_log_test1'
    mode => 'server'
    host => '0.0.0.0'
    port => 7001
  }
  tcp {                # Logback input
    type => 'tp_log_test2'
    host => '0.0.0.0'
    port => 7002
    codec => 'json_lines'
  }
  file {               # File input
    type => 'tp_log_test3'
    path => ['/input_test/logs/*.log']   # directory holding the log files
    start_position => 'beginning'
  }
}
filter {
  if [type] == 'key_trace_log' {
    csv {
      # Each message from the Kafka input is assumed to be tab-separated,
      # with the columns below (the separator is a literal tab character)
      separator => '	'
      columns => ['level','time','job_code','task_code','clue_value','node_code','stage','message']
    }
  }
  if [type] == 'tp_log_test3' {
    grok {
      # Lines from the File input need regex parsing; they look like:
      # 2015-11-24 11:28:58 [ main:4234 ] - [ DEBUG ] logstash slf4j test loop 109
      pattern => '(?<datetime>.{19}) *\[ (?<class>.*):(?<line>\d*) \] - \[ (?<level>.*) \] (?<msg>.*)'
    }
  }
}
output {
  if [type] == 'key_trace_log' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'monitor_key_trace_log'
      document_type => 'default'
    }
    websocket {
      host => '0.0.0.0'
      port => 7005
    }
  }
  if [type] == 'tp_log_test1' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test1'
    }
  }
  if [type] == 'tp_log_test2' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test2'
    }
  }
  if [type] == 'tp_log_test3' {
    elasticsearch {
      hosts => ['192.168.4.99:9200']
      index => 'tp_log'
      document_type => 'test3'
    }
  }
  stdout { codec => rubydebug }
}
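For a quick end-to-end check of the File input, it is enough to append a line in the grok-expected format to a file under the mounted directory. A minimal sketch (the file name is illustrative; note the host path /opt/monitor/logstash/input_test/ maps to /input_test inside the container):

import java.io.FileWriter;
import java.io.PrintWriter;

public class FileInputTest {
    public static void main(String[] args) throws Exception {
        // Append a line matching the grok pattern, e.g.:
        // 2015-11-24 11:28:58 [ main:4234 ] - [ DEBUG ] logstash file input test
        try (PrintWriter out = new PrintWriter(
                new FileWriter("/opt/monitor/logstash/input_test/logs/test.log", true))) {
            out.println("2015-11-24 11:28:58 [ main:4234 ] - [ DEBUG ] logstash file input test");
        }
    }
}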
A few notes:
Logback needs a dedicated encoder plugin.
Add the dependency to the target system:
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>4.5.1</version>
</dependency>
Then modify logback.xml, adding:
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <destination>192.168.4.99:7002</destination>
    <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    <keepAliveDuration>5 minutes</keepAliveDuration>
</appender>
<root level="TRACE">
    <appender-ref ref="logstash"/>
</root>
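With the appender in place, anything logged through SLF4J in the target system flows to Logstash's tcp input on port 7002 as JSON lines. A minimal sketch to generate test events (the class name is illustrative):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogbackTest {
    private static final Logger log = LoggerFactory.getLogger(LogbackTest.class);

    public static void main(String[] args) {
        // Each call is JSON-encoded by LogstashEncoder and shipped to 192.168.4.99:7002
        for (int i = 0; i < 10; i++) {
            log.debug("logstash logback test loop {}", i);
        }
    }
}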
Log4j needs a SocketAppender:
log4j.appender.SOCKET=org.apache.log4j.net.SocketAppender
log4j.appender.SOCKET.port=7001
log4j.appender.SOCKET.remoteHost=192.168.4.99
# remember to attach the appender, e.g. log4j.rootLogger=DEBUG, SOCKET
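The SocketAppender ships serialized LoggingEvents straight to the Logstash log4j input on port 7001, so ordinary Log4j calls are all it takes. A minimal sketch (the class name is illustrative):

import org.apache.log4j.Logger;

public class Log4jTest {
    private static final Logger log = Logger.getLogger(Log4jTest.class);

    public static void main(String[] args) {
        // Events go through the SOCKET appender to Logstash's log4j input on 7001
        for (int i = 0; i < 10; i++) {
            log.debug("logstash log4j test loop " + i);
        }
    }
}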
Grok testing
Grok patterns can be tested online at: http://grokdebug.herokuapp.com/