您的位置:首页 > 其它

Kafka集群安装

2015-07-28 21:44 190 查看
目 录

第一章 初始环境准备 1

1.1 部署规划 1

1.2 JDK安装 1

1.3 其他配置 3

1.3.1 时间同步 3

1.3.2 关闭防火墙 4

1.3.3 更改主机名 5

1.3.4 修改/etc/hosts 5

第二章 Kafka分布式环境 7

2.1 安装zookeeper 7

2.1.1 开始安装 7

2.1.2 配置zookeeper 7

2.1.3 运行zookeeper 9

2.2 安装kafka 9

2.2.1 开始安装 9

2.2.2 配置kafka 10

2.2.3 运行kafka 11

2.2.4 常用运维操作 12

附录A 相关技术术语 14

附录B 所需软件清单 15

附录D 参考资源索引 16

第一章 初始环境准备

1.1 部署规划

部署环境所用机器均安装64位CentOS 6.5 操作系统。开发环境中用到了3台机器,具体规划如下所示:

zookeeper集群(端口:2181)

192.168.23.128

192.168.23.129

192.168.23.130

kafka集群(存在多块盘的时候,请尽量按照多个挂载点挂载, 比如/data0, /data1, /data2,这样kafka持久化的时候可以利用上多个盘,同时规范化)

192.168.23.128

192.168.23.129

192.168.23.130

每台机器上新建一个用户hadoop,作为安装运行zookeeper和kafka集群时使用:

>$groupadd hadoop;

>$useradd -g hadoop hadoop;

注:使用过程中请将下列内容加入本机hosts文件中(但如果做了内网解析可以省略此步骤)

192.168.23.128 virt1.weiyun.com

192.168.23.129 virt2.weiyun.com

192.168.23.130 virt3.weiyun.com

1.2 JDK安装

部署环境中很多服务或应用都需要Java运行环境支持,所以需要首先安装JDK,为了方便,所有主机上执行完全一样的JDK安装操作。本文会以在主机名为virt1.weiyun.com的机器上安装jdk-7u75-linux-x64.tar.gz(附录B所需软件清单[1])为例,说明安装步骤。

1) 准备安装文件

以root权限用户登录系统,首先将安装包jdk-7u75-linux-x64.tar.gz传到virt1.weiyun.com主机的/usr/local/src路径下,进入该目录,解压:

2)配置环境变量

用vi编辑器打开/etc/profile文件,在文件末尾增加标红部分后,保存退出,然后命令行下执行 “source /etc/profile”生效:

5)验证是否成功

在命令行下输入命令java -version,如果出现以下信息,说明安装成功:

另外,由于CentOS自带安装了OpenJDK,所以需要首先将其卸载,方法是执行如下命令:rpm -qa | grep gcj; yum -y remove java + java软件包。当然,如果想偷懒,不卸掉之前系统自带的JDK,可以在设置PATH环境变量的时候,将$JAVA_HOME/bin放在$PATH前面,效果如上文截图中所展现的那样。

加油站

1.3 其他配置

为保证部署过程的顺利进行,需要提前做好以下检查及配置工作。

1.3.1 时间同步

已做时间同步的请忽略此部分

首先选择一台服务器作为时间同步服务器,本文选择10.28.172.100,该主机具有外网访问能力。首先准备好软件包ntp-4.2.4p0.tar.gz

1)安装NTP时间服务器

#cd /usr/local/src

#tar –zxxvf ntp-4.2.4p0.tar.gz

#cd ntp-4.2.4p0

#./configure –prefix=/usr/local/ntp –enable-all-clocks –enable-parse-clocks

#make && make install

修改ntp.conf配置文件,将“restrict default nomodify notrap noquery”这行修改成:restrict default nomodify。这样表示允许任何IP的客户机都可以进行时间同步。如果只允许192.168.6.*网段的客户机进行时间同步,在restrict default nomodify notrap noquery (表示默认拒绝所有IP的时间同步)之后增加一行:restrict 192.168.6.0 mask 255.255.255.0 nomodify

以守护进程启动ntpd

#/usr/local/ntp/bin/ntpd –c /etc/ntp.conf –p /tmp/ntpd.pid

ntpd启动后,客户机要等几分钟再与其进行时间同步,否则会提示“no server suitable for synchronization found”错误。

2)客户机配置时间同步

以root用户登录需要配置时间同步的客户机,然后

#vi /var/spool/cron/root

添加一行:

* * * * * /usr/sbin/ntpdate 10.28.172.100 > /dev/null 2>&1

前面5个星号为cron表达式,表示每分钟执行下时间同步操作。

加油站

1.3.2 关闭防火墙

由于集群环境中,各主机需要相互通信协作,经常会以主机名方式相互通信,而防火墙有时由于设置不当,会阻止。所以比较简单的方法就是关闭掉所有主机上的防火墙。

#永久性生效,重启后不会复原

开启:chkconfig iptables on

关闭:chkconfig iptables off

#即时生效,重启后复原

开启:service iptables start

关闭:sevice iptables stop

当然也可以通过修改防火墙策略,将集群主机IP放在允许之列。方法参照如下网页:
http://bbs.chinaunix.net/thread-2204793-1-1.html

1.3.3 更改主机名

首先需要设置每台主机的主机名,方法是以root权限用户登录每台主机,执行如下命令:

然后进入vi编辑器修改hostname为自己想修改的名字:

最后回到命令行状态,红色标记部分必须与之前vi编辑器中输入的名字保持一致,示例为“master”,第二个hostname命令是查看更改是否成功,如下:

1.3.4 修改/etc/hosts

本文的主机名及IP对应关系如下:

192.168.23.128 virt1.weiyun.com

192.168.23.129 virt2.weiyun.com

192.168.23.130 virt3.weiyun.com

需要保证各主机之间可以通过主机名相互解析,具体验证方法是直接ping+主机名,看能否相互ping通,这是安装kafka集群的前提。所以,我们需要修改每台主机的/etc/hosts文件,每台主机在该文件末尾添加标红部分设置即可,根据本文环境最后修改为:

第二章 Kafka分布式环境

本文使用的搭建这套分布式环境的具体分配情况见第一章部署规划。

2.1 安装zookeeper

2.1.1 开始安装

本文采用zookeeper-3.4.5-cdh5.3.5.tar(附录B所需软件清单[2]),首先以root身份登录系统,将其上传到virt1.weiyun.com主机的/usr/local/src目录下,切换到/usr/local/src目录下,执行以下命令后,在当前目录下产生“zookeeper-3.4.5-cdh5.3.5”:

2.1.2 配置zookeeper

以下操作均以hadoop账户操作。

Zookeeper是一个高可用、高可靠的协同工作系统,分布式程序可以用它保存并更新关键共享状态。选定几台机器作为Zookeeper的集群机器,本文选取master、slave1和slave2组成Zookeeper集群。

1)zoo.cfg

首先将${ZOOKEEPER_HOME}/conf/zoo_sample.cfg拷贝为zoo.cfg:

然后做如下修改即可:

其中tickTime是服务端和客户端之间交互的基本时间单位,单位为毫秒;dataDir保存Zookeeper数据,日志的路径;syncLimit是服务端和客户端之间请求应答之间的时间间隔,它是tickTime的倍数;clientPort是客户端与Zookeeper相互交互的端口号,默认情况下是2181。

server.A=B:C:D,其中A是一个数字,表示第几号服务器主机;B是服务器主机名;C表示服务器与集群中的“Leader”交换信息的端口号;当“Leader”失效后,D表示用来执行选举时服务器相互通信的端口。

2)myid

在/data0/zookeeper/data目录下创建文件myid,内容为zoo.cfg中server后面指定的id,以后与对应Zookeeper集群编号保持一致,如virt1.weiyun.com对应的myid中应该是1:

使用scp命令将virt1.weiyun.com机器上配置完成的Zookeeper安装文件打包,然后拷贝到Zookeeper集群的其他主机,如拷贝到virt2.weiyun.com:

以hadoop账户分别登录另外两台主机,然后解压:

修改${ZOOKEEPER_HOME}/目录下得myid,virt2为2,virt3为3。(注意这个数字根据zoo.cfg中的server编号保持一致)

2.1.3 运行zookeeper

在每台Zookeeper集群主机中,以hadoop账户执行如下命令(命令均在${ZOOKEEPER_HOME}/bin):

加油站

2.2 安装kafka

2.2.1 开始安装

本文采用kafka_2.10-0.8.2.1.tgz(附录B所需软件清单[3]),首先以hadoop身份登录系统,将其上传到待安装主机的/usr/local/src目录下,切换到/home/hadoop目录下,将kafka解压到该目录下,同时请确保kafka要用到的目录hadoop账户都是有读写权限的。

2.2.2 配置kafka

将kafka添加到环境变量中,su hadoop切换到hadoop用户后执行以下命令:vim ~/.bashrc,具体添加内容如下:

本文涉及到的配置文件有${KAFKA_HOME}/bin/kafka-run-class.sh, ${KAFKA_HOME}/config/server.properties。

1)kafka-run-class.sh

默认Kafka运行的时候都会通过log4j打印很多日志文件,比如server.log, controller.log, state-change.log等,而都会将其输出到$KAFKA_HOME/logs目录下,这样很不利于线上运维,因为经常容易出现打爆文件系统,一般安装的盘都比较小,而数据和日志会指定打到另一个或多个更大空间的分区盘.具体方法是,打开KAFKA_HOME/bin/kafka-run-class.sh,找到下面标示的位置,并定义一个变量,指定的值为系统日志输出路径,重启broker即可生效。

2)server.properties

a. broker.id:每个服务器上安装的kafka节点需要唯一指定一个非负整数用于表明该节点。

b. log.dirs:指定kafka用于消息持久化时所存储的本地文件路径,可以指定多个路径,以逗号分隔开。

c. port:broker的端口号,默认9092。

d. zookeeper.connect:kafka依赖的zookeeper列表,可以指定所使用的zk根节点或者不用指定;

e. host.name:一般配置成本机的ip地址。

f.log.retention.hours:持久化的消息日志文件保留的时间。

g.auto.create.topics.enable: 是否必须提前创建好topic。

h.delete.topic.enable:是否允许删除topic,该特性是0.8.2新增的。

i.default.replication.factor:默认的partition备份因子,可提供HA机制。

j.zookeeper.session.timeout.ms:zookeeper的session过期时间。

k.num.partitions:创建topic时默认生成的partition个数。

其他需要作为kafka broker的机器上执行类似的操作和配置即可。

2.2.3 运行kafka

首先确保依赖的zk集群已经正常启动,然后同样以hadoop用户执行以下操作(每个节点都是类似这样操作):

使用jps命令查看进程情况,如果主机节点出现进程,同时查看对应的日志(kafka-run-class.sh里面更改的日志输出路径)里面如果没有任何异常信息,说明启动成功:

2.2.4 常用运维操作

A.新建topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3

B.修改topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x

C.删除topic(必须设置delete.topic.enable=true)

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

D.查看消费位置

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

E.查看所有topic

bin/kafka-topics.sh --list --zookeeper loocalhost:2181

F.查看某个topic详情

bin/kafka-topics.sh Ῠ --topic test --describe --zookeeper localhost:2181

... ...

加油站

至此,Kafka-MQ完全分布式整合安装成功完成,安装过程中某些参数配置请根据实际情况作适当修改,避免由于参数配置错误导致无法安装成功。有几点需要注意:

1、某些操作的登录用户角色限制;

2、某些文件或目录的权限设置;

3、某些文件或目录的存放路径;

4、尽量保证各主机环境、安装路径的一致性,便于更新安装操作。



附录A 相关技术术语

Producer :消息生产者,就是向kafka broker发消息的客户端。

Consumer :消息消费者,向kafka broker取消息的客户端。

Topic :可以理解为一个队列。

Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个CG只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。

Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。

Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

附录B 所需软件清单

[1] JDK- http://pan.baidu.com/s/1jGpeV0i

[2] zookeeper- http://pan.baidu.com/s/1i3vm9Tj

[3] kafka- http://pan.baidu.com/s/1hq5yb3I

附录D 参考资源索引

[1] 刘鹏. 实战Hadoop——开启通向云计算的捷径

[2] Sarath Lakshman. Linux Shell脚本攻略

[3] 高俊峰. 高性能Linux服务器构建实战

[4] 官网. http://kafka.apache.org/documentation.html
[5] 丰士昌. 最新Linux命令查询词典
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: