您的位置:首页 > 其它

Kafka-0.10.2.1使用独立zookeeper部署集群

2018-04-21 10:40 246 查看
原文链接:https://my.oschina.net/epoch/blog/1798831

下载zookeeper安装包、kafka安装包以及jdk的安装包

  • 本文档使用的JDK是1.8_131的版本

  • 本文档使用的zookeeper是3.4.11的版本,官方下载地址如下:download

  • 本文档使用的kafka是0.10.2.1的版本

示例中使用到的服务器相关参数以及各服务器上的相关组件:

主机IP

主机名称

操作系统

数据存储目录

安装路径

安装组件

192.168.174.101

kafka1

centos7 x64

/opt/kafka

/usr/local/kafka

kafka,kafka-monitor

192.168.174.102

kafka2

centos7 x64

/opt/kafka

/usr/local/kafka

kafka

192.168.174.103

kafka3

centos7 x64

/opt/kafka

/usr/local/kafka

kafka

 系统基础环境修改

本章节中的所有都需要在ROOT用户下进行操作。

关闭防火墙

•	检查防火墙状态
Centos6: [root@localhost~]# service iptables status
Centos7: [root@localhost ~]# systemctl status firewalld.service
如果显示状态不是iptables: Firewall is not running.则需要关闭防火墙
•	关闭防火墙
Centos6: [root@localhost ~]# service iptables stop
Centos7: [root@localhost ~]# systemctl stop firewalld
•	永久关闭防火墙
Centos6:[root@localhost ~]# chkconfig iptables off
Centos7:[root@localhost ~]# systemctl disable firewalld.service
•	再次检查防火墙状态
Centos6: [root@localhost ~]# service iptables status
Centos7: [root@localhost ~]# systemctl status firewalld.service
iptables: Firewall is not running.

关闭SELinux

因为centos的所有访问权限都是有SELinux来管理的,为了避免我们安装中由于权限关系而导致的失败,我们将其关闭,以后跟根据需要进行重新管理。以下给出关闭SELinux的操作命令(在ROOT用户下进行操作):

•	查看SElinux的状态
[root@localhost~]# /usr/sbin/sestatus –v
SELinux status:                 enabled
如果SELinux status参数为enabled即为开启状态,需要进行下面的关闭操作。
•	关闭SElinux
[root@localhost ~]# vim /etc/selinux/config
在文档中找到SELINUX,将SELINUX的值设置为disabled,即:
SELINUX=disabled
•	在内存中关闭SElinux
[root@localhost ~]# setenforce 0
•	检查内存中状态
[root@localhost ~]# getenforce
如果日志显示结果为disabled或者permissive,说明操作已经成功。

添加安装用户

为了避免有些现场可能以后会回收ROOT权限,我们最好在非ROOT用户下安装,如果在非ROOT用户下能成功,在ROOT下就不会失败。如果条件允许,使用ROOT也不是一件坏事。以下给出创建用户的操作指令:

# 创建用户名称
[root@localhost ~]# adduser kafka
# 为用户初始化密码
# 为这个用户初始化密码,linux会判断密码复杂度,不过可以强行忽略:
[root@localhost ~]# passwd kafka
更改用户 kafka 的密码 。
新的 密码:
无效的密码: 密码未通过字典检查 - 过于简单化/系统化
重新输入新的 密码:
passwd:所有的身份验证令牌已经成功更新。

为用户创建安装目录和数据目录

[root@localhost ~]# mkdir -p /opt/kafka/zkDataDir
[root@localhost ~]# mkdir -p /opt/kafka/logDir
[root@localhost ~]# mkdir -p /usr/local/kafka
# 将以上创建的文件夹的所有者交给kafka用户
[root@localhost ~]# chown -R kafka:kafka /opt/kafka
[root@localhost ~]# chown -R kafka:kafka /usr/local/kafka

修改主机名称

修改主机名的原因是为了方便我们记忆与管理,但这并不是主要原因,更主要的是防止kafka的内部实现机制要通过主机名来路由到该主机的IP上。(确保所有主机的hostname不相同)

Centos6:
[root@localhost ~]# vi /etc/hostname
将内容修改为新的hostname
Centos7:
[root@localhost ~]# hostnamectl set-hostname kafka
[root@localhost ~]# hostname kafka
执行完以上的命令退出,重新登录即可。

主机名中不能使用下划线(_)。

主机名不能保护大写字符

修改HOSTS

修改HOSTS的原因主要有两点:

1、 为了防止kafka内部实现机制通过主机名来进行对主机的访问。

2、 为了我们在配置过程中书写起来比较方便,看起来也一目了然。

这里要说明的是,我们配置HOSTS并不是只配置本机IP和主机的对应关系,而是我们规划中的每一台机器都要配置所有机器的IP和主机名称的对应关系。

修改HOSTS方法:

以下给出修改HOSTS的操作命令(在ROOT用户下进行操作):

修改/etc/hosts文件,在文件中添加规划中的所有主机的IP和主机名的对应关系。而且每一台机器都配置。

[root@kafka ~]# vi /etc/hosts

在该文件中添加以下格式的内容,该内容是我们规划中的所有主机的IP和主机名称,而且每一台机器的HOSTS中都要添加相同的内容,IP与主机名用一个TAB键隔开。

192.168.174.100     kafka

如果想要对个名称路由到同一个IP,我们只需要在后边继续添加即可,同样是使用TAB键隔开。例如:

192.168.186.101     es1  kafka1    spark1    hadoop1

配置免密登录

因为集群再启动中需要跨服务器启动,那么两台服务器之间就需要通过sshd服务进行通信。但是由于我们通过sshd服务进行通信时,每次都要输入密码,这样很麻烦,而且也不合理。如果集群中机器很多我们输入不过来,如果在启动中由于输入密码超时导致启动失败,如果我们集群中分配任务有需要输入密码又该怎么办。所以,配置免密登录是有必要的。

注意:配置免密登录是与用户相关的,如果再root用户下配置就是对root用户免密。所以,我们此时配置要使用kafka用户。但是,在配置免密之前要保证其他设备已经配置好以上的步骤,保证都已经存在kafka用户。

安装SSH服务

[kafka@kafka1 ~]# yum -y install ssh

设置开机启动

centos6: [kafka@kafka1 ~]# chkconfig sshd on
centos7: [kafka@kafka1 ~]# systemctl enable sshd.service

启动服务

[kafka@kafka1 ~]# systemctl start sshd.service

生成密钥

[kafka@kafka1 ~]# ssh_keygen –t rsa

一路回车,什么都不输入

配置免密设备

在当前操作的设备上执行以下命令,有多少台机器需要对当前设备免密就执行多少次,只需要将命令中的USER和IP修改为被免密设备的真实IP和用户名称。凡是选择yes/no的都输入yes,需要输入密 3ff7 码的地方都输入被免密设备中被免密用户的密码。

[spark@spark1~]# ssh-copy-id USER@IP yes PASSWORD 配置设备免密

[kafka@kafka1 ~]# ssh-copy-id kafka@192.168.174.101
[kafka@kafka1 ~]# ssh-copy-id kafka@192.168.174.102
[kafka@kafka1 ~]# ssh-copy-id kafka@192.168.174.103

JDK安装

因为kafka使用scala编写的,他可以运行在JDK1.8的运行环境中。所以在安装kafka之前一定要先安装JDK1.8或者是对应版本的scala。因为scala版本之间具有不兼容性,因此建议安装JDK。本示例中以在ROOT用户下安装JDK为例。

1、 卸载系统自带的open JDK

[root@localhost ~]# rpm -qa|grep jdk
# ***表示的是上述命令查询到的所有结果
[root@localhost ~]# rpm -e --nodeps ***

2、下载并上传JDK1.8的安装包

将压缩包上传到任意目录,本文以ROOT用户的主目录(~)下为例

3、 解压到相应的安装目录下

[root@kafka ~]# tar -zxvf jdk-8u131-linux-x64.tar.gz -C /usr/local/

4、 配置环境变量

将解压后的jdk的目录配置到环境变量中

[root@kafka ~]# vi /etc/profile
# 在该文件的末尾处添加以下内容
export JAVA_HOME=/usr/local/jdk1.8.0_131
export PATH=$JAVA_HOME/bin:$PATH

5、刷新环境变量

[root@kafka ~]# source /etc/profile

6、测试是否安装成功

在任意目录下执行一下命令

[root@kafka ~]# java -version

如果出现Java的版本信息证明安装成功,如果未出现,请检查环境变量中配置的路径是否正确。

Kafka部署

部署zookeeper

参照zookeeper部署文档

部署kafka

创建安装相关目录

sudo mkdir –p /opt/kafka    #创建项目目录
sudo mkdir –p /opt/kafka/data    #创建kafka消息目录,主要存放kafka消息

解压安装包

tar –zxvf kafka_2.11-1.0.0.tgz –C /opt/kafka

修改config/server.properties文件

cd /opt/kafka/kafka_2.11-1.0.0/config
主要关注:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群

实际的修改项:
broker.id=0  每台服务器的broker.id都不能相同
host.name=192.168.174.116
在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
设置zookeeper的连接端口
zookeeper.connect=192.168.174.116:2181,192.168.174.117:2181,192.168.174.118:2181

常用配置项解释

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.174.116:2181,192.168.174.117:2181,192.168.174.118:2181 #设置zookeeper的连接端口

启动kafka服务

启动服务
从后台启动Kafka集群(每一台都需要启动)
cd /opt/kafka/kafka_2.11-0.9.0.1//bin #进入到kafka的bin目录
./kafka-server-start.sh -daemon ../config/server.properties
检查服务是否启动
执行命令jps
20348 Jps4233 QuorumPeerMain18991 Kafka

停止kafka服务

停止所有kafka
进入[kafka安装目录]\bin
./kafka-server-stop.sh
停止所有zookeeper
./zkServer.sh stop

安装kafka-monitor监控

将KafkaOffsetMonitor-assembly-0.2.0.jar放到服务器的任意目录上

执行以下启动命令
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk 192.168.174.116:2181,192.168.174.117:2181,192.168.174.118:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days &

 

转载于:https://my.oschina.net/epoch/blog/1798831

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: