您的位置:首页 > 运维架构

Logstash结合Kafka并根据不同topic创建ES区分索引

2020-05-07 04:11 2391 查看

目录

1.前言

2.实践过程

3.总结

1.前言

前段时间,自己通过SpringCloud框架,结合kafka集成了ELK框架(博客链接:https://www.geek-share.com/detail/2799795801.html)。之前的框架整体还有可优化空间以及需要补充的地方,今天特补充一下ELK中根据不同索引,查看不同服务的日志记录。而此情况下,就涉及到logstash与ES的索引结合,下面直接贴干货。

2.实践过程

首先,还是使用kafka再新创建一个新主题:

[code]kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic otherlog

其次在两个微服务项目的logback-spring.xml文件中的kafka appender分别配置上对应的topic。之前框架中已配置了一个名为testlog的,今天这个项目配置上面创建的名为otherlog的。

[code]<appender name="kafkaAppender"  class="com.github.danielwegener.logback.kafka.KafkaAppender">
<!--<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">-->
<!--<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>-->
<!--</encoder>-->
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<!--此类是为了解决kafka接收日志中文变乱码问题-->
<jsonFactoryDecorator class="com.giveu.newwebeurekaclient11003.encoder.MyJsonFactoryDecorator"/>
<providers>
<timestamp>
<timeZone>GMT+8</timeZone>
</timestamp>
<!--必须用两个pattern,不然日志不完整-->
<pattern>
<pattern>
{
"ip":"%ip",
"severity": "%level",
"service": "${springAppName:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"parent": "%X{X-B3-ParentSpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message",
"stack_trace": "%exception{30}"
}
</pattern>
</pattern>
</providers>
</encoder>
<!-- 此处即为kafka的主题Topic名称-->
<topic>otherlog</topic>
<!-- we don't care how the log messages will be partitioned  -->
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />

<!-- use async delivery. the application threads are not blocked by logging -->
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

<!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
<!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
<!-- bootstrap.servers is the only mandatory producerConfig -->
<producerConfig>bootstrap.servers=localhost:9092</producerConfig>
<!-- don't wait for a broker to ack the reception of a batch.  -->
<producerConfig>acks=0</producerConfig>
<!-- wait up to 1000ms and collect log messages before sending them as a batch -->
<producerConfig>linger.ms=1000</producerConfig>
<!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
<producerConfig>max.block.ms=0</producerConfig>
<!-- define a client-id that you use to identify yourself against the kafka broker -->
<producerConfig>client.id=0</producerConfig>
<!-- this is the fallback appender if kafka is not available. -->
<appender-ref ref="CONSOLE" />
</appender>

接下来,修改logstash的配置文件,我这边还是在上个框架的core.conf中直接修改,内容如下:

[code]input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["testlog"]
codec => "json"
auto_offset_reset => "earliest" #从最早的偏移量开始消费
decorate_events => true    #此属性会将当前topic、offset、group、partition等信息也带到message中
type => "testlog" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
}

kafka {
bootstrap_servers => "localhost:9092"
topics => ["otherlog"]
codec => "json"
auto_offset_reset => "earliest" #从最早的偏移量开始消费
decorate_events => true    #此属性会将当前topic、offset、group、partition等信息也带到message中
type => "otherlog" #所有插件通用属性,尤其在input里面配置多个数据源时很有用
}
}

output {

if [type] == "testlog" {
elasticsearch {
hosts => ["localhost:9200"]
index => "testlog-%{+YYYY.MM.dd}"
}
}

if [type] == "otherlog" {
elasticsearch {
hosts => ["localhost:9200"]
index => "otherlog-%{+YYYY.MM.dd}"
}
}

stdout {
codec => rubydebug {metadata => true}  #logstash控制台输出日志和@metadata信息
}
}

最后在kibana中根据上面指定的索引名称testlog-*以及otherlog-*分别创建新的索引,再分别启动相应的应用与服务,即可达到根据不同主题创建不同索引来区分日志查看啦。效果如下:

3.总结

其实区分创建索引的方式还有很多种,期间也有尝试过其他方式,但是还没有尝试到什么成果,目前这种是已成功且较为简单的方式,希望能够帮助到各位。

尝试期间,有以下几个阻力点:

①logstash中有一个@metadata全局变量,此变量中还有很多元数据,也可以自己通过filter向其中摄入自己定义的数据来使用。但我使用的logstash5.2.2版本貌似需要logstash-kafka插件才可以实现kafka的broker信息。例如

[@metadata][kafka][topic]
:消息被消费的原始kafka主题,这个变量我这边就尝试了多次,无法获取到。导致创建索引失败,卡壳了很久。官方有文档,下面这个帖子也写的很清楚:https://segmentfault.com/a/1190000016595992。大家可以参考一下。

②logstash有好几个任何版本都通用的基础变量,如:

设置 输入类型 要求
add_field
hash No
codec
codec No
enable_metric
boolean No
id
string No
tags
array No
type
string

No

 

我这边就是引用到了Type才做到了topic区分索引。

善良勤劳勇敢而又聪明的老杨 原创文章 77获赞 64访问量 9万+ 关注 私信
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: