您的位置:首页 > 其它

Storm+kafka的HelloWorld初体验

2016-04-11 23:00 218 查看
从16年4月5号开始学习kafka,后来由于项目需要又涉及到了storm。

经过几天的扫盲,到今天16年4月13日,磕磕碰碰的总算是写了一个kafka+storm的HelloWorld的例子。

为了达到前人栽树后人乘凉的知识共享的目的,我尝试着梳理一下过程。

====实例需求

由kafka消息队列源源不断生产数据,然后由storm进行实时消费。

大家可以设想这些数据源是不同商品的用户行为操作行为,我们是不是就可以实时观测到用户关注商品的热点呢?

====环境准备

(1)Linux:

公司暂时没有多余的Linux主机,所以我只能在自己的电脑上建立的3台Linux虚拟机。

虚拟机的建立方法我做了一个小白级别的手册,按照这个手册就可以建立起虚拟机了。

百度云连接地址:http://pan.baidu.com/s/1hr3lVqG

(2)JDK:

我这里使用的是:jdk-7u80-linux-x64.tar.gz。

在官方网站上下载,然后配置环境变量即可。

(3)zookeeper集群:

搭建方法省略。可以参照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html

(4)kafka:

搭建方法省略。可以参照我的博客:http://www.cnblogs.com/quchunhui/p/5356511.html

(5)storm:

我这里使用的版本是相对稳定的:apache-storm-0.9.5.tar.gz

搭建方法省略,可以参照我的博客:http://www.cnblogs.com/quchunhui/p/5370191.html

(6)Maven:

开发环境的构建使用Maven。我这里使用的版本是:apache-maven-3.3.3.zip

Maven的入门可以参考我的博客:http://www.cnblogs.com/quchunhui/p/5359293.html

补充一下环境变量配置之后的图,以供小白参考。



====程序执行方式

(1)kafka:

需要手动编写kafka的生产者程序,然后通过eclipse等工具在Windows端启动,以达到生产消息的目的。

(2)storm:

可以进行两种方式的启动。一种是通过eclipse等工具在Windows端启动(俗称本地模式)

另一种是将storm的消费者程序打成jar包发布到Linux环境上,通过Linux启动程序进行消费(俗称集群模式)。

====Storm框架前期理解

从某位大神的QQ群组里下载了一篇关于storm的基本框架以及安装的文章

我这里共享到了我的百度云盘上了,请大家在开始编程之前一定要看看。非常值得一看。

百度云地址:http://pan.baidu.com/s/1boRcCeb

那么后面我们就可以开始编写我们的程序了。首先需要编写的是kafka的生产者程序。

====kafka程序相关:

我已经写好的代码共享到了Github上了:https://github.com/quchunhui/kafkaSample/

这里只对目录结构以及重要部分进行说明:

(1)src/main路径结构如下:

+---common

| Constants.java //这里统一定义了所有的常量,修改配置的时候只修改这里就可以。
|
+---consumer
| +---group
| | GroupConsumer.java //kafka消费者程序。消费模型:分组消费
| |
| \---partition
| PartitionConsumer.java //kafka消费者程序。消费模型:分区消费
|
+---producer
| +---async
| | AsyncProduce.java //kafka生产者程序。生产模型:异步生产(本次实例相关)
| |
| +---partiton
| | SimplePartitioner.java //message的序列化类
| |
| \---sync
| SyncProduce.java //kafka生产者程序。生产模型:同步生产
|
\---utilities
CommonUtil.java //共通方法类。

(2)实例所用的代码:

本次实例中,仅仅使用了kafka进行消息的生产,同事考虑到异步生产性能更高一些,

本次实例中使用了异步生产的代码,就是上面红色字标记的java程序(AsyncProduce.java)。

代码本身比较简单,其中下面红色框的部分为【异步】的配置项,需要注意。

各个配置项的说明请参考我的另一篇博客:http://www.cnblogs.com/quchunhui/p/5357040.html



====Storm程序相关:

(1)拓扑设计



【消息源(RandomSentenceSpout)】

接入到从上面的kafka消息队列中,将kafka作为storm的消息源。

【数据标准化(WordNormalizerBolt)】

然后使用一个Bolt进行归一化(语句切分),句子切分成单词发射出去。(代码更新中。。。)

【词频统计(WordCountBolt)】

使用一个Bolt接受订阅切分的单词Tuple,进行单词统计,并且选择使用按字段分组的策略,词频实时排序,把TopN实时发射出去。(代码更新中。。。)

【工具类(PrintBolt)】

最后使用一哥Bolt将结果打印到Log中。(代码更新中。。。)

====实例代码

我自己进行验证用的代码已经上传到Github上了,可以直接下载下来使用。

这里只对代码的目录结构以及需要格外关注的点进行一些补充。

Git地址:https://github.com/quchunhui/storm-kafka-plus-qch

(1)目录结构

src\main\java\com\dscn\helloworld

|
| WordCountTopology.java // Topology代码,程序入口,使用eclipse是需要执行该程序。
|
+---bolt
| PrintBolt.java // 上面讲到的工具类(PrintBolt)类
| WordCountBolt.java // 上面讲到的词频统计(WordCountBolt)类
| WordNormalizerBolt.java // 上面讲到的数据标准化(WordNormalizerBolt)类
|
\---spout
RandomSentenceSpout.java // 未使用

(2)重要代码说明

由于源代码已经共享给大家了,Storm的接口的用法在下面的篇幅中单独罗列了一下,我这里不进行过多的阐述。

在这里只将我碰到过的问题罗列出来、以问题&解决方法的形式分享。

【问题1】

storm是如何实现与kafka的对接的

【回答】

Spout作为storm的消息源的接入点,在这里可以同构设置Storm官方提供【storm.kafka.SpoutConfig】类来指定消息源。



----------------

//配置zookeeper集群地址,毕竟storm是需要集群支持的。

BrokerHosts brokerHosts = new ZkHosts("192.168.93.128:2181,192.168.93.129:2181,192.168.93.130:2181");

//配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字

SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "qchlocaltest", "", "topo");

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取
spoutConfig.forceFromStart = true;

//zookeeper集群host

spoutConfig.zkServers = Arrays.asList(new String[] {"192.168.93.128", "192.168.93.129", "192.168.93.130"});

//zookeeper集群port

spoutConfig.zkPort = 2181;

----------------

【问题2】

我尝试着自己重新写代码配置开发环境(不是直接使用Github上的代码),

编译时可以正常通过的,但是本地模式通过eclipse启动Topology的时候,出现了log4j和slf4j的冲突问题。

【解决方法】

问题原因是由于log4j和slf4j之间的重复调用,导致死循环而致使内存溢出。

解决办法就是log4j和slf4j保留一个, 普遍上都是保留slf4j的。

需要在Maven的pom.xml上将log4j的相关依赖移除。

移除方法:



可以通过【mvn dependency:tree】命令来查看修改之后的依赖关系。

如果发现需要移除的包的时候,使用Maven的【exclusion】标签来移除依赖关系。

填写exclusion标签的时候,下图中红色的部分是groupId,蓝色的部分是artifactId。



【问题3】

使用mvn install命令将程序打jar包上传到Linux的storm目录下,然后使用命令

[storm jar test-0.1-jar-with-dependencies.jar com.dscn.helloworld.WordCountTopology 192.168.93.128]

启动Topology的时候,出现了下面的提示错误。



【解决方法】

是Maven的pom.xml的配置出现了问题。详细请参考博客:http://blog.csdn.net/luyee2010/article/details/18455237

修改方法就是强storm的scope修改为provided。如下图所示:



【问题4】

将代码放到实际的集群运行环境(kafka+storm+hbase)中,发现storm接受不到消息。

【原因】

一直以来都是使用kafka的异步生产来生产消息,以为都正常的生产消息了。由于异步生产的时候,并没有消息确认机制,

所以不能确保消息是否正确的进入到了消息队列之中,改用同步生产的代码尝试了一下,果然发生了一下的错误。



【解决办法】

通过网上搜索[kafka Failed to send messages]关键字,发现有可能是需要设置advertised.host.name这个属性。

抱着尝试一下的心态试了一下,果然好使了。至于这个属性的真正意义还有待探索。(TODO)



【问题5】

代码在本地的时候好好的,通过storm jar命令发布到集群环境的时候,发生了Jar包冲突的问题。



【解决方法】

本来是认为自己的Maven环境的依赖有问题,也通过mvn dependency:tree查看了依赖关系,毫无问题。根本就诶有log4j-over-slf4j.jar这个包。

头疼了很久,通过QQ群咨询了一些朋友,他们建议我确认集群环境中storm/lib下是否存在log4j-over-slf4j.jar,如果存在就把它删掉。

尝试了一下之后,果然好使了。原来是我的程序的jar包和集群环境中会有冲突。详细请参考我的另一篇博客:

http://www.cnblogs.com/quchunhui/p/5404168.html

====Storm接口详解:

【IComponent接口】

Spout和Bolt都是其Component。所以,Storm定义了一个名叫IComponent的总接口。



IComponent的继承关系如下图所示:



绿色部分是我们最常用、比较简单的部分。红色部分是与事务相关。

BaseComponent 是Storm提供的“偷懒”的类。为什么这么说呢,它及其子类,都或多或少实现了其接口定义的部分方法。

这样我们在用的时候,可以直接继承该类,而不是自己每次都写所有的方法。

但值得一提的是,BaseXXX这种定义的类,它所实现的方法,都是空的,直接返回null。

【Spout】

类图如下图所示:



接口如下图所示:



各个接口说明:

①、open方法:

是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。

②、close方法

在该spout关闭前执行,但是并不能得到保证其一定被执行。

spout是作为task运行在worker内,在cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。

而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。

③、activate和deactivate方法 :

一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。

④、nextTuple方法:

负责消息的接入,执行数据发射。是Spout中的最重要方法。

⑤、ack(Object)方法:

传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。

⑥、fail(Object)方法:

同ack,只不过是tuple处理失败时执行。

我们的RandomSpout由于继承了BaseRichSpout,

所以不用实现close、activate、deactivate、ack、fail和getComponentConfiguration方法,只关心最基本核心的部分。

结论:

通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。

【Bolt】

类图如下图所示:



这里可以看到一个奇怪的问题: 为什么IBasicBolt并没有继承IBolt? 我们带着问题往下看。

IBolt定义了三个方法:



①、prepare方法:

IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体执行的worker上去。

worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文。

②、execute方法:

接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。

③、cleanup方法:

同ISpout的close方法,在关闭前调用。同样不保证其一定执行。

红色部分(execute方法)是Bolt实现时一定要注意的地方。

而Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。

如果你确实要反馈失败,可以抛出FailedException。

我们来再写一个Bolt继承BaseRichBolt替代ExclaimBasicBolt。代码如下:



修改topology



运行下,结果一致。

结论:
通常情况下,实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,
如果不想自己处理结果反馈,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做掉了prepare方法和collector.emit.ack(inputTuple);

====推荐博客:

【整合实战类】:
http://shiyanjun.cn/archives/934.html http://www.tuicool.com/articles/NzyqAn http://itindex.net/detail/51477-storm-笔记-kafka
【问题解决类】:
http://www.aboutyun.com/thread-12590-1-1.html
【Storm调优类】:
http://blog.csdn.net/derekjiang/article/details/9040243 http://www.51studyit.com/html/notes/20140329/45.html
--END--
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: