您的位置:首页
spark streaming updateStateByKey 使用方法
2017-06-30 14:52
441 查看
updateStateByKey 解释:
以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加
在有新的数据信息进入或更新时。能够让用户保持想要的不论什么状。使用这个功能须要完毕两步:
1) 定义状态:能够是随意数据类型
2) 定义状态更新函数:用一个函数指定怎样使用先前的状态。从输入流中的新值更新状态。
对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大。
updateStateByKey源代码:
/**
Return a new “state” DStream where the state for each key is updated by applying
the given function on the previous state of the key and the new values of the key.
org.apache.spark.Partitioner is used to control the partitioning of each RDD.
@param updateFunc State update function. If
corresponding state key-value pair will be eliminated.
@param partitioner Partitioner for controlling the partitioning of each RDD in the new
DStream.
@param initialRDD initial state value of each key.
@tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner,
initialRDD: RDD[(K, S)]
): DStream[(K, S)] = {
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
}
代码实现
StatefulNetworkWordCount
NetworkWordCount
WebPagePopularityValueCalculator
參考文章:
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
http://stackoverflow.com/questions/28998408/spark-streaming-example-calls-updatestatebykey-with-additional-parameters
http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation
尊重原创,未经同意不得转载:
http://blog.csdn.net/stark_summer/article/details/47666337
以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加
在有新的数据信息进入或更新时。能够让用户保持想要的不论什么状。使用这个功能须要完毕两步:
1) 定义状态:能够是随意数据类型
2) 定义状态更新函数:用一个函数指定怎样使用先前的状态。从输入流中的新值更新状态。
对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大。
updateStateByKey源代码:
/**
Return a new “state” DStream where the state for each key is updated by applying
the given function on the previous state of the key and the new values of the key.
org.apache.spark.Partitioner is used to control the partitioning of each RDD.
@param updateFunc State update function. If
thisfunction returns None, then
corresponding state key-value pair will be eliminated.
@param partitioner Partitioner for controlling the partitioning of each RDD in the new
DStream.
@param initialRDD initial state value of each key.
@tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner,
initialRDD: RDD[(K, S)]
): DStream[(K, S)] = {
val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
}
updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
}
代码实现
StatefulNetworkWordCount
object StatefulNetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>") System.exit(1) } Logger.getLogger("org.apache.spark").setLevel(Level.WARN) val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") // Initial RDD input to updateStateByKey val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using updateStateByKey // This will give a Dstream made of state (which is the cumulative count of the words) val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD) stateDstream.print() ssc.start() ssc.awaitTermination() } }
NetworkWordCount
import org.apache.spark.SparkConf import org.apache.spark.HashPartitioner import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ object NetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: NetworkWordCount <hostname> <port>") System.exit(1) } val sparkConf = new SparkConf().setAppName("NetworkWordCount") val ssc = new StreamingContext(sparkConf, Seconds(10)) //使用updateStateByKey前须要设置checkpoint ssc.checkpoint("hdfs://master:8020/spark/checkpoint") val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => { //通过Spark内部的reduceByKey按key规约。然后这里传入某key当前批次的Seq/List,再计算当前批次的总和 val currentCount = currValues.sum // 已累加的值 val previousCount = prevValueState.getOrElse(0) // 返回累加后的结果。是一个Option[Int]类型 Some(currentCount + previousCount) } val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) //val currWordCounts = pairs.reduceByKey(_ + _) //currWordCounts.print() val totalWordCounts = pairs.updateStateByKey[Int](addFunc) totalWordCounts.print() ssc.start() ssc.awaitTermination() } }
WebPagePopularityValueCalculator
package com.spark.streaming import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Duration, Seconds, StreamingContext} /** * ━━━━━━神兽出没━━━━━━ * ┏┓ ┏┓ * ┏┛┻━━━┛┻┓ * ┃ ┃ * ┃ ━ ┃ * ┃ ┳┛ ┗┳ ┃ * ┃ ┃ * ┃ ┻ ┃ * ┃ ┃ * ┗━┓ ┏━┛ * ┃ ┃神兽保佑, 永无BUG! * ┃ ┃Code is far away from bug with the animal protecting * ┃ ┗━━━┓ * ┃ ┣┓ * ┃ ┏┛ * ┗┓┓┏━┳┓┏┛ * ┃┫┫ ┃┫┫ * ┗┻┛ ┗┻┛ * ━━━━━━感觉萌萌哒━━━━━━ * Module Desc: * User: wangyue * DateTime: 15-11-9上午10:50 */ object WebPagePopularityValueCalculator { private val checkpointDir = "popularity-data-checkpoint" private val msgConsumerGroup = "user-behavior-topic-message-consumer-group" def main(args: Array[String]) { if (args.length < 2) { println("Usage:WebPagePopularityValueCalculator zkserver1:2181, zkserver2: 2181, zkserver3: 2181 consumeMsgDataTimeInterval (secs) ") System.exit(1) } val Array(zkServers, processingInterval) = args val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator") val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt)) //using updateStateByKey asks for enabling checkpoint ssc.checkpoint(checkpointDir) val kafkaStream = KafkaUtils.createStream( //Spark streaming context ssc, //zookeeper quorum. e.g zkserver1:2181,zkserver2:2181,... zkServers, //kafka message consumer group ID msgConsumerGroup, //Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread Map("user-behavior-topic" -> 3)) val msgDataRDD = kafkaStream.map(_._2) //for debug use only //println("Coming data in this interval...") //msgDataRDD.print() // e.g page37|5|1.5119122|-1 val popularityData = msgDataRDD.map { msgLine => { val dataArr: Array[String] = msgLine.split("\\|") val pageID = dataArr(0) //calculate the popularity value val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1 (pageID, popValue) } } //sum the previous popularity value and current value //定义一个匿名函数去把网页热度上一次的计算结果值和新计算的值相加,得到最新的热度值。 val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => { iterator.flatMap(t => { val newValue: Double = t._2.sum val stateValue: Double = t._3.getOrElse(0); Some(newValue + stateValue) }.map(sumedValue => (t._1, sumedValue))) } val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00))) //调用 updateStateByKey 原语并传入上面定义的匿名函数更新网页热度值。 val stateDStream = popularityData.updateStateByKey[Double](updatePopularityValue, new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD) //set the checkpoint interval to avoid too frequently data checkpoint which may //may significantly reduce operation throughput stateDStream.checkpoint(Duration(8 * processingInterval.toInt * 1000)) //after calculation, we need to sort the result and only show the top 10 hot pages //最后得到最新结果后,须要对结果进行排序。最后打印热度值最高的 10 个网页。 stateDStream.foreachRDD { rdd => { val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false) val topKData = sortedData.take(10).map { case (v, k) => (k, v) } topKData.foreach(x => { println(x) }) } } ssc.start() ssc.awaitTermination() } }
參考文章:
http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
http://stackoverflow.com/questions/28998408/spark-streaming-example-calls-updatestatebykey-with-additional-parameters
http://stackoverflow.com/questions/27535668/spark-streaming-groupbykey-and-updatestatebykey-implementation
尊重原创,未经同意不得转载:
http://blog.csdn.net/stark_summer/article/details/47666337
相关文章推荐
- SparkStreaming updateStateByKey 使用
- Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率
- 第93讲:Spark Streaming updateStateByKey案例实战和内幕源码
- 第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
- 大数据10_02_SparkStreaming输入源、foreachRDD、transform、updateStateByKey、reduceByKeyAndWindow
- spark-streaming-[2]-累加器(更新器)操作(updateStateByKey)
- spark streaming updateStateByKey 用法
- spark streaming - kafka updateStateByKey 统计用户消费金额
- 第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
- Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
- 14:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
- Spark Streaming updateStateByKey案例实战和内幕源码解密
- 第110讲: Spark Streaming电商广告点击综合案例通过updateStateByKey等实现广告点击流量的在线更新统计
- spark streaming updateStateByKey 用法
- spark streaming updateStateByKey 用法
- (版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
- spark streaming - kafka updateStateByKey 统计用户消费金额
- Spark Streaming updateStateByKey 算法
- spark streaming - kafka updateStateByKey 统计用户消费金额
- 第93课:Spark Streaming updateStateByKey案例实战和内幕源码解密