imf大数据:第95课:streaming实战,实现在线热搜索词
2016-05-02 16:34
363 查看
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Duration, Seconds, StreamingContext} object class_95_OnlineHottestItems{ /** * 使用Scala开发集群运行的Spark 在线黑名单过滤程序 * @author DT大数据梦工厂 * 新浪微博:http://weibo.com/ilovepains/ * 不是在线处理数据,都是没有价值的 * * */ def main(args: Array[String]){ val conf=new SparkConf().setMaster("local[2]").setAppName("HottestItems") val ssc=new StreamingContext(conf,Duration(5)) val soDstream=ssc.socketTextStream("master",9999) soDstream .map(_.split(" ")(1))//按照空格分割取出第二个元素 .map(item=>(item,1))//元素转化成turple2 .reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds(60),Seconds(20))//以60s窗口长度,20s滑动间隔进行元素的累加 .transform(ItemRDD=>ItemRDD//DStream没有排序的算子,所以要转化为rdd .map(pair=>(pair._2,pair._1))//将k,v对调 .sortByKey(false))//对v进行排序 .map(pair=>(pair._2,pair._1))//将K,V复原 .print() ssc.start() ssc.awaitTermination() } }
相关文章推荐
- Uva 514 Rails(栈应用)
- 电商大数据学习笔记:理论
- HDU 1789 Doing Homework again
- what's the cygwin and how cai i install this software
- 70. Climbing Stairs
- 服务器TIME_WAIT和CLOSE_WAIT详解和解决办法
- 70. Climbing Stairs
- container_of 宏、offsetof 宏 分析
- Flex AIR ANT 脚本编译
- CodeForces 667A Pouring Rain
- CodeForces 667A Pouring Rain
- hdu2925 Musical Chairs && poj3517 And Then There Was One(约瑟夫环)
- Raid 级别
- Hibernate运行报错Unknown entity: domain.UserBean
- HDU 3394 Railway 点双连通分量 + 桥
- Rails,ACM/ICPC CERC 1997,UVa514
- 清理Kylin的中间存储数据(HDFS & HBase Tables)
- Insufficient memory<failed to allocate 232852> in cv::OutOfMemoryError
- Hadoop2.7实战v1.0之Linux参数调优
- SparkStreaming数据源Flume实际案例分享