Logstash结合Kafka并根据不同topic创建ES区分索引
2020-05-07 04:11
2391 查看
目录
1.前言
前段时间,自己通过SpringCloud框架,结合kafka集成了ELK框架(博客链接:https://www.geek-share.com/detail/2799795801.html)。之前的框架整体还有可优化空间以及需要补充的地方,今天特补充一下ELK中根据不同索引,查看不同服务的日志记录。而此情况下,就涉及到logstash与ES的索引结合,下面直接贴干货。
2.实践过程
首先,还是使用kafka再新创建一个新主题:
[code]kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic otherlog
其次在两个微服务项目的logback-spring.xml文件中的kafka appender分别配置上对应的topic。之前框架中已配置了一个名为testlog的,今天这个项目配置上面创建的名为otherlog的。
[code]<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <!--<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">--> <!--<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>--> <!--</encoder>--> <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"> <!--此类是为了解决kafka接收日志中文变乱码问题--> <jsonFactoryDecorator class="com.giveu.newwebeurekaclient11003.encoder.MyJsonFactoryDecorator"/> <providers> <timestamp> <timeZone>GMT+8</timeZone> </timestamp> <!--必须用两个pattern,不然日志不完整--> <pattern> <pattern> { "ip":"%ip", "severity": "%level", "service": "${springAppName:-}", "trace": "%X{X-B3-TraceId:-}", "span": "%X{X-B3-SpanId:-}", "parent": "%X{X-B3-ParentSpanId:-}", "exportable": "%X{X-Span-Export:-}", "pid": "${PID:-}", "thread": "%thread", "class": "%logger{40}", "rest": "%message", "stack_trace": "%exception{30}" } </pattern> </pattern> </providers> </encoder> <!-- 此处即为kafka的主题Topic名称--> <topic>otherlog</topic> <!-- we don't care how the log messages will be partitioned --> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" /> <!-- use async delivery. the application threads are not blocked by logging --> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) --> <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs --> <!-- bootstrap.servers is the only mandatory producerConfig --> <producerConfig>bootstrap.servers=localhost:9092</producerConfig> <!-- don't wait for a broker to ack the reception of a batch. --> <producerConfig>acks=0</producerConfig> <!-- wait up to 1000ms and collect log messages before sending them as a batch --> <producerConfig>linger.ms=1000</producerConfig> <!-- even if the producer buffer runs full, do not block the application but start to drop messages --> <producerConfig>max.block.ms=0</producerConfig> <!-- define a client-id that you use to identify yourself against the kafka broker --> <producerConfig>client.id=0</producerConfig> <!-- this is the fallback appender if kafka is not available. --> <appender-ref ref="CONSOLE" /> </appender>
接下来,修改logstash的配置文件,我这边还是在上个框架的core.conf中直接修改,内容如下:
[code]input { kafka { bootstrap_servers => "localhost:9092" topics => ["testlog"] codec => "json" auto_offset_reset => "earliest" #从最早的偏移量开始消费 decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中 type => "testlog" #所有插件通用属性,尤其在input里面配置多个数据源时很有用 } kafka { bootstrap_servers => "localhost:9092" topics => ["otherlog"] codec => "json" auto_offset_reset => "earliest" #从最早的偏移量开始消费 decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中 type => "otherlog" #所有插件通用属性,尤其在input里面配置多个数据源时很有用 } } output { if [type] == "testlog" { elasticsearch { hosts => ["localhost:9200"] index => "testlog-%{+YYYY.MM.dd}" } } if [type] == "otherlog" { elasticsearch { hosts => ["localhost:9200"] index => "otherlog-%{+YYYY.MM.dd}" } } stdout { codec => rubydebug {metadata => true} #logstash控制台输出日志和@metadata信息 } }
最后在kibana中根据上面指定的索引名称testlog-*以及otherlog-*分别创建新的索引,再分别启动相应的应用与服务,即可达到根据不同主题创建不同索引来区分日志查看啦。效果如下:
3.总结
其实区分创建索引的方式还有很多种,期间也有尝试过其他方式,但是还没有尝试到什么成果,目前这种是已成功且较为简单的方式,希望能够帮助到各位。
尝试期间,有以下几个阻力点:
①logstash中有一个@metadata全局变量,此变量中还有很多元数据,也可以自己通过filter向其中摄入自己定义的数据来使用。但我使用的logstash5.2.2版本貌似需要logstash-kafka插件才可以实现kafka的broker信息。例如
[@metadata][kafka][topic]:消息被消费的原始kafka主题,这个变量我这边就尝试了多次,无法获取到。导致创建索引失败,卡壳了很久。官方有文档,下面这个帖子也写的很清楚:https://segmentfault.com/a/1190000016595992。大家可以参考一下。
②logstash有好几个任何版本都通用的基础变量,如:
设置 | 输入类型 | 要求 |
---|---|---|
add_field |
hash | No |
codec |
codec | No |
enable_metric |
boolean | No |
id |
string | No |
tags |
array | No |
type |
string |
No |
我这边就是引用到了Type才做到了topic区分索引。
善良勤劳勇敢而又聪明的老杨 原创文章 77获赞 64访问量 9万+ 关注 私信相关文章推荐
- kafka创建topic时如何将分区放置到不同的broker中
- Kafka创建Topic时如何将分区放置到不同的Broker中
- 脚本——根据昨天es索引的大小,提前创建明天的索引
- 记一次logback传输日志到logstash根据自定义设置动态创建ElasticSearch索引
- Kafka创建Topic时如何将分区放置到不同的Broker中
- 使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)
- hbase 根据需求创建组合索引(组合rowkey)及组合索引创建规则
- kafka->logstash->es
- 保存log时,根据不同级别采用不同颜色区分,文件保存为Html或者输出到控制台
- 表上不同列相比较 索引创建
- 统一日志检索部署(es、logstash、kafka、flume)
- Spark Streaming结合 Kafka 两种不同的数据接收方式比较
- [Spark][kafka]kafka 的topic 创建和删除试验
- 【我的Android进阶之旅】快速创建和根据不同的版本类型(Dev、Beta、Release)发布Android 开发库到Maven私服
- 在窗体窗口上创建复选框、单选框、文本区域、单行文本框等组件,并实现根据用户输入的10 进制数,选择不同选项可转换为2、8、16 进制数
- Es创建索引、设置和修改Mapping
- kafka 创建topic,查看topic
- kafka 创建topic
- iOS 在一个屏幕中根据不同的按钮创建不同的 view 默认显示第一个
- kafka删除和创建topic