利用mapWithState实现按照首字母统计的有状态的wordCount
2019-07-07 13:22
851 查看
最近在做sparkstreaming整合kafka的时候遇到了一个问题:
可以抽象成这样一个问题:有状态的wordCount,且按照word的第一个字母为key,但是要求输出的格式为(word,1)这样的形式
举例来说:
例如第一批数据为: hello how when hello
则要求输出为:(hello,1) (how,2) (when,1) (hello,3)
第二批数据为: hello how when what hi
则要求输出为: (hello,4) (how,5) (when,2) (what,3) (hi,6)
首先了解一下mapWithState的常规用法:
ref: https://www.jianshu.com/p/a54b142067e5
http://sharkdtu.com/posts/spark-streaming-state.html
稍微总结一下mapWithState的几个tips:
- mapWithState是1.6版本之后推出的
- 必须设置checkpoint来储存历史数据
- mapWithState和updateStateByKey的区别 : 他们类似,都是有状态DStream操作, 区别在于,updateStateByKey是输出增量数据,随着时间的增加, 输出的数据越来越多,这样会影响计算的效率, 对CPU和内存压力较大.而mapWithState则输出本批次数据,但是也含有状态更新.
- checkpoint的数据会分散存储在不同的分区中, 在进行状态更新时, 首先会对当前 key 做 hash , 再到对应的分区中去更新状态 , 这种方式大大提高了效率.
解决问题的思路:
State中保存状态为(String,Int) 元组类型, 其中String为word的全量, 而Int为word的计数.
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.MapWithStateDStream import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} object MapWithStateApp { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("MapWithStateApp") val ssc = new StreamingContext(conf,Seconds(5)) ssc.checkpoint("C:\\Users\\hylz\\Desktop\\checkpoint") val lines = ssc.socketTextStream("192.168.100.11",8888) val words = lines.flatMap(_.split(" ")) def mappingFunc(key: String, value: Option[(String, Int)], state: State[(String, Int)]): (String, Int) = { val cnt: Int = value.getOrElse((null, 0))._2 + state.getOption.getOrElse((null, 0))._2 val allField: String = value.getOrElse((null, 0))._1 state.update((allField, cnt)) (allField, cnt) } val cnt: MapWithStateDStream[String, (String, Int), (String, Int), (String, Int)] = words.map(x => (x.substring(0, 1), (x, 1))).mapWithState(StateSpec.function(mappingFunc _)) cnt.print() ssc.start() ssc.awaitTermination() } }
测试结果如下
input: hello how when hello
input: hello how when what hi
相关文章推荐
- java实现kafka整合spark streaming完成wordCount,updateStateByKey完成实时状态更新
- Java实现词频统计(Wordcount)-Map或Hashtable的value排序
- spark streaming实现状态可恢复的wordcount计算程序
- 利用Map实现的统计每个输入字符串或则单词出现的次数
- spark helloworld (wordCount实现并按照词频排序)
- Python利用WordCount实现词云
- java编写的hadoop wordcount,单MR任务实现按照词频排序输出结果
- Eclipse重写Wordcount类实现处理中文字符,利用hadoop Eclipse插件远程调试hadoop运行WordCount程序
- Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率
- java8实现spark wordcount并且按照value排序输出
- Map-Reduce下使用Streaming实现 WordCount
- 利用shell实现WordCount
- spark-streaming状态流之mapWithState
- hadoop入门之利用hadoop来对文档数据归类统计案例wordcount
- flatMap功能不只是wordcount,不知不觉用flatmap实现了hive的自带函数explode功能
- MapReduce-WordCount实现按照value降序排序、字符小写、识别不同标点
- java利用map来实现可视化统计随机数分布
- [spark streaming] 状态管理 updateStateByKey&mapWithState
- Hive实现wordcount的统计
- lucene中facet实现统计分析的思路——本质上和word count计数无异,像splunk这种层层聚合(先filed1统计,再field2统计,最后field3统计)lucene是排序实现