1.前言
由于项目涉及到kafka,自己以前没有接触过这方面的,学习了下,将搭建kafka运行环境同大家分享。
2.搭建步骤
第一步,到Apache Kafka官网下载最新的压缩包,比如我下载的就是:
kafka_2.9.2-0.8.1.1.tgz
第二步,解压并启动Zookeeper
tar -xzvf kafka_2.9.2-0.8.1.1.tgz
cd kafka_2.9.2-0.8.1.1
bin/zookeeper-server-start.sh config/zookeeper.properties &
说明: 由于kafka用到了zookeeper,所以应该首先启动它。
应该确保已经安装JDK,并设定好JAVA_HOME,CLASSPATH,PATH这些环境变量。否则会提示:
java command not found
![](http://s3.51cto.com/wyfs02/M01/49/E6/wKioL1QeTWiC80BcAAZgcsalBjE065.jpg)
查看端口信息:
![](http://s3.51cto.com/wyfs02/M00/49/E6/wKioL1QeTbHiD2WTAAFtqrDfcrI689.jpg)
查看zookeeper.properties配置信息:
# the port at which the clients will connect
clientPort=2181
第三步:启动kafka
bin/kafka-server-start.sh config/server.properties 我在启动中遇到了下面的2个问题:
Unrecognized VM option '+UseCompressedOops' 原因及解决办法: kafka用了很多优化运行的jvm参数,而我安装的jdk所带的jvm不一定支持这些参数,比如: -XX:+UseCompressedOops 所以需要编辑kafka-run-class.sh,将这个选项注释掉:
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:+CMSClassUnloadingEnabled
-XX:+CMSScavengeBeforeRemark
-XX:+DisableExplicitGC -Djava.awt.headless=true" Error occurred during initialization of VM Could not reserve enough space for object heap 原因及解决办法: 查看kafka-server-start.sh配置文件,发现有heap设置信息:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
我们可以java -X来看看这些参数的意义:
![](http://s3.51cto.com/wyfs02/M02/49/E4/wKiom1QeUNnCeNxMAAMWoSYtIfU419.jpg)
-Xms表示JAVA堆内存的初始化大小,而-Xmx表示最大值。 我在创建虚拟机时,指定的内存大小为256M,很显然不够,因此我尝试改为:
#export KAFKA_HEAP_OPTS="-Xmx100m -Xms200m" 但是依然报错,看来这个参数不能调到这么小,最后我将虚拟机内存调到1G,启动成功。
![](http://s3.51cto.com/wyfs02/M00/49/E4/wKiom1QeUpSgUUWJAAbAC-ciE8c891.jpg)
注意输出信息中,端口信息:9092
进程验证:
![](http://s3.51cto.com/wyfs02/M00/49/E6/wKioL1QeYVuCVuhBAAAwdD9ld8Y413.jpg)
端口验证:
![](http://s3.51cto.com/wyfs02/M02/49/E5/wKiom1QeYkexrHeOAABv0xPvBnw149.jpg)
说明: Kafka的进程ID为9300,占用端口为9092 QuorumPeerMain为对应的zookeeper实例,进程ID为6379,在2181端口监听
|
3.一些基本概念kafka是什么?
记住几个关键点,分布式、高吞吐量 的 订阅、发布 消息系统
kafka有什么?
producer 消息的生成者,即发布消息
consumer 消息的消费者,即订阅消息 broker Kafka以集群的方式运行,可以由一个或多个服务组成,服务即broker zookeeper 协调转发
kafka的工作图
![](http://s3.51cto.com/wyfs02/M00/49/E7/wKioL1QeZSji0xPbAABVRKnvLJE735.jpg)
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息 kafka对消息进行归纳,即topic,也就是说producer发布topic,consumer订阅topic
下面,我们来一个初步的测试,来加深这些概念的理解。
|
4.模拟客户端发送、接受消息初步测试Step 1 : 创建一个topic
![](http://s3.51cto.com/wyfs02/M02/49/E7/wKioL1QeaQ2C4SzRAAB26yLebNA178.jpg)
命令运行后提示:
Created topic "my_first_topic".
注意,创建topic时需要指明zookeeper的socket(IP+PORT)在哪里,以及topic名称。 (至于partitions,replication-factor这些分区,副本的概念以后再说,暂放)
此时:
zookeeper进程提示:
![](http://s3.51cto.com/wyfs02/M01/49/E7/wKioL1QeabnjK073AAOMyEsPH0U599.jpg)
kafka进程提示:
![](http://s3.51cto.com/wyfs02/M01/49/E5/wKiom1QeaduSXtPtAAFXcwcLPU4105.jpg)
Step 2 : 查看topic list
![](http://s3.51cto.com/wyfs02/M02/49/E7/wKioL1QebB7gc9JcAABrP9DMrKM472.jpg)
Step 3 : 发送、接受消息
下面,我启动2个XSHELL客户端,一个用于生产者发送消息,一个用于消费者接受消息。
XSHELL-A
![](http://s3.51cto.com/wyfs02/M01/49/E7/wKioL1Qebd7ybw32AAD0PjKC2yk691.jpg)
XSHELL-B
![](http://s3.51cto.com/wyfs02/M01/49/E5/wKiom1Qebe2QOLySAADxhrQ7Tbk791.jpg)
只要我们在XSHELL-A中输入消息回车,那么马上XSHELL-B中就会有消息显示。
注意: producer,指定的Socket(localhost+9092),说明生产者的消息要发往kafka,也即是broker consumer, 指定的Socket(localhost+2181),说明消费者的消息来自zookeeper(协调转发)
上面的只是一个单个的broker,下面我们来实验一个多broker的集群。
|
5.搭建一个多个broker的集群刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点 也都是在本机上的
Step 1 : 为每一个broker提供配置文件
我们先看看config/server.properties配置信息:
[root@localhost config]# grep -v '#' server.properties | sort
broker.id=0
port=9092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=1000000 说明: broker.id为集群中唯一的标注一个节点,因为在同一个机器上,所以必须指定不同的 端口和日志文件,避免数据被覆盖。
在上面单个broker的实验中,为什么kafka的端口为9092,这里可以看得很清楚。
注意日志目录:
[root@localhost kafka-logs]# pwd
/tmp/kafka-logs
[root@localhost kafka-logs]# ls -l
total 32
drwxr-xr-x 2 root root 4096 Sep 20 22:58 my_first_topic-0
-rw-r--r-- 1 root root 32 Sep 20 23:54 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 32 Sep 20 23:54 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 Sep 20 20:22 test-0
[root@localhost kafka-logs]# tree my_first_topic-0
my_first_topic-0
|-- 00000000000000000000.index
`-- 00000000000000000000.log
0 directories, 2 files
【topic,分区,offset等等这些概念,暂放】
kafka cluster怎么同zookeeper交互的,配置信息中也有体现。
那么下面,我们仿照上面的配置文件,提供2个broker的配置文件:
[root@localhost config]# touch server-1.properties
[root@localhost config]# touch server-2.properties
[root@localhost config]# vi server-1.properties
[root@localhost config]#
[root@localhost config]#
[root@localhost config]# vi server-2.properties
[root@localhost config]#
[root@localhost config]#
[root@localhost config]# cat server-1.properties
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
zookeeper.connect=localhost:2181
[root@localhost config]# cat server-2.properties
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
zookeeper.connect=localhost:2181
Step 2 : 启动所有的broker
由于在上面的实验中,已经启动了zookeeper和一个broker(id=0),那么现在只需要启动 broker(id=1)和broker(id=2)。
命令如下:
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-1.properties &
会发现,zookeeper进程会有提示信息输出。
进程,端口观察:
![](http://s3.51cto.com/wyfs02/M01/49/E7/wKioL1QeemKx4EvIAAIeZbDOnto371.jpg)
一个zookeeper在2181端口上监听,3个kafka cluster(broker)分别在端口 9092,9093,9094监听。
Step 3 : 创建topic
bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3 \
--zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_2 --partitions 1 --replication-factor 3 \
--zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_3 --partitions 1 --replication-factor 3 \
--zookeeper localhost:2181 查看topic创建情况:
![](http://s3.51cto.com/wyfs02/M01/49/E7/wKioL1QefTzy-ZYFAAF-L9pFdok724.jpg)
上面的有些东西,也许还不太清楚,暂放,继续试验。需要注意的是topic_1的Leader=1
Step 4 : 模拟客户端发送,接受消息
XSHELL-A
bin/kafka-console-consumer.sh --topic topic_1 --zookeeper localhost:2181 --from-beginning
XSHELL-B
bin/kafka-console-producer.sh --topic topic_1 --broker-list localhost:9092,localhost:9093,
localhost:9094 需要注意,此时producer将topic发布到了3个broker中,现在分布式的概念就有点了。
在XSHELL-B中发消息,XSHELL-A中就会有消息显示出来。
Step 5 : kill some broker
测试点一: kill broker(id=0) 首先,我们根据前面的配置,得到broker(id=0)应该在9092监听,这样就能确定它的PID了。
![](http://s3.51cto.com/wyfs02/M01/49/E8/wKioL1QehPChlpqSAAE44i09mes219.jpg)
得到broker(id=0)的PID为9300,那么接下来,我们KILL这个broker:
![](http://s3.51cto.com/wyfs02/M02/49/E8/wKioL1QehX7T7zqWAAEPAY6whpA513.jpg)
再次观察,topic在kafka cluster中的情况:
![](http://s3.51cto.com/wyfs02/M01/49/E6/wKiom1Qehd3B9USOAAH5zv7m2dI080.jpg)
需要与broker(id=0)没有被kill前,做下对比。很明显,主要变化在于Isr,以后在分析。
测试下,发送消息,接受消息,是否收到影响。
生产者:
![](http://s3.51cto.com/wyfs02/M02/49/E6/wKiom1QehpTh5gWVAAEg3vmnALQ852.jpg)
消费者:
![](http://s3.51cto.com/wyfs02/M00/49/E6/wKiom1QehqiQye_YAAEX9hq6Byo444.jpg)
结论,并没有收到影响。
测试点二:
kill broker(id=1)
同上可以得到broker(id=1)的PID为21165,同样的kill它,并测试发送,接受消息 是否收到影响。
发送端:
![](http://s3.51cto.com/wyfs02/M02/49/E8/wKioL1QeiHfxWYVyAAAsZoJsVyI996.jpg)
接受端:
![](http://s3.51cto.com/wyfs02/M02/49/E6/wKiom1QeiGOwltVhAAAq0ik1a_E670.jpg)
可见,kafka的分布式机制,容错能力还是挺好的~
|
本文出自 “学海无涯 心境无限” 博客,请务必保留此出处http://zhangfengzhe.blog.51cto.com/8855103/1556650