
Kafka Basic Concepts and Operations

2017-11-27

I. Basic Kafka Concepts:
Broker: a machine running the Kafka service is a broker (each broker's ID must be unique).
Producer: the message producer; it actively writes (pushes) data to the broker.
Consumer: the message consumer; it reads (pulls) data from Kafka. The old consumer depends on ZooKeeper; the new consumer does not.
Topic: a category of messages; different topics hold different data, analogous to a table in a database.
Consumer Group: a topic can be consumed by several consumers at the same time, but consumers in the same consumer group do not consume the same data twice (each partition is delivered to only one consumer in the group); see the sketch below.
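As an illustration of group behavior (a sketch assuming the test001 topic created in the operations section below, and a hypothetical group.properties file containing only group.id=group1), running two console consumers with the same group.id shows that partitions are split between them rather than duplicated:

# group.properties is a hypothetical file with a single line:
#   group.id=group1
# Run this same command in two terminals: both consumers join group1, and each
# partition of test001 is delivered to only one of them.
kafka-console-consumer.sh --zookeeper xupan001:2181,xupan002:2181,xupan003:2181 \
  --topic test001 --consumer.config group.properties

# A consumer started without that file (or with a different group.id) forms its
# own group and receives every message again, independently of group1.
kafka-console-consumer.sh --zookeeper xupan001:2181,xupan002:2181,xupan003:2181 \
  --topic test001 --from-beginning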

II. Installing Kafka:
1. Consider Kafka and Spark version compatibility:
From the official documentation: Spark Streaming 2.2.0 is compatible with Kafka broker versions 0.8.2.1 or higher.

                              spark-streaming-kafka-0-8    spark-streaming-kafka-0-10
Broker Version                0.8.2.1 or higher            0.10.0 or higher
Api Stability                 Stable                       Experimental
Language Support              Scala, Java, Python          Scala, Java
Receiver DStream              Yes                          No
Direct DStream                Yes                          Yes
SSL / TLS Support             No                           Yes
Offset Commit Api             No                           Yes
Dynamic Topic Subscription    No                           Yes
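To pick up the integration that matches the 0.8.2.1 broker installed below, the dependency can be pulled in at submit time. A rough sketch, assuming Spark 2.2.0 built for Scala 2.11; your_streaming_app.py is a hypothetical application file:

# Submit a streaming job with the spark-streaming-kafka-0-8 integration,
# which matches broker version 0.8.2.1 or higher per the table above.
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  your_streaming_app.py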
ZooKeeper must be installed. ZooKeeper cluster startup script:

#!/bin/sh
# Start ZooKeeper on all three nodes (xupan001, xupan002, xupan003).
for i in 1 2 3
do
  ssh xupan00$i 'source /etc/profile;/usr/local/devtools/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start'
done
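To confirm the ensemble is up, the same loop can ask each node for its status (a sketch: one node should report "leader" and the others "follower"):

#!/bin/sh
# Check the role of each ZooKeeper node after startup.
for i in 1 2 3
do
  ssh xupan00$i 'source /etc/profile;/usr/local/devtools/zookeeper/zookeeper-3.4.5/bin/zkServer.sh status'
done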


Modify the Kafka configuration file. Note that broker.id must be globally unique, so it has to be changed on every other machine.

The file to modify is server.properties:
log.dirs : directory where Kafka stores its data
num.partitions : default number of log partitions per topic; each partition is stored as its own log directory on disk
log.retention.hours=168 : how long data is kept before being deleted, 7 days by default
zookeeper.connect : ZooKeeper connection string

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=xupan001

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/devtools/kafka/kafka_2.10-0.8.2.1/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

zookeeper.connect=xupan001:2181,xupan002:2181,xupan003:2181
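Since broker.id (and host.name here) must differ on every machine, one way to distribute this file is to copy it and patch those two keys. A rough sketch, assuming the same Kafka install path on every node, giving xupan002 broker.id=1 and xupan003 broker.id=2:

# Copy server.properties to the other brokers and give each a unique
# broker.id and host.name.
KAFKA_CONF=/usr/local/devtools/kafka/kafka_2.10-0.8.2.1/config/server.properties
for i in 2 3
do
  scp $KAFKA_CONF xupan00$i:$KAFKA_CONF
  ssh xupan00$i "sed -i 's/^broker.id=.*/broker.id=$((i-1))/; s/^host.name=.*/host.name=xupan00$i/' $KAFKA_CONF"
done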


Kafka Basic Operations:

Start the Kafka service (any of the following forms):
kafka-server-start.sh -daemon config/server.properties
kafka-server-start.sh -daemon /usr/local/devtools/kafka/kafka_2.10-0.8.2.1/config/server.properties
kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
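One way to check that every broker came up and registered is to look in ZooKeeper (a sketch using the zkCli.sh from the ZooKeeper install above):

# Each live broker registers its id under /brokers/ids in ZooKeeper;
# with all three brokers started this should print something like [0, 1, 2].
/usr/local/devtools/zookeeper/zookeeper-3.4.5/bin/zkCli.sh -server xupan001:2181 ls /brokers/ids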

Create a topic
kafka-topics.sh -zookeeper xupan001:2181,xupan002:2181,xupan003:2181 --create --topic test001 --replication-factor 3 --partitions 3

List topics
kafka-topics.sh  -zookeeper xupan001:2181,xupan002:2181,xupan003:2181  --list

Describe a topic
kafka-topics.sh --describe --zookeeper  xupan001:2181,xupan002:2181,xupan003:2181 --topic test

Delete a topic
kafka-topics.sh --delete --zookeeper xupan001:2181,xupan002:2181,xupan003:2181 --topic test
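Note: on 0.8.2 brokers the delete command only marks the topic for deletion unless topic deletion is enabled in server.properties (and the broker restarted):

delete.topic.enable=true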

Producer: push messages to a topic
bin/kafka-console-producer.sh --broker-list xupan001:9092,xupan002:9092,xupan003:9092 --topic test

Consumer: consume messages from a topic (--from-beginning is optional and consumes from the earliest offset)
./bin/kafka-console-consumer.sh --zookeeper xupan001:2181,xupan002:2181,xupan003:2181 --topic test --from-beginning
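Putting the producer and consumer together, a quick end-to-end smoke test against the test topic (a sketch; --max-messages just stops the consumer after one record):

# Publish one message non-interactively...
echo "hello kafka" | kafka-console-producer.sh --broker-list xupan001:9092,xupan002:9092,xupan003:9092 --topic test

# ...and read it back from the beginning, stopping after the first message.
kafka-console-consumer.sh --zookeeper xupan001:2181,xupan002:2181,xupan003:2181 \
  --topic test --from-beginning --max-messages 1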
Tags: Kafka