云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战004-Flink基于流的window操作002
2017-11-20 14:38
1171 查看
三、交通场景下time-window实战
1.tumbling-time-window (无重叠数据)实战
1.0实战目的
每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。
1.1发送数据
1.发送命令 nc -lk 9999 2.发送内容 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
1.2处理数据
执行程序package code.book.stream.window.time //0.引入必要的编程元素 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.windowing.time.Time object TumblingTW { def main(args: Array[String]): Unit = { //1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流来源 val text = env.socketTextStream("qingcheng11", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒 //也就是说,每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .timeWindow(Time.seconds(5)) .sum("carCnt") //5.显示统计结果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName) } }
执行效果
2.sliding-time-window (有重叠数据)实战
2.0实战目的
每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。
2.1发送数据
1.发送命令 nc -lk 9999 2.发送内容 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
2.2处理数据
执行程序package code.book.stream.window.time //0.引入必要的编程元素 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} import org.apache.flink.streaming.api.windowing.time.Time object SlidingTW { def main(args: Array[String]): Unit = { //1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流来源 val text = env.socketTextStream("qingcheng11", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒 //也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .timeWindow(Time.seconds(10), Time.seconds(5)) .sum("carCnt") //5.显示统计结果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName) } }
执行效果
四、交通场景下的count-window实战
1.tumbling-count-window (无重叠数据)实战
1.0实战目的
每个路口分别统计,收到关于它的5条消息时统计在最近5条消息中,各自路口通过的汽车数量
1.1发送数据
1.发送命令 nc -lk 9999 2.发送内容 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
1.2处理数据
执行程序package code.book.stream.window.count //0.引入必要的编程元素 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} object TumblingCW { def main(args: Array[String]): Unit = { //1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流来源 val text = env.socketTextStream("qingcheng11", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5 //也就是说,每个路口分别统计,收到关于它的5条消息时统计在最近5条消息中,各自路口通过的汽车数量 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5) .sum("carCnt") //5.显示统计结果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName) } }
执行效果
2.sliding-count-window (有重叠数据)实战
2.0实战目的
每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量
2.1发送数据
1.发送命令 nc -lk 9999 2.发送内容 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4
2.2处理数据
执行程序package code.book.stream.window.count //0.引入必要的编程元素 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} object SlidingCW { def main(args: Array[String]): Unit = { //1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流来源 val text = env.socketTextStream("qingcheng11", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据 //也就是说,每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5, 3) .sum("carCnt") //5.显示统计结果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName) } }
执行效果
相关文章推荐
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战003-Flink基于流的window操作001
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战005-Flink基于流的window操作003
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战007-DataStream与MySql自定义sink和source(Scala版)002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战010-DataStream与MySql自定义sink和source(Java版)002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战013-Flink在流处理中常见的sink和source002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战002-Flink基于流的wordcount示例002
- 【云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战001-Flink基于流的wordcount示例001
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战006-DataStream与MySql自定义sink和source(Scala版)001
- 云星数据---Apache Flink实战系列(精品版)】:Flink其他操作及内容002-Flink基于流的windowAndFunction操作001
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战004--DateSet实用API详解004
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战008-DataStream与MySql自定义sink和source(Scala版)003
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战002--DateSet实用API详解002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战009-DataStream与MySql自定义sink和source(Java版)001
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战011-DataStream与MySql自定义sink和source(Java版)003
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战023--DateSet实用API详解023
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战008--DateSet实用API详解008
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战001--DateSet实用API详解001
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战019--DateSet实用API详解019
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战016--DateSet实用API详解016
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战027--DateSet实用API详解027