您的位置:首页 > 运维架构

Kafka快速上手教程 1

2017-05-18 10:04 656 查看


Kafka 介绍与实践


1.1 实验内容

本节课将介绍 Kafka 及实现原理,然后完整搭建,案例演示,学习完本课程,你将对 kafka 有深入的了解,很快上手。


1.2 课程来源

参考资料: http://kafka.apache.org/documentation.html


1.3. 实验知识点

生产者/消费者模型
单机/集群的区别
设计原理


1.4 实验环境

Hadoop 2.6.1
kafka_2.10-0.8.1.1
Xfce 终端


1.5 适合人群

本课程属于中等难度级别,适合具有 hadoop 基础的用户,如果对分布式文件系统 了解能够更好的上手本课程。


二、kafka 入门


2.1 简介

Kafka 是 linkedin 用于日志处理的分布式消息队列,同时支持离线和在线日志处理。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer,消息接受者成为 Consumer,此外 kafka 集群由多个 kafka 实例组成,每个实例 (server) 称为 broker。无论是 kafka 集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性,为集群保存一些 meta 信息。




2.2 Kafka应用场景

1. 日志收集


日志收集方面,其实开源产品有很多,包括 Scribe、Apache Flume。很多人使用 Kafka 代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或 HDFS)进行处理。然而 Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让 Kafka 处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如 Scribe 或者 Flume 来说,Kafka 提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

2. 行为跟踪


Kafka 的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的 topic 里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到 Hadoop 离线数据仓库里处理。

3. 持久性日志(commit log)


Kafka 可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka 中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka 类似于 Apache BookKeeper 项目。


2.3 Kafka基本概念

Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。

Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。

Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。

Producers:消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做 producers。

Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。

Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。


三、设计原理

Kafka 的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力。


3.1 Kafka 的 Topics/Log

一个Topic 可以认为是一类消息,每个 topic 将被分成多 partition (区),每个partition在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称 offset(偏移量),partition 是以文件的形式存储在文件系统中。Logs 文件根据 broker 中的配置要求,保留一定时间后删除来释放磁盘空间。

Partition


Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。 partition 中的每条消息都会被分配一个有序的 id(offset)。




3.2 Kafka的储存策略

kafka以 topic 来进行消息管理,每个 topic 包含多个 partition,每个 part 对应一个逻辑 log,有多个 segment 组成。

每个 segment 中存储多条消息(见下图),消息 id 由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

broker 收到发布消息往对应 partition 的最后一个 segment 上添加该消息。

每个 part 在内存中对应一个index,记录每个 segment 中的第一条消息偏移。

发布者发到某个 topic 的 消息会被均匀的分布到多个 part 上(随机或根据用户指定的回调函数进行分布),broker 收到发布消息往对应 part 的最后一个 segment 上添加 该消息,当某个 segment 上的消息条数达到配置值或消息发布时间超过阈值时,segment 上的消息会被 flush 到磁盘,只有 flush 到磁盘上的消息订阅者才能订阅到,segment 达到一定的大小后将不会再往该 segment
写数据,broker 会创建新的 segment。




3.3 Kafka的消息发送的流程

由于 kafka broker 会持久化数据,broker 没有内存压力,因此,consumer 非常适合采取 pull 的方式消费数据 Producer 向 Kafka(push)推数据,consumer 从 kafka 拉(pull)数据。




3.4 Kafka 的 Zookeeper 协调控制

管理 broker 与 consumer 的动态加入与离开。
触发负载均衡,当 broker 或 consumer 加入或离开时会触发负载均衡算法,使得一个 consumer group 内的多个 consumer 的订阅负载平衡。
维护消费关系及每个 partion 的消费信息。

Zookeeper上的细节


每个 broker 启动后会在 zookeeper 上注册一个临时的 broker registry,包含 broker 的 ip 地址和端口号,所存储的 topics 和 partitions 信息。

每个 consumer 启动后会在 zookeeper 上注册一个临时的 consumer registry:包含 consumer 所属的consumer group 以及订阅的 topics。

每个 consumer group 关 联一个临时的 owner registry 和一个持久的 offset registry。对于被订阅的每个 partition 包含一个 owner registry,内容为订阅这个 partition 的 consumer id;同时包含一个 offset registry,内容为上一次订阅的 offset。


四、安装部署

集群方式
:单节点单 broker,单节点多 broker,多节点多 broker。


4.1 准备工作

我们已经在实验楼环境里下载并配置启动 hadoop-2.6.1 所需的文件,免除您配置文件的麻烦,您可以在 
/opt
 找到,只需格式化并启动
hadoop 进程即可。

双击打开桌面上的 Xfce 终端,用 
sudo
 命令切换到 hadoop 用户,hadoop 用户密码为 hadoop,用 
cd
 命令进入 
/opt
目录。
$ su hadoop
$ cd /opt/




在 
/opt
 目录下格式化 hadoop。
$ hadoop-2.6.1/bin/hdfs namenode -format




在 
/opt
 目录下启动 hadoop 进程。
$ hadoop-2.6.1/sbin/start-all.sh




用 
jps
 查看 hadoop 进程是否启动。




4.2 下载 kafka 及解压

你可以通过下面命令将 Kafka 下载到实验楼环境中,作为参照对比进行学习。
$ sudo wget http://labfile.oss.aliyuncs.com/courses/785/kafka_2.10-0.8.1.1.tgz


在 /opt 目录下,用 
tar
 命令解压下载的 Kafka。




4.3 单节点单 broker

Kafka 用到了 Zookeeper,所有首先启动 Zookeeper,下面简单的启用一个单实例的 Zookeeper 服务。可以在命令的结尾加个 & 符号,这样就可以启动后离开控制台。

注意:kafka 自带了 zookeeper,为了方便,这里我们直接使用,当然也可以使用自己下载的 zookeeper*.tar.gz

#权限不足,授权
hadoop@945f39ae074b:/opt$ sudo chmod 777 -R kafka_2.10-0.8.1.1
#启动自带的zookeeper
hadoop@945f39ae074b:/home/shiyanlou$ cd /opt/kafka_2.10-0.8.1.1/
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  bin/zookeeper-server-start.sh  config/zookeeper.properties &
#查看进程




启动 kafka 服务器。
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  bin/kafka-server-start.sh  config/server.properties  >/dev/null 2>&1 &


查看 kafka 进程。



创建一个叫做“test”的 topic,它只有一个分区,一个副本。
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


可以通过 
list
 命令查看创建的 topic:



查看一个 topic 的分区及副本状态信息。



启动 producer:
运行 producer 并在控制台中输一些消息,这些消息将被发送到服务端。

hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test



启动 consumer:
接着再开启一个终端,运行 consumer 可以读取消息并输出到标准输出。

hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning



边生产边消费。




4.4 单节点多 broker

4.3
节只是启动了单个 broker,现在启动由3个 broker 组成的集群,这些 broker 节点也都是在本机上的,首先为每个节点编写配置文件。
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ cp config/server.properties  config/server-1.properties
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ cp config/server.properties  config/server-2.properties
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$




注意: broker.id 在集群中唯一的标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。

在拷贝出的新文件中添加以下参数:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2


hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ sudo vi config/server-1.properties





hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ sudo vi config/server-2.properties





刚才已经启动一个 Zookeeper 和一个节点,现在启动另外两个节点,只需要在开启的两个终端做同样的操作,如下:
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server-1.properties &
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-server-start.sh config/server-2.properties &






创建一个拥有3个副本的 topic:

hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test2



单节点多 broker 搭建完毕,那么现在怎么知道每个节点的信息呢?运行“describe topics”命令就可以。

hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test2



下面解释一下这些输出,第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。

leader:
负责处理消息的读和写,leader 是从所有节点中随机选择的。

replicas:
列出了所有的副本节点,不管节点是否在服务中。

isr:
是正在服务中的节点。

在我们的例子中,节点2是作为 leader 运行。启动生产者向 topic 发送消息:
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test2


生产者生产消息。



启动消费者,接收消息。
hadoop@945f39ae074b:/opt/kafka_2.10-0.8.1.1$  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test2


消费者消费消息。




4.5 多节点 broker

介绍了上面两种配置方法,再理解集群配置就简单了,zookeeper 配置文件(zookeeper.properties)不变

broker 的配置配置文件(server.properties):按照单节点多实例配置方法在一个节点上启动一个或多个实例,不同的地方是 zookeeper 的连接串需要把所有节点的 zookeeper 都连接起来,然后复制 kafka 到从机,在所有节点启动 Kafka broker 即可。

注意:由于实验环境的限制,多节点 broker 在此不演示了,有条件的同学可以课下练习。


可能遇到的问题。




解决办法。

首先检查
server.properties
文件的
broker.id
端口号
是否唯一,

必要时用 
kill
 命令杀死 kafka 进程,重启 kafka 服务器。




五、实验总结

本课程首先简单讲解 kafka 理论,接着讲解安装的三种模式,并实践操作,对深入学习 kafka 有很大帮助。


六、参考文献

http://kafka.apache.org/documentation.html。
http://blog.sina.com.cn/s/blog_5c51172c0102uxb0.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka hadoop