您的位置:首页 > 其它

sparkstreaming性能测试简单例子--53

2015-07-31 11:28 274 查看
package llf

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
* Created by root on 15-7-31.
*/
object WindwordCount {
def main(args: Array[String]): Unit ={
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
}
//
val conf = new SparkConf().setAppName("WindwordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(".")
//
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a:Int, b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
.updateStateByKey[Int](updateFunc)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}

}


终于这个月凑够4篇博客了``这是凑数的
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: