您的位置:首页 > 其它

搭建kafka运行环境

2016-06-15 00:00 417 查看
1.前言

由于项目涉及到kafka,自己以前没有接触过这方面的,学习了下,将搭建kafka运行环境同大家分享。

2.搭建步骤

第一步,到ApacheKafka官网下载最新的压缩包,比如我下载的就是:

1
kafka_2.9.2-0.8.1.1.tgz

第二步,解压并启动Zookeeper

1

2

3
tar
-xzvfkafka_2.9.2-0.8.1.1.tgz

cd
kafka_2.9.2-0.8.1.1

bin
/zookeeper-server-start
.shconfig
/zookeeper
.properties&

说明:

由于kafka用到了zookeeper,所以应该首先启动它。

应该确保已经安装JDK,并设定好JAVA_HOME,CLASSPATH,PATH这些环境变量。否则会提示:

javacommandnotfound





查看端口信息:





查看zookeeper.properties配置信息:

7fe0
1

2
#theportatwhichtheclientswillconnect

clientPort=2181

第三步:启动kafka

1
bin
/kafka-server-start
.shconfig
/server
.properties

我在启动中遇到了下面的2个问题:

UnrecognizedVMoption'+UseCompressedOops'
原因及解决办法:
kafka用了很多优化运行的jvm参数,而我安装的jdk所带的jvm不一定支持这些参数,比如:
-XX:+UseCompressedOops
所以需要编辑kafka-run-class.sh,将这个选项注释掉:

1

2

3

4

5

KAFKA_JVM_PERFORMANCE_OPTS="-server-XX:+UseCompressedOops-XX:+UseParNewGC

-XX:+UseConcMarkSweepGC

-XX:+CMSClassUnloadingEnabled

-XX:+CMSScavengeBeforeRemark

-XX:+DisableExplicitGC-Djava.awt.headless=
true
"

ErroroccurredduringinitializationofVMCouldnotreserveenoughspaceforobjectheap
原因及解决办法:
查看kafka-server-start.sh配置文件,发现有heap设置信息:

1
export
KAFKA_HEAP_OPTS=
"-Xmx1G-Xms1G"

我们可以java-X来看看这些参数的意义:





-Xms表示JAVA堆内存的初始化大小,而-Xmx表示最大值。
我在创建虚拟机时,指定的内存大小为256M,很显然不够,因此我尝试改为:

1
#exportKAFKA_HEAP_OPTS="-Xmx100m-Xms200m"

但是依然报错,看来这个参数不能调到这么小,最后我将虚拟机内存调到1G,启动成功。





注意输出信息中,端口信息:9092

进程验证:





端口验证:





说明:
Kafka的进程ID为9300,占用端口为9092
QuorumPeerMain为对应的zookeeper实例,进程ID为6379,在2181端口监听

3.一些基本概念

kafka是什么?

记住几个关键点,分布式、高吞吐量的订阅、发布消息系统

kafka有什么?

producer消息的生成者,即发布消息

consumer消息的消费者,即订阅消息
brokerKafka以集群的方式运行,可以由一个或多个服务组成,服务即broker
zookeeper协调转发

kafka的工作图




producers通过网络将消息发送到Kafka集群,集群向消费者提供消息
kafka对消息进行归纳,即topic,也就是说producer发布topic,consumer订阅topic

下面,我们来一个初步的测试,来加深这些概念的理解。

4.模拟客户端发送、接受消息初步测试

Step1:创建一个topic





命令运行后提示:

Createdtopic"my_first_topic".

注意,创建topic时需要指明zookeeper的socket(IP+PORT)在哪里,以及topic名称。
(至于partitions,replication-factor这些分区,副本的概念以后再说,暂放)

此时:

zookeeper进程提示:




kafka进程提示:




Step2:查看topiclist





Step3:发送、接受消息

下面,我启动2个XSHELL客户端,一个用于生产者发送消息,一个用于消费者接受消息。

XSHELL-A





XSHELL-B





只要我们在XSHELL-A中输入消息回车,那么马上XSHELL-B中就会有消息显示。

注意:
producer,指定的Socket(localhost+9092),说明生产者的消息要发往kafka,也即是broker
consumer,指定的Socket(localhost+2181),说明消费者的消息来自zookeeper(协调转发)

上面的只是一个单个的broker,下面我们来实验一个多broker的集群。
5.搭建一个多个broker的集群

刚才只是启动了单个broker,现在启动有3个broker组成的集群,这些broker节点
也都是在本机上的

Step1:为每一个broker提供配置文件

我们先看看config/server.properties配置信息:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16
[root@localhostconfig]
#grep-v'#'server.properties|sort

broker.
id
=0

port=9092

num.network.threads=2

num.io.threads=8

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

log.
dirs
=
/tmp/kafka-logs

num.partitions=2

log.retention.hours=168

log.segment.bytes=536870912

log.retention.check.interval.ms=60000

log.cleaner.
enable
=
false

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=1000000

说明:
broker.id为集群中唯一的标注一个节点,因为在同一个机器上,所以必须指定不同的
端口和日志文件,避免数据被覆盖。

在上面单个broker的实验中,为什么kafka的端口为9092,这里可以看得很清楚。

注意日志目录:

1

2

3

4

5

6

7

8

9

10

11

12

13
[root@localhostkafka-logs]
#pwd

/tmp/kafka-logs

[root@localhostkafka-logs]
#ls-l

total32

drwxr-xr-x2rootroot4096Sep2022:58my_first_topic-0

-rw-r--r--1rootroot32Sep2023:54recovery-point-offset-checkpoint

-rw-r--r--1rootroot32Sep2023:54replication-offset-checkpoint

drwxr-xr-x2rootroot4096Sep2020:22
test
-0

[root@localhostkafka-logs]
#treemy_first_topic-0

my_first_topic-0

|--00000000000000000000.index

`--00000000000000000000.log

0directories,2files

【topic,分区,offset等等这些概念,暂放】

kafkacluster怎么同zookeeper交互的,配置信息中也有体现。

那么下面,我们仿照上面的配置文件,提供2个broker的配置文件:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18
[root@localhostconfig]
#touchserver-1.properties

[root@localhostconfig]
#touchserver-2.properties

[root@localhostconfig]
#viserver-1.properties

[root@localhostconfig]
#

[root@localhostconfig]
#

[root@localhostconfig]
#viserver-2.properties

[root@localhostconfig]
#

[root@localhostconfig]
#

[root@localhostconfig]
#catserver-1.properties

broker.
id
=1

port=9093

log.
dir
=
/tmp/kafka-logs-1

zookeeper.connect=localhost:2181

[root@localhostconfig]
#catserver-2.properties

broker.
id
=2

port=9094

log.
dir
=
/tmp/kafka-logs-2

zookeeper.connect=localhost:2181

Step2:启动所有的broker

由于在上面的实验中,已经启动了zookeeper和一个broker(id=0),那么现在只需要启动
broker(id=1)和broker(id=2)。

命令如下:

1

2
bin
/kafka-server-start
.shconfig
/server-2
.properties&

bin
/kafka-server-start
.shconfig
/server-1
.properties&

会发现,zookeeper进程会有提示信息输出。

进程,端口观察:





一个zookeeper在2181端口上监听,3个kafkacluster(broker)分别在端口
9092,9093,9094监听。

Step3:创建topic



1

2

3

4

5

6
bin
/kafka-topics
.sh--create--topictopic_1--partitions1--replication-factor3\

--zookeeperlocalhost:2181

bin
/kafka-topics
.sh--create--topictopic_2--partitions1--replication-factor3\

--zookeeperlocalhost:2181

bin
/kafka-topics
.sh--create--topictopic_3--partitions1--replication-factor3\

--zookeeperlocalhost:2181

查看topic创建情况:





上面的有些东西,也许还不太清楚,暂放,继续试验。需要注意的是topic_1的Leader=1

Step4:模拟客户端发送,接受消息

XSHELL-A

1
bin
/kafka-console-consumer
.sh--topictopic_1--zookeeperlocalhost:2181--from-beginning

XSHELL-B

1

2
bin
/kafka-console-producer
.sh--topictopic_1--broker-listlocalhost:9092,localhost:9093,

localhost:9094

需要注意,此时producer将topic发布到了3个broker中,现在分布式的概念就有点了。

在XSHELL-B中发消息,XSHELL-A中就会有消息显示出来。

Step5:killsomebroker

测试点一:
killbroker(id=0)
首先,我们根据前面的配置,得到broker(id=0)应该在9092监听,这样就能确定它的PID了。





得到broker(id=0)的PID为9300,那么接下来,我们KILL这个broker:





再次观察,topic在kafkacluster中的情况:





需要与broker(id=0)没有被kill前,做下对比。很明显,主要变化在于Isr,以后在分析。

测试下,发送消息,接受消息,是否收到影响。

生产者:





消费者:





结论,并没有收到影响。

测试点二:

killbroker(id=1)

同上可以得到broker(id=1)的PID为21165,同样的kill它,并测试发送,接受消息
是否收到影响。

发送端:





接受端:





可见,kafka的分布式机制,容错能力还是挺好的~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: