您的位置:首页 > 其它

Spark Streaming on Kafka解析和安装实战

2016-08-21 11:41 375 查看
本博文内容主要包括以下几点:

1、Kafka解析;

2、Kafka的安装和实战。

一、Kafka的概念、架构和用例场景:

1、Kafka的概念:

Apache Kafka是分布式发布-订阅消息系统。

它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

kafka对消息保存时根据Topic(及发送消息内容)进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

Kafka就是这样的通信组件,将不同对象组件粘合起来的纽带,且是解耦合方式传递数据。

2,为何使用消息系统:

(1)解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

(2)冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

(3)扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

(4)灵活性 & 峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

(5)可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

(6)顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

(7)缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

(8)异步通信

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Apache Kafka与传统消息系统相比,有以下不同的特点:

(1)分布式系统,易于向外扩展;

(2)在线低延迟,同时为发布和订阅提供高吞吐量;

(3)将消息存储到磁盘,因此可以处理1天甚至1周前内容

3、Kafka的架构:



Kafka既然具备消息系统的基本功能,那么就必然会有组成消息系统的组件:Topic,Producer和Consumer。Kafka还有其特殊的Kafka Cluster组件。

Topic主题:

代表一种数据的类别或类型,工作、娱乐、生活有不同的Topic,生产者需要说明把说明数据分别放在那些Topic中,里面就是一个个小对象,并将数据数据推到Kafka,消费者获取数据是pull的过程。一组相同类型的消息数据流。这些消息在Kafka会被分区存放,并且有多个副本,以防数据丢失。每个分区的消息是顺序写入的,并且不可改写。



Producer(生产者):把数据推到Kafka系统的任何对象。

Kafka Cluster(Kafka集群):把推到Kafka系统的消息保存起来的一组服务器,也叫Broker。因为Kafka集群用到了Zookeeper作为底层支持框架,所以由一个选出的服务器作为Leader来处理所有消息的读和写的请求,其他服务器作为Follower接受Leader的广播同步备份数据,以备灾难恢复时用。

Consumer(消费者):从Kafka系统订阅消息的任何对象。

消费者可以有多个,并且某些消费者还可以组成Consumer Group。多个Consumer Group之间组成消息广播的关系,所以各个Group可以拉相同的消息数据。在Consumer Group内部,各消费者之间对Consumer Group拉出来的消息数据是队列先进先出的关系,某个消息数据只能给该Group的一个消费者使用。



数据传输基于kernel(内核)级别的(传输速度接近0拷贝-ZeroCopy)、没有用户空间的参与。Linux本身是软件,软件启动时第一个启动进程叫init,在init进程启动后会进入用户空间;例如:在分布式系统中,机器A上的应用程序需要读取机器B上的Java服务数据,由于Java程序对应的JVM是用户空间级别而且数据在磁盘上,A上应用程序读取数据时会首先进入机器B上的内核空间再进入机器B的用户空间,读取用户空间的数据后,数据再经过B机器上的内核空间分发到网络中,机器A网卡接收到传输过来的数据后再将数据写入A机器的内核空间,从而最终将数据传输给A的用户空间进行处理。如下图:



外部系统从Java程序中读取数据,传输给内核空间并依赖网卡将数据写入到网络中,从而把数据传输出去。其实Java本身是内核的一层外衣,Java Socket编程,操作的各种数据都是在JVM的用户空间中进行的。而Kafka操作数据是放在内核空间的,通常内核空间处理数据的速度比用户空间快上万倍,所以通过kafka可以实现高速读、写数据

二:Kafka的安装:

1、最详细的Zookeeper安装和配置

Kafka集群模式需要提前安装好Zookeeper。

- 提示:Kafka单例模式不需要安装额外的Zookeeper,可以使用内置的Zookeeper。

- Kafka集群模式需要至少3台服务器。本课实战用到的服务器Hostname:master,slave1,slave2。

- 本博文中用到的Zookeeper版本是Zookeeper-3.4.6。

进入http://www.apache.org/dyn/closer.cgi/zookeeper/,你可以选择其他镜像网址去下载,用官网推荐的镜像:http://mirror.bit.edu.cn/apache/zookeeper/提示:下载官网里的的zookeeper-3.4.6.tar.gz 安装文件。

1) 安装Zookeeper

提示:下面的步骤发生在master服务器。

以ubuntu14.04举例,把下载好的文件放到root/Downloads目录,用下面的命令解压:

cd Downloads

tar -zxvf zookeeper-3.4.6.tar.gz

解压后在Downloads目录会多出一个zookeeper-3.4.6的新目录,用下面的命令把它剪切到指定目录即安装好Zookeeper了:

cd Downloads

mv zookeeper-3.4.6 /usr/local/spark

之后在/usr/local/spark目录会多出一个zookeeper-3.4.6的新目录。下面我们讲如何配置安装好的Zookeeper。

2) 配置Zookeeper

提示:下面的步骤发生在master服务器。

配置.bashrc

- 打开文件:vi ~/.bashrc

在PATH配置行前添加:

export ZOOKEEPER_HOME=/usr/local/spark/zookeeper-3.4.6

最后修改PATH:

export PATH=/usr/local/eclipse:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${SCALA_HOME}/bin:${HIVE_HOME}/bin:${FLUME_HOME}/bin:$PATH


使配置的环境变量立即生效:source ~/.bashrc

3)创建data目录

- cd $ZOOKEEPER_HOME

mkdir data

4)创建并打开zoo.cfg文件

- cd $ZOOKEEPER_HOME/conf

cp zoo_sample.cfg zoo.cfg

vi zoo.cfg

5)配置zoo.cfg,在zoo.cfg的文档中输入下面的内容:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
#dataDir=/tmp/zookeeper
# the port at which the clients will connect
#clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#http://zookeeper.apache.org/doc/current/zookeeperAdmin.htmlsc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
# 配置Zookeeper的日志和服务器身份证号等数据存放的目录。
# 千万不要用默认的/tmp/zookeeper目录,因为/tmp目录的数据容易被意外删除。
dataDir=../data
# Zookeeper与客户端连接的端口
clientPort=2181
# 在文件最后新增3行配置每个服务器的2个重要端口:Leader端口和选举端口
# server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;
# B 是这个服务器的hostname或ip地址;
# C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
# D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,
# 选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
# 如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信
# 端口号不能一样,所以要给它们分配不同的端口号。

server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888


6)创建并打开myid文件

- cd $ZOOKEEPER_HOME/data

- touch myid

- vi myid

配置myid

按照zoo.cfg的配置,myid的内容就是1。

7) 同步master的安装和配置到slave1和slave2

在master服务器上运行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave1:/usr/local/spark

scp -r ./zookeeper-3.4.6 root@slave2:/usr/local/spark

在slave1服务器上运行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的内容就是2。

在slave2服务器上运行下面的命令

vi $ZOOKEEPER_HOME/data/myid

按照zoo.cfg的配置,myid的内容就是3。

8) 启动Zookeeper服务

在master服务器上运行下面的命令

zkServer.sh start

在slave1服务器上运行下面的命令

source /root/.bashrc

zkServer.sh start

在slave1服务器上运行下面的命令

source /root/.bashrc

zkServer.sh start

在slave2服务器上运行下面的命令

source /root/.bashrc

zkServer.sh start

5) 验证Zookeeper是否安装和启动成功

在master服务器上运行(1)命令jps:



(2)zkServer.sh status命令:



在slave1服务器上运行命令:jps和zkServer.sh status

source /root/.bashrc



在slave2服务器上运行命令:jps和zkServer.sh status ……

至此,代表Zookeeper已经安装和配置成功。

2,Kafka的安装配置

本博文中用到的Kafka版本是Kafka-2.10-0.9.0.1。

1) 下载Kafka

进入http://kafka.apache.org/downloads.html,左键单击kafka_2.10-0.9.0.1.tgz。下载的为http://mirrors.hust.edu.cn/apache/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

1) 安装Kafka

提示:下面的步骤发生在master服务器。

以ubuntu14.04举例,把下载好的文件放到/root目录,用下面的命令解压:

cd /root

tar -zxvf kafka_2.10-0.9.0.1.tgz

解压后在/root目录会多出一个kafka_2.10-0.9.0.1的新目录,用下面的命令把它剪切到指定目录即安装好Kafka了:

cd /root

mv kafka_2.10-0.9.0.1 /usr/local

之后在/usr/local目录会多出一个kafka_2.10-0.9.0.1的新目录。下面我们讲如何配置安装好的Kafka。

2) 配置Kafka

提示:下面的步骤发生在master服务器。

(1)配置.bashrc

- 打开文件:vim ~/.bashrc

在PATH配置行前添加:

export KAFKA_HOME=/usr/local/kafka_2.10-0.9.0.1


最后修改PATH:

export PATH=/usr/local/eclipse:${JAVA_HOME}/bin:${KAFKA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${SPARK_HOME}/bin:${SPARK_HOME}/sbin:${SCALA_HOME}/bin:${HIVE_HOME}/bin:${FLUME_HOME}/bin:$PATH


使配置的环境变量立即生效:source ~/.bashrc

(2)配置server.properties:

cd $KAFKA_HOME/config

vi server.properties

配置server.properties,初步修改为下方这样,后期将会进行跟复杂的修改,已达到性能最优情况:

broker.id=0
port=9092
zookeeper.connect=master:2181,slave1:2181,slave2:2181


3) 同步master的安装和配置到slave1和slave2

在master服务器上运行下面的命令

cd /root

scp ./.bashrc root@slave1:/root

scp ./.bashrc root@slave2:/root

cd /usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave1:/usr/local

scp -r ./kafka_2.10-0.9.0.1 root@slave2:/usr/local

在slave1服务器上运行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=1。

在slave2服务器上运行下面的命令

vi $KAFKA_HOME/config/server.properties

修改broker.id=2。

4) 启动Kafka服务

在master服务器上运行下面的命令

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

在slave1服务器上运行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

在slave2服务器上运行下面的命令

source /root/.bashrc

cd $KAFKA_HOME/bin

kafka-server-start.sh ../config/server.properties &

5) 验证Kafka是否安装和启动成功

在任意服务器上运行命令创建Topic“HelloKafka”:

kafka-topics.sh –create –zookeeper master:2181,slave1:2181,slave2:2181 –replication-factor 3 –partitions 1 –topic HelloKafka

在任意服务器上运行命令为创建的Topic“HelloKafka”生产一些消息:

kafka-console-producer.sh –broker-list master:9092,slave1:9092,slave2:9092 –topic HelloKafka

输入下面的消息内容:



在任意服务器上运行命令从指定的Topic“HelloKafka”上消费(拉取)消息:

kafka-console-consumer.sh –zookeeper master:2181,slave1:2181,slave2:2181 –from-beginning –topic HelloKafka

过一会儿,你会看到打印的消息内容:



在任意服务器上运行命令查看所有的Topic名字:

kafka-topics.sh –list –zookeeper master:2181,slave1:2181,slave2:2181

在任意服务器上运行命令查看指定Topic的概况:

kafka-topics.sh –describe –zookeepermaster:2181,slave1:2181,slave2:2181 –topic HelloKafka

至此,代表Kafka已经安装和配置成功。

博文内容源自DT大数据梦工厂Spark课程。相关课程内容视频可以参考:

百度网盘链接:http://pan.baidu.com/s/1slvODe1(如果链接失效或需要后续的更多资源,请联系QQ460507491或者微信号:DT1219477246 获取上述资料)。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐