您的位置:首页 > 编程语言 > Java开发

教你无脑在springBoot项目中集成ELK+Kafka

2018-02-06 18:51 711 查看

ELK+Kafka从0开始

简介

(1)Kafka:接收用户日志的消息队列

(2)Logstash:做日志解析,统一成json输出给Elasticsearch

(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。

(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。

(5)Zookeeper: 状态管理,监控进程等服务



切换到JDK1.8

最新的软件都是要求JDK1.8,建议首先切换到JDK版本

yum search jdk
yum install java-1.8.0-openjdk.x86_64 -y

//修改JDK环境配置(例如:)
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_31
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export  PATH=${JAVA_HOME}/bin:$PATH


1.elasticsearch

官网docs:https://www.elastic.co/guide/en/elasticsearch/reference/current/rpm.html

1.下载安装包

**wget https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/rpm/elasticsearch/2.3.5/elasticsearch-2.3.5.rpm**


2.解压启动

**sudo rpm --install elasticsearch-2.3.5.rpm**




配置成系统启动时自动启动:

**sudo systemctl daemon-reload**

**sudo systemctl enable elasticsearch.service**


启动:

**sudo systemctl start elasticsearch.service**


3.查看是否启动:



**systemctl status elasticsearch**




4.查找elasticsearch路径,查看修改elasticsearch.yum

**whereis elasticsearch**

**vi /etc/elasticsearch/elasticsearch.yml**




注意这里要修改host和port

这里我将ip修改成里本机的ip地址,才能让外网访问,修改完成后注意重启.



5.服务的启动和停止

**sudo systemctl start elasticsearch.service**
**sudo systemctl stop elasticsearch.service**


6.无法启动,查看错误日志

**journalctl -xe**

**sudo journalctl --unit elasticsearch**


根据时间搜索:eg

**sudo journalctl --unit elasticsearch --since  "2016-10-30 18:17:16"**


7.启动成功,测试

**curl -XGET "127.0.0.1:920
4000
0"**




在浏览器访问:



2.安装kibana

1.下载

wget https://download.elastic.co/kibana/kibana/kibana-4.5.1-1.x86_64.rpm


2.安装

yum localinstall kibana-4.5.1-1.x86_64.rpm -y




3.启动

sudo systemctl start kibana

查看是否启动成功

systemctl status kibana




4.浏览器访问kibana



3.安装logstash,以及添加配置文件

1.下载logstash

wget -c https://download.elastic.co/logstash/logstash/packages/centos/logstash-2.4.1.noarch.rpm


这里注意;logstash 5.x和6.x版本都需要JDK1.8,而服务器jdk1.7版本所以并没有下载那么高的版本,只能选在2.x

2.安装

yum localinstall logstash-2.4.1.noarch.rpm –y


3.生成证书

[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b pki]# cd tls
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b tls]# ls
cert.pem  certs  misc  openssl.cnf  private

//使用下面命令
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b tls]# openssl req -subj '/CN=elk.test.com/' -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout private/logstash-forwarder.key -out certs/logstash-forwarder.crt

Generating a 2048 bit RSA private key
..+++
.....................................................................+++
writing new private key to 'private/logstash-forwarder.key'
-----


4.之后创建logstash 的配置文件。如下:

cd /etc/logstash/conf.d/
vi 01-logstash-initial.conf


将下面配置写入

input {
beats {
port => 5000
type => "logs"
ssl => true
ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
}
}


监听TCP 5044端口上beats 输入,使用上面创建的SSL证书加密,当然你要是不想使用这种方式还可以配置路径收集

input{
file{
path =>"/Develop/Tools/apache-tomcat-8.0.30/webapps/nggirllog/access.log"
start_position=>"beginning"
}
}


创建一个名为10-syslog-filter.conf的配置文件,我们将为syslog消息添加一个过滤器

filter {
if [type] == "syslog-beat" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: % {GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%  {@timestamp}" ]
add_field => [ "received_from", "% {host}" ]
}
geoip {
source => "clientip"
}
syslog_pri {}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}


这个输出基本上配置Logstash来存储input数据到Elasticsearch中,运行在localhost:9200

output {
elasticsearch { }
stdout { codec => rubydebug }

}


5.启动logstash,刚才配置文件端口写的5000

启动
sudo systemctl start logstash

查看是否启动成功
systemctl status logstash




6.查询一下当前启动的三个软件的运行状态

netstat -nltp




ok,测试一下,效果

这个时候其实已经能初步的使用了,你可以将一个java项目的跑在服务器上将日志输出路径陪成和input路径相同,大概能实现如下效果:



4.kafka

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制:



Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

kafka官方文档:

http://kafka.apache.org/documentation

kafka工作原理:https://www.cnblogs.com/hei12138/p/7805475.html

1.从kafka官网下载,并解压kafka

首先确保你的机器上安装了jdk**(最新版kafka需要使用jdk1.8)**,kafka需要java运行环境,以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境.

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz 
//解压
tar -zxvf kafka_2.11-1.0.0.tgz


2.修改配置

//进入到kafka的安装目录下
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b config]# cd /home/centos/kafka_2.12-1.0.0
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b kafka_2.12-1.0.0]# ls
bin  config  libs  LICENSE  NOTICE  site-docs
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b kafka_2.12-1.0.0]# cd config/
[root@test-lufei-6ec9ba1e-3ed1-4409-97dc-4089e858268b config]# ls
connect-console-sink.propertiesconnect-distributed.properties  connect-file-source.properties  connect-standalone.properties  log4j.properties server.properties   zookeeper.properties
connect-console-source.properties  connect-file-sink.propertiesconnect-log4j.propertiesconsumer.propertiesproducer.properties  tools-log4j.properties


修改service.properties文件

#此Broker的ID,集群中每个Broker的ID不可相同

broker.id=0

#监听器,端口号与port一致即可

listeners=PLAINTEXT://localhostls:9092

#Broker监听的端口

port=19092

#Broker的Hostname,填主机IP即可

host.name=172.16.38.176

#向Producer和Consumer建议连接的Hostname和port(此处有坑,具体见后)

advertised.host.name=172.16.38.176

advertised.port=9092

#进行IO的线程数,应大于主机磁盘数

num.io.threads=8

#消息文件存储的路径

log.dirs=/data/kafka-logs

#消息文件清理周期,即清理x小时前的消息记录

log.retention.hours=168

#每个Topic默认的分区数,一般在创建Topic时都会指定分区数,所以这个配成1就行了

num.partitions=1

#Zookeeper连接串,此处填写上一节中安装的三个zk节点的ip和端口即可

zookeeper.connect=172.16.38.176:12180,172.16.38.177:12181,172.16.38.177:12182


3.启动zookeeper,kafka

进入到kafka目录下,启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties


启动zookeeper成功后会看到如下的输出:



cd进入kafka解压目录,启动kafka

bin/kafka-server-start.sh config/server.properties


启动kafka成功后会看到如下的输出:



4.第一个消息

4.1 创建一个topic(主题)

Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷.

在kafka解压目录打开,输入,创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


创建一个名为test的topic

来查看已经创建的topic

bin/kafka-topics.sh --list --zookeeper localhost:2181


4.2 创建一个消息消费者

在kafka解压目录打开终端,输入,创建一个消息消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning


4.3 创建一个消息生产者

在kafka解压目录打开一个新的终端,输入,创建一个消息生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test


输入信息:



查看输出:



4.4 如何通过ip修改,外网访问

进入到config目录下,修改

//listeners = listener_name://host_name:port
//EXAMPLE:
//listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092


启动时注意修改ip

kafka log存放目录:

log.dirs=/tmp/kafka-logs


4.5 修改logstash的配置,让logstash消费kafka的消息

input{
kafka {
zk_connect => "172.28.50.143:2181"
group_id => "logstash"
topic_id => "test1"
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}


这里配置完成之后需要重启logstash,这里的配置使logstash变成了kafka对应topic:test1的消费方.

5.向fakfa中放Message

这里向kafka中放消息是很灵活的,spring官网有对kafka的支持:



官方文档地址:https://docs.spring.io/spring-kafka/docs/2.0.3.RELEASE/reference/html/

pom:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>


使用KafkaTemplate向kafka中放入消息;



kafka的configBean



配置参数:

kafka.host.name = 172.28.50.143:9092
kafka.topic.name = test1


当然KafkaTemplate也可以用来消费kafka中的消息,我们这里不需要.

这里实际的代码是很简单的,可以使用过滤器来统一打印Controller的日志,同时将这个项目打成一个jar依赖到其他项目中,尽量减少代码入侵.

代码git地址:

https://github.com/Houlintao1/hou_start1.git

这只是这几天初步研究的简单的demo,理解不深,后续还会补充.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: