您的位置:首页 > 其它

Flink的Scala程序实现对相同id的数据,进行温度值累加 5.输出当前传感器最新的温度 +10,而时间戳是上一次数据的时间戳 +1,并将结果打印到控制台;

2020-06-27 05:21 405 查看
package com.xujunqi.source.com.bawei.api
import org.apache.flink.api.scala._
object SensorFlinkTemperature {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
//    获取文本路径
val stream = env.readTextFile("D:\\IDEA_Maven\\XiaoWu_Worksoace\\flink_pi_liuchuli\\src\\main\\resources\\temperature")
val dataSteam = stream.filter(_.nonEmpty).map(data => {
val dataArr : Array[String] = data.split(",")
SensorReading(dataArr(0).trim,dataArr(1).trim.toLong,dataArr(2).trim.toDouble)
})

dataSteam.map(x=>(x.id,x.temperature)).groupBy(0).sum(1).print()
dataSteam.map(x=>SensorReading(x.id,x.timestamp+1,x.temperature+10)).print()
}
}
case class SensorReading(id:String,timestamp:Long,temperature:Double)
1.创建sensor.txt文件,内容如下:
sensor_1, 1547718199, 35.80018327300259
sensor_6, 1547718201, 15.402984393403084
sensor_7, 1547718202, 6.720945201171228
sensor_10, 1547718205, 38.101067604893444
sensor_1, 1547718201, 35.6
sensor_1, 1547718233, 42.1
2.创建SensorReading样例类,其中字段包括:
(id:string,timestamp:long,temperature:Double)

3.读取sensor.txt文件数据,封装成SensorReading样例类;
4.功能一:实现对相同id的数据,进行温度值累加,并将结果打印到控制台;
5.功能二:输出当前传感器最新的温度 +10,而时间戳是上一次数据的时间戳 +1,并将结果打印到控制台;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐