A stateful, recoverable wordcount program in Spark Streaming
2015-05-25 17:40
The Spark Streaming examples source tree contains two related sample programs: a recoverable wordcount (RecoverableNetworkWordCount.scala) and a stateful wordcount (StatefulNetworkWordCount.scala). There is, however, no example showing how a stateful wordcount recovers from failure. So I combined the two examples into one stateful, recoverable wordcount program: when you stop it and start it again, it resumes from the previous state and continues counting words where it left off.
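Before the full program, here is a minimal sketch (plain Scala, no Spark required) of the per-key state-merge semantics that updateStateByKey applies: on each batch, Spark hands the function the new values for a key together with the previously checkpointed state, and the returned value becomes the new state. The object name and the sample batches below are illustrative, not part of the original program.

```scala
// Sketch of the per-key update function used by updateStateByKey.
// On restart, `state` is restored from the checkpoint, which is why
// counting resumes instead of starting over.
object UpdateFuncDemo {
  // Same shape as the updateFunc in the program below
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // Batch 1: the word appears twice; no previous state yet
    val s1 = updateFunc(Seq(1, 1), None)
    // Batch 2 (possibly after a restart): one more occurrence,
    // previous state recovered from the checkpoint
    val s2 = updateFunc(Seq(1), s1)
    println(s"after batch 1: $s1, after batch 2: $s2") // Some(2), Some(3)
  }
}
```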
The code is as follows:
package com.hupu.dace.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by xiaojun on 2015/5/25.
 */
object MyRecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, checkpointDirectory: String) = {
    // If you do not see this printed, the StreamingContext has been
    // loaded from the checkpoint rather than created anew
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")

    // Create the context with a 2 second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // For each word, merge this batch's counts into the previous state
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey.
    // This gives a DStream of state (the cumulative count of each word)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: MyRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |  <hostname> and <port> describe the TCP server that Spark
          |  Streaming would connect to receive data.
          |  <checkpoint-directory> is a directory on an HDFS-compatible
          |  file system to which checkpoint data is written.
        """.stripMargin)
      System.exit(1)
    }
    val Array(ip, IntParam(port), checkpointDirectory) = args
    // Recreate the context from the checkpoint if one exists;
    // otherwise build a fresh one via createContext
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
      createContext(ip, port, checkpointDirectory)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

// Extractor that parses a string as an Int
private object IntParam {
  def unapply(str: String): Option[Int] = {
    try {
      Some(str.toInt)
    } catch {
      case e: NumberFormatException => None
    }
  }
}

How to test?
Package the code into a JAR (mine is called sparkdemo.jar) and upload it to a node in the cluster.
In a shell, run nc -lk 9999 to start the simple socket server that ships with Linux.
Then launch the Spark Streaming program:
spark-submit --class com.hupu.dace.spark.streaming.MyRecoverableNetworkWordCount --master local[4] sparkdemo.jar n1 9999 cp3
MyRecoverableNetworkWordCount takes three arguments: host, port, and checkpointdir. Fill in host and port according to your own node; the checkpoint directory name can be anything. I used a relative path here, so a cp3 folder is created under the HDFS home directory of the user submitting the job.
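As a quick sanity check (a sketch; this assumes HDFS is the default file system and that the submitting user has a home directory there), you can confirm that checkpoint data is actually being written:

```shell
# A relative path like "cp3" resolves under the submitting user's HDFS
# home, i.e. /user/<your-user>/cp3. After a couple of batches you
# should see checkpoint files appear:
hdfs dfs -ls cp3

# An absolute, unambiguous form of the checkpoint argument would be:
#   spark-submit ... sparkdemo.jar n1 9999 hdfs:///user/<your-user>/cp3
```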
Now type some words into the nc session and watch the Spark Streaming program print the corresponding word counts. Then stop the streaming program to simulate a driver crash and resubmit it: you will be pleasantly surprised to find that the previous counts are restored.