
Implementing a stateful, recoverable wordcount program with Spark Streaming

2015-05-25 17:40
The Spark Streaming examples source code contains two related example programs: a recoverable wordcount program, RecoverableNetworkWordCount.scala, and a stateful wordcount program, StatefulNetworkWordCount.scala. There is, however, no example showing how a stateful wordcount program recovers from failure. So I combined these two examples into a stateful, recoverable wordcount program: when you stop the program and restart it, it picks up the previous state and continues counting words from there.

The code is as follows:

package com.hupu.dace.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by xiaojun on 2015/5/25.
 */
object MyRecoverableNetworkWordCount {
  def createContext(ip: String, port: Int, checkpointDirectory: String) = {

    // If you do not see this printed, it means the StreamingContext has been loaded
    // from the checkpoint
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
    // Create the context with a 2 second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // Combine the counts of the current batch with the previously saved state
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // Create a ReceiverInputDStream on the target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This gives a DStream made of state (the cumulative count of each word)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: MyRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |  <hostname> and <port> describe the TCP server that Spark Streaming
          |  connects to in order to receive data. <checkpoint-directory> is a directory
          |  on an HDFS-compatible file system to which checkpoint data is written.
          |
          |In local mode, <master> should be 'local[n]' with n > 1.
        """.stripMargin
      )
      System.exit(1)
    }

    val Array(ip, IntParam(port), checkpointDirectory) = args
    // Rebuild the StreamingContext from the checkpoint if one exists,
    // otherwise create a fresh context via createContext
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(ip, port, checkpointDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}

// Extractor that parses a string as an Int (used for the <port> argument)
private object IntParam {
  def unapply(str: String): Option[Int] = {
    try {
      Some(str.toInt)
    } catch {
      case e: NumberFormatException => None
    }
  }
}
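Before testing on a cluster, it can help to see what the update function does in isolation. The following is a minimal sketch in plain Scala (no Spark required, e.g. in the Scala REPL) that only illustrates the semantics of the update function passed to updateStateByKey above:

val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
println(updateFunc(Seq(1, 1, 1), Some(2))) // Some(5): 3 new occurrences plus a previous count of 2
println(updateFunc(Seq(1), None))          // Some(1): the key is seen for the first time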
How to test it?

Package the code into a JAR (mine is named sparkdemo.jar) and upload it to a node in the cluster.

From the shell, run nc -lk 9999 to start the simple socket server that ships with Linux.

Then start the Spark Streaming program:

spark-submit --class com.hupu.dace.spark.streaming.MyRecoverableNetworkWordCount --master local[4] sparkdemo.jar n1 9999 cp3

MyRecoverableNetworkWordCount takes three arguments: host, port, and checkpointdir. Fill in host and port according to your own nodes; the checkpoint directory name can be anything. I used a relative path here, so a folder named cp3 is created under the HDFS home directory of the user who submitted the job.
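If you want to confirm that checkpoint data is actually being written (assuming the submitting user has an HDFS home directory, as above), you can list the directory with hdfs dfs -ls cp3; the files Spark writes there are what getOrCreate reads back on restart.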

Now type some words into the netcat session and you will see the Spark Streaming program print the corresponding word counts. Then stop the streaming program to simulate a driver failure and resubmit the same command; you will be pleasantly surprised to find that the counts from the previous run are restored. On recovery, getOrCreate rebuilds the StreamingContext from the checkpoint instead of calling createContext, so the "Creating new context" message is not printed again.