Flink的Scala程序实现对相同id的数据,进行温度值累加 5.输出当前传感器最新的温度 +10,而时间戳是上一次数据的时间戳 +1,并将结果打印到控制台;
2020-06-27 05:21
405 查看
package com.xujunqi.source.com.bawei.api import org.apache.flink.api.scala._ object SensorFlinkTemperature { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment // 获取文本路径 val stream = env.readTextFile("D:\\IDEA_Maven\\XiaoWu_Worksoace\\flink_pi_liuchuli\\src\\main\\resources\\temperature") val dataSteam = stream.filter(_.nonEmpty).map(data => { val dataArr : Array[String] = data.split(",") SensorReading(dataArr(0).trim,dataArr(1).trim.toLong,dataArr(2).trim.toDouble) }) dataSteam.map(x=>(x.id,x.temperature)).groupBy(0).sum(1).print() dataSteam.map(x=>SensorReading(x.id,x.timestamp+1,x.temperature+10)).print() } } case class SensorReading(id:String,timestamp:Long,temperature:Double)
1.创建sensor.txt文件,内容如下: sensor_1, 1547718199, 35.80018327300259 sensor_6, 1547718201, 15.402984393403084 sensor_7, 1547718202, 6.720945201171228 sensor_10, 1547718205, 38.101067604893444 sensor_1, 1547718201, 35.6 sensor_1, 1547718233, 42.1
2.创建SensorReading样例类,其中字段包括: (id:string,timestamp:long,temperature:Double) 3.读取sensor.txt文件数据,封装成SensorReading样例类; 4.功能一:实现对相同id的数据,进行温度值累加,并将结果打印到控制台; 5.功能二:输出当前传感器最新的温度 +10,而时间戳是上一次数据的时间戳 +1,并将结果打印到控制台;
相关文章推荐
- 小明去美国旅游,美国温度以华氏温度计量,她需要将华氏温度转换为摄氏温度,编写程序实现此功能. 要求: 从控制台输入华氏温度信息,并且分别打印最后的华氏温度和摄氏温度的结果
- 基因数据处理17之使用scala对BWA运行结果进行各阶段程序时间提取和统计求和
- 用Java实现建立链式二叉树存储数据,并对其进行遍历(前序,中序,后序),打印输出遍历结果。
- 笔试题:编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
- 编写一个程序,开启 3 个线程,这三个线程的 ID 分别为 A、B、C,每个线程将自己的 ID 在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。如:ABCABCABC…… 依次递归
- 编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
- 编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
- 第四题(迅雷笔试题):编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
- 编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推
- 1. 手机销售系统(结合着本地文件的读写,进行数据的持久化(.txt),当程序退出的时候,将所有修改后的数据保存在txt中,等下一次再运行程序的时候,可以直接从txt中读取最新的数据信息)
- 编写多线程程序,模拟多个人通过一个山洞。这个山洞每次只能通过一个人,每个人通过山洞的时间为2秒(sleep)。随机生成10个人,都要通过此山洞,用随机值对应的字符串表示人名,打印输出
- 基因数据处理15之scala对BWA运行结果进行时间提取
- 编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
- 编写多线程程序,模拟多个人通过一个山洞。这个山洞每次只能通过一个人,每个人通过山洞的时间为2秒(sleep)。随机生成10个人,都要通过此山洞,用随机值对应的字符串表示人名,打印输出每次通过山洞的人名
- 设计一个程序,每秒统计一次当前系统的进程状况,并按照内存使用自多到少排序打印输出相关信息。
- 编写多线程程序,模拟多个人通过一个山洞。这个山洞每次只能通过一个人,每个人通过山洞的时间为2秒(sleep)。随机生成10个人,都要通过此山洞,用随机值对应的字符串表示人名,打印输出每次
- Python实现从键盘接收若干个整数,直至用户输入N或n时停止。将输入数据进行升序排列,并输出排序结果、平均值及中位数。
- 编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
- 编写一个程序,开启3个线程,这3个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出结果必须按ABC的顺序显示;如:ABCABC….依次递推。
- C#实现在控制台输出当前系统时间的方法