您的位置:首页 > 产品设计 > UI/UE

storm 开发系列三 Clojue编写程序读取kafka数据并写入到hdfs

2015-10-30 22:49 686 查看
要说storm程序最常用的数据源,自然是kafka,storm通常用来进行各种实时统计,但是也会顺带将从kafka读取的数据顺带写入hdfs,根据我的经验,这几乎是必须的功能。
所以本次程序就是实现读取kafka数据,然后写入hdfs。但是最大的不同是,这是clojure版本,而不是java版本。
下面分别说明

project配置

project.clj文件包含了依赖项:
(defproject kafka2hdfs "0.1.0-SNAPSHOT"
:description "demo to show read from kafka and write to hdfs"
:url "http://blog.csdn.net/csfreebird"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[org.apache.kafka/kafka_2.10 "0.8.2.1"
:exclusions [[org.apache.zookeeper/zookeeper]
[log4j/log4j]
[slf4j-api/org.slf4j]]
]
[org.apache.storm/storm-kafka "0.9.5"]
[org.apache.storm/storm-hdfs "0.9.5"]]
:main kafka2hdfs.core
:aot [kafka2hdfs.core]
:profiles {:provided {:dependencies [[org.apache.storm/storm-core "0.9.5"
:exclusions [[org.slf4j/log4j-over-slf4j]
[org.slf4j/slf4j-api]
[logback-classic/ch.qos.logback]]
]]}}
:plugins [[cider/cider-nrepl "0.10.0-SNAPSHOT"]]
:target-path "target/%s")

core程序编写

引入包

在core.js开头导入java package和class
(ns kafka2hdfs.core
(:import [backtype.storm StormSubmitter LocalCluster spout.SchemeAsMultiScheme]
[storm.kafka ZkHosts SpoutConfig StringScheme KafkaSpout]
[org.apache.storm.hdfs.bolt HdfsBolt]
[org.apache.storm.hdfs.bolt.format DefaultFileNameFormat DelimitedRecordFormat]
[org.apache.storm.hdfs.bolt.sync CountSyncPolicy]
[org.apache.storm.hdfs.bolt.rotation TimedRotationPolicy]
)
(:use [backtype.storm clojure config]) ;; for (topology ...)
(:gen-class))
可以看到,这和java代码比起来简单很多,[ ]里面第一是package名称,第二个开始是属于该package的类,因此一行就可以表达java的多行import语句

组装topology

这个函数才是核心,可以看到
(defn mk-topology []
(let [;; config kafka reader
broker-hosts (ZkHosts. "host1:21818,host2:21818,host3:21818,host4:21818,host5:21818/kafka")
spout-config (SpoutConfig. broker-hosts "topic" "/offset" "bigdata")
;; config hdfs writer
formatter (DelimitedRecordFormat.)
sync-policy (CountSyncPolicy. 1000)
rotation-policy (TimedRotationPolicy. 15.0 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy$TimeUnit/MINUTES)
file-name-format (DefaultFileNameFormat.)
hdfs-writer (HdfsBolt.)
kafka-reader (KafkaSpout. spout-config)
]
(set! (.scheme spout-config) (SchemeAsMultiScheme. (StringScheme.)))
(.forceFromStart spout-config true)
(.withFieldDelimiter formatter ",")
(doto file-name-format
(.withPath "/tmp")
(.withPrefix "kafka_")
(.withExtension ".log"))
(doto hdfs-writer
(.withFsUrl "hdfs://my-hdfs:9000")
(.withFileNameFormat file-name-format)
(.withRecordFormat formatter)
(.withRotationPolicy rotation-policy)
(.withSyncPolicy sync-policy))

(topology
{"kafka-reader" (spout-spec kafka-reader :p 2)}
{"hdfs-writer" (bolt-spec {"kafka-reader" :shuffle} hdfs-writer :p 1)})
))

这里大量使用了java interop语法来实现clojure调用java代码,参考我之前的博客。这个topology只有两个节点,一个spout, 一个bolt。spout负责读取kafka,bolt写入hdfs。数据采用shuffle的策略从kafka bolt emit到 hdfs bolt。

提交任务

这次不是提交本地测试任务了,因为我的笔记本电脑性能很可怜,经不起折腾。就直接编写真正提交的任务了。提交任务有两种方式,一种是用lein do clean, uberjar 打包成jar包, 然后用下面的命令提交
storm jar target/uberjar/kafka2hdfs-0.1.0-SNAPSHOT-standalone.jar kafka2hdfs.core args...

还有一种,是通过代码直接提交,以后再细说。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  storm clojure