sparkstreaming性能测试简单例子--53
2015-07-31 11:28
274 查看
package llf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkContext, SparkConf} /** * Created by root on 15-7-31. */ object WindwordCount { def main(args: Array[String]): Unit ={ val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } // val conf = new SparkConf().setAppName("WindwordCount").setMaster("local[2]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(5)) ssc.checkpoint(".") // val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(",")) val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a:Int, b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt)) .updateStateByKey[Int](updateFunc) wordCounts.print() ssc.start() ssc.awaitTermination() } }
终于这个月凑够4篇博客了``这是凑数的
相关文章推荐
- 4种线程中操作UI
- Windows安装MongoDB
- BZOJ 3039 玉蟾宫 最大子矩阵 算♂法②
- 在eclipse中Java连接Sybase和oracle的例子
- GestureDetector类及其用法
- hdu 5335 Walk Out (搜索)
- javascript实现简单的页面右下角提示信息框
- HDU 5318 The Goddess Of The Moon(递推+矩阵优化)
- HDU 2473 Junk-Mail Filter 并查集 删点
- 从异常堆栈中还原 ProGuard 混淆过的代码
- http-equiv
- springMVC 前后台日期格式传值解决方式之二(共二) @InitBinder的使用
- android studio Gradle基础
- PHP代码实现爬虫记录――超管用
- 深入分析Android系统中SparseArray的源码
- C#中的Finalize,Dispose,SuppressFinalize的实现和使用介绍
- nginx server_name 多个
- spark on hive 总结
- 手把手创建自定义的分层的maven-archetype
- 2015多校联合训练赛 Training Contest 4 1008