您的位置:首页 > 运维架构 > Apache

云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战004-Flink基于流的window操作002

2017-11-20 14:38 1171 查看

三、交通场景下time-window实战

1.tumbling-time-window (无重叠数据)实战

1.0实战目的

每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。


1.1发送数据

1.发送命令
nc -lk 9999

2.发送内容
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4


1.2处理数据

执行程序

package code.book.stream.window.time

//0.引入必要的编程元素
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

object TumblingTW {
def main(args: Array[String]): Unit = {
//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2.定义数据流来源
val text = env.socketTextStream("qingcheng11", 9999)

//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
(f) => {
val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}

//4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒
//也就是说,每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.timeWindow(Time.seconds(5))
.sum("carCnt")

//5.显示统计结果
ds2.print()

//6.触发流计算
env.execute(this.getClass.getName)
}
}


执行效果



2.sliding-time-window (有重叠数据)实战

2.0实战目的

每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。


2.1发送数据

1.发送命令
nc -lk 9999

2.发送内容
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4


2.2处理数据

执行程序

package code.book.stream.window.time

//0.引入必要的编程元素
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time

object SlidingTW {
def main(args: Array[String]): Unit = {

//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2.定义数据流来源
val text = env.socketTextStream("qingcheng11", 9999)

//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
(f) => {
val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒
//也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum("carCnt")

//5.显示统计结果
ds2.print()

//6.触发流计算
env.execute(this.getClass.getName)
}
}


执行效果



四、交通场景下的count-window实战

1.tumbling-count-window (无重叠数据)实战

1.0实战目的

每个路口分别统计,收到关于它的5条消息时统计在最近5条消息中,各自路口通过的汽车数量


1.1发送数据

1.发送命令
nc -lk 9999

2.发送内容
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4


1.2处理数据

执行程序

package code.book.stream.window.count

//0.引入必要的编程元素
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object TumblingCW {
def main(args: Array[String]): Unit = {

//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2.定义数据流来源
val text = env.socketTextStream("qingcheng11", 9999)

//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
(f) => {
val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5
//也就是说,每个路口分别统计,收到关于它的5条消息时统计在最近5条消息中,各自路口通过的汽车数量
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.countWindow(5)
.sum("carCnt")

//5.显示统计结果
ds2.print()

//6.触发流计算
env.execute(this.getClass.getName)
}
}


执行效果



2.sliding-count-window (有重叠数据)实战

2.0实战目的

每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量


2.1发送数据

1.发送命令
nc -lk 9999

2.发送内容
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4


2.2处理数据

执行程序

package code.book.stream.window.count

//0.引入必要的编程元素
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}

object SlidingCW {
def main(args: Array[String]): Unit = {

//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2.定义数据流来源
val text = env.socketTextStream("qingcheng11", 9999)

//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
(f) => {
val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据
//也就是说,每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.countWindow(5, 3)
.sum("carCnt")

//5.显示统计结果
ds2.print()

//6.触发流计算
env.execute(this.getClass.getName)
}
}


执行效果

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐