您的位置:首页 > 运维架构 > Linux

Kafka Centos安装指南

2017-04-11 00:00 489 查看
kafka安装
1、jdk
2、zookeeper
启动zookeeper

3、kafka
mkdir /opt/kafka
cd /opt/kafka
wget http://mirrors.cnnic.cn/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz 速度很快,如果不行就自己下载后,上传
tar zxvf kafka_2.11-0.10.0.1.tgz
cd /opt/kafka/kafka_2.11-0.10.0.1/

启动

也可用自带zk(bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &


查看已创建的topic
bin下
./kafka-topics.sh --list --zookeeper localhost:2181
删除已创建的topic
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic __consumer_offsets

创建topic
sh kafka-topics.sh --create --topic lli-topic1 --replication-factor 1 --partitions 1 --zookeeper localhost:2181

查看指定topic信息
sh kafka-topics.sh --zookeeper localhost:2181 --describe --topic lli-topic1

kafka生产者客户端命令
./kafka-console-producer.sh --broker-list localhost:9092 --topic lli-topic1

kafka消费者客户端命令
./kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic lli-topic1

查看consumer组内消费的offset
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic lli-topic1

单机本地跑程序没问题,机器A访问机器B上的kafka不行,
原因是:kafka服务器配置问题

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092 设置对外IP
listeners=PLAINTEXT://192.168.233.128:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# advertised.listeners=PLAINTEXT://192.168.233.128:9092

broker.id=0
listeners=PLAINTEXT://172.16.49.173:9092
log.dirs=kafka-logs
zookeeper.connect=localhost:2181
备注:listeners一定要配置成为IP地址;如果配置为localhost或服务器的hostname,在使用Java发送数据时就会抛出异 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因为在没有配置advertised.host.name 的情况下,Kafka并没有像官方文档宣称的那样改为广播我们配置的host.name,而是广播了主机配置的hostname。远端的客户端并没有配置 hosts,所以自然是连接不上这个hostname的

server.properties 配置
############################# Server Basics #############################
# 唯一标识一个broker.
broker.id=1
############################# Socket Server Settings #############################
#绑定服务监听的地址和端口,要填写hostname -i 出来的地址,否则可能会绑定到127.0.0.1,producer可能会发不出消息
listeners=PLAINTEXT://172.23.8.144:9092
#broker对producers和consumers服务的地址和端口,如果没有配置,使用listeners的配置,本文没有配置该项
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 处理网络请求的线程数
num.network.threads=3
# 处理磁盘I/O的线程数
num.io.threads=8
# socket server的发送buffer大小 (SO_SNDBUF)
socket.send.buffer.bytes=102400
# socket server的接收buffer大小 (SO_RCVBUF)
socket.receive.buffer.bytes=102400
#一个请求的最大size,用来保护防止oom
socket.request.max.bytes=104857600
############################# Log Basics #############################
#存放日志和消息的目录,可以是用逗号分开的目录,同样不推荐使用/tmp
log.dirs=/usr/local/services/kafka/kafka-logs
#每个topic默认partitions的数量,数量较大表示消费者可以有更大的并行度。
num.partitions=2
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
#日志的过期时间,超过后被删除,单位小时
log.retention.hours=168
#一个日志文件最大大小,超过会新建一个文件
log.segment.bytes=1073741824
#根据过期策略检查过期文件的时间间隔,单位毫秒
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#Zookeeper的连接配置,用逗号隔开,也可以用172.23.8.59:2181/kakfa这样的方式指定kafka数据在zk中的根目录
zookeeper.connect=172.23.8.144:2181,172.23.8.179:2181,172.23.8.59:2181
# 连接zk的超时时间
zookeeper.connection.timeout.ms=6000
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: