Getting Started with Spark Streaming + Zookeeper + Kafka
2016-03-21 10:47
Original link: http://www.jianshu.com/p/5606253f4495
Preparation:
Install Spark
Install Zookeeper
Install Kafka
Getting started
1. Start Zookeeper
Open a terminal, change to the Zookeeper home directory, go into the conf folder, and make a copy of
zoo_sample.cfg, renaming the copy to
zoo.cfg
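For reference, a minimal zoo.cfg matches the defaults shipped in zoo_sample.cfg; the sketch below shows those defaults (the dataDir path is the sample's placeholder, so point it at a persistent directory for real use):

```
# minimal zoo.cfg sketch, based on the defaults in zoo_sample.cfg
tickTime=2000            # basic time unit in milliseconds
initLimit=10             # ticks a follower may take to connect and sync
syncLimit=5              # ticks a follower may lag before being dropped
dataDir=/tmp/zookeeper   # placeholder; use a persistent directory in practice
clientPort=2181          # port that clients (Kafka, our app) connect to
```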
Switch to the bin directory one level up and run
./zkServer.sh start to start Zookeeper; it will print
Starting zookeeper ... STARTED
Then run
./zkServer.sh status to check its status; if the output includes the following line, the start succeeded:
Mode: standalone
To stop Zookeeper, run
./zkServer.sh stop
2. Start Kafka
Open a terminal, change to the Kafka home directory, and run
bin/kafka-server-start.sh config/server.properties; log lines similar to the following will appear:
[2014-11-12 17:38:13,395] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2014-11-12 17:38:13,420] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
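For a single-node setup the stock config/server.properties usually works unchanged; the settings that matter for this tutorial are sketched below (values are the Kafka 0.8-era defaults, so verify them against your own copy of the file):

```
# relevant defaults from config/server.properties (Kafka 0.8.x era)
broker.id=0                       # unique id of this broker
port=9092                         # port the console producer connects to
log.dirs=/tmp/kafka-logs          # where topic data is stored on disk
zookeeper.connect=localhost:2181  # must match the Zookeeper started in step 1
```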
3. Start a Kafka producer
Open a new terminal; call it the producer terminal, for ease of reference later. Change to the Kafka home directory and run
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
to send messages to a topic named
test. (The console producer does not create the topic itself; the broker auto-creates it on first use when topic auto-creation is enabled, which is the default.)
4. Write the Scala application

package test

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits, needed on Spark 1.1
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]) {
    val zkQuorum = "localhost:2181" // Zookeeper address from step 1
    val group = "1"                 // consumer group id
    val topics = "test"             // comma-separated list of topics
    val numThreads = 2              // consumer threads per topic

    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2)) // 2-second micro-batches
    ssc.checkpoint("checkpoint")

    // map each topic to its thread count, as required by createStream
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
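A note on the topic map: KafkaUtils.createStream takes a Map from topic name to the number of consumer threads for that topic. The topics.split(",").map((_, numThreads)).toMap expression builds exactly that; a standalone sketch of it, with made-up topic names, is:

```scala
object TopicMapDemo {
  // Mirrors the topic-map line from KafkaWordCount: each topic in the
  // comma-separated string is paired with the same thread count.
  def build(topics: String, numThreads: Int): Map[String, Int] =
    topics.split(",").map((_, numThreads)).toMap

  def main(args: Array[String]): Unit = {
    // "test,logs" is a hypothetical two-topic example
    println(build("test,logs", 2)) // Map(test -> 2, logs -> 2)
  }
}
```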
Add the dependencies to the build.sbt file:
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0"
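For completeness, a full build.sbt might look like the sketch below. The project name and version are placeholders, and the Scala version is an assumption; it must be some 2.10.x release to match the _2.10 artifact names above:

```scala
// sketch of a complete build.sbt; name and version are placeholders
name := "kafka-word-count"

version := "0.1"

scalaVersion := "2.10.4" // any 2.10.x, to match the _2.10 artifacts

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.1.0"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.0"
```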
Start the Scala program, then type some strings into the producer terminal from step 3 above, such as
sdfsadf a aa a a a a a a a a, and the IDE console will show output like
14/11/12 16:38:22 INFO scheduler.DAGScheduler: Stage 195 (take at DStream.scala:608) finished in 0.004 s
-------------------------------------------
Time: 1415781502000 ms
-------------------------------------------
(aa,1)
(a,9)
(sdfsadf,1)
This confirms the program is running correctly.
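The counts above follow directly from the input line: the word a appears 9 times, aa once, and sdfsadf once. The same flatMap / map-and-reduce pipeline, run on a plain Scala collection instead of a DStream, reproduces one micro-batch of the streaming job and is handy for checking the logic without a cluster (a sketch, not part of the streaming application):

```scala
object BatchWordCount {
  // One micro-batch of the streaming word count, on an in-memory Seq.
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                // lines -> words
      .groupBy(identity)                    // word -> all its occurrences
      .map { case (w, ws) => (w, ws.size) } // word -> count, like reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    // the same sample input used in the producer terminal above
    println(count(Seq("sdfsadf a aa a a a a a a a a")))
  }
}
```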