storm 开发系列三 Clojue编写程序读取kafka数据并写入到hdfs
2015-10-30 22:49
686 查看
要说storm程序最常用的数据源,自然是kafka,storm通常用来进行各种实时统计,但是也会顺带将从kafka读取的数据顺带写入hdfs,根据我的经验,这几乎是必须的功能。
所以本次程序就是实现读取kafka数据,然后写入hdfs。但是最大的不同是,这是clojure版本,而不是java版本。
下面分别说明
(defproject kafka2hdfs "0.1.0-SNAPSHOT"
:description "demo to show read from kafka and write to hdfs"
:url "http://blog.csdn.net/csfreebird"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[org.apache.kafka/kafka_2.10 "0.8.2.1"
:exclusions [[org.apache.zookeeper/zookeeper]
[log4j/log4j]
[slf4j-api/org.slf4j]]
]
[org.apache.storm/storm-kafka "0.9.5"]
[org.apache.storm/storm-hdfs "0.9.5"]]
:main kafka2hdfs.core
:aot [kafka2hdfs.core]
:profiles {:provided {:dependencies [[org.apache.storm/storm-core "0.9.5"
:exclusions [[org.slf4j/log4j-over-slf4j]
[org.slf4j/slf4j-api]
[logback-classic/ch.qos.logback]]
]]}}
:plugins [[cider/cider-nrepl "0.10.0-SNAPSHOT"]]
:target-path "target/%s")
这里大量使用了java interop语法来实现clojure调用java代码,参考我之前的博客。这个topology只有两个节点,一个spout, 一个bolt。spout负责读取kafka,bolt写入hdfs。数据采用shuffle的策略从kafka bolt emit到 hdfs bolt。
还有一种,是通过代码直接提交,以后再细说。
所以本次程序就是实现读取kafka数据,然后写入hdfs。但是最大的不同是,这是clojure版本,而不是java版本。
下面分别说明
project配置
project.clj文件包含了依赖项:(defproject kafka2hdfs "0.1.0-SNAPSHOT"
:description "demo to show read from kafka and write to hdfs"
:url "http://blog.csdn.net/csfreebird"
:license {:name "Eclipse Public License"
:url "http://www.eclipse.org/legal/epl-v10.html"}
:dependencies [[org.clojure/clojure "1.5.1"]
[org.apache.kafka/kafka_2.10 "0.8.2.1"
:exclusions [[org.apache.zookeeper/zookeeper]
[log4j/log4j]
[slf4j-api/org.slf4j]]
]
[org.apache.storm/storm-kafka "0.9.5"]
[org.apache.storm/storm-hdfs "0.9.5"]]
:main kafka2hdfs.core
:aot [kafka2hdfs.core]
:profiles {:provided {:dependencies [[org.apache.storm/storm-core "0.9.5"
:exclusions [[org.slf4j/log4j-over-slf4j]
[org.slf4j/slf4j-api]
[logback-classic/ch.qos.logback]]
]]}}
:plugins [[cider/cider-nrepl "0.10.0-SNAPSHOT"]]
:target-path "target/%s")
core程序编写
引入包
在core.js开头导入java package和class(ns kafka2hdfs.core (:import [backtype.storm StormSubmitter LocalCluster spout.SchemeAsMultiScheme] [storm.kafka ZkHosts SpoutConfig StringScheme KafkaSpout] [org.apache.storm.hdfs.bolt HdfsBolt] [org.apache.storm.hdfs.bolt.format DefaultFileNameFormat DelimitedRecordFormat] [org.apache.storm.hdfs.bolt.sync CountSyncPolicy] [org.apache.storm.hdfs.bolt.rotation TimedRotationPolicy] ) (:use [backtype.storm clojure config]) ;; for (topology ...) (:gen-class))可以看到,这和java代码比起来简单很多,[ ]里面第一是package名称,第二个开始是属于该package的类,因此一行就可以表达java的多行import语句
组装topology
这个函数才是核心,可以看到(defn mk-topology [] (let [;; config kafka reader broker-hosts (ZkHosts. "host1:21818,host2:21818,host3:21818,host4:21818,host5:21818/kafka") spout-config (SpoutConfig. broker-hosts "topic" "/offset" "bigdata") ;; config hdfs writer formatter (DelimitedRecordFormat.) sync-policy (CountSyncPolicy. 1000) rotation-policy (TimedRotationPolicy. 15.0 org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy$TimeUnit/MINUTES) file-name-format (DefaultFileNameFormat.) hdfs-writer (HdfsBolt.) kafka-reader (KafkaSpout. spout-config) ] (set! (.scheme spout-config) (SchemeAsMultiScheme. (StringScheme.))) (.forceFromStart spout-config true) (.withFieldDelimiter formatter ",") (doto file-name-format (.withPath "/tmp") (.withPrefix "kafka_") (.withExtension ".log")) (doto hdfs-writer (.withFsUrl "hdfs://my-hdfs:9000") (.withFileNameFormat file-name-format) (.withRecordFormat formatter) (.withRotationPolicy rotation-policy) (.withSyncPolicy sync-policy)) (topology {"kafka-reader" (spout-spec kafka-reader :p 2)} {"hdfs-writer" (bolt-spec {"kafka-reader" :shuffle} hdfs-writer :p 1)}) ))
这里大量使用了java interop语法来实现clojure调用java代码,参考我之前的博客。这个topology只有两个节点,一个spout, 一个bolt。spout负责读取kafka,bolt写入hdfs。数据采用shuffle的策略从kafka bolt emit到 hdfs bolt。
提交任务
这次不是提交本地测试任务了,因为我的笔记本电脑性能很可怜,经不起折腾。就直接编写真正提交的任务了。提交任务有两种方式,一种是用lein do clean, uberjar 打包成jar包, 然后用下面的命令提交storm jar target/uberjar/kafka2hdfs-0.1.0-SNAPSHOT-standalone.jar kafka2hdfs.core args...
还有一种,是通过代码直接提交,以后再细说。
相关文章推荐
- Release Notes - Apache Storm - Version 0.9.2-incub
- C/C++实现对STORM运行信息查看及控制的方法
- 基于Storm的Nginx log实时监控系统
- 整合Kafka到Spark Streaming——代码示例和挑战
- Clojure基础环境搭建
- 大白话storm
- kafka+storm初探
- storm集群 + kafka单机性能测试
- flume、kafka、storm常用命令
- storm
- Storm配置项详解
- Twitter Storm 安装篇
- Storm入门教程 Storm安装部署步骤
- Storm常见问题及解决方法收集
- storm 配置项详解
- storm 删数据后上传topology无法启动?
- TowerMadness之Brewing Storm攻略 Blizzardgale
- Ubuntu 12.04 中安装storm
- Storm是如何做到事务一致的研究
- 流式计算框架:Storm VS Spark Streaming