Flink's Table API and SQL
2019-07-11 17:53
The Table API is a relational API shared by stream and batch processing: a Table API query can run on streaming or batch input without any modification. The Table API is a superset of SQL designed specifically for Apache Flink, and it is a language-integrated API for Scala and Java. Unlike regular SQL, where queries are specified as strings, Table API queries are defined in an embedded style within Java or Scala, with IDE support such as auto-completion and syntax checking.
1. Required pom dependency
[code]
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.0</version>
</dependency>
2. Creating the table environment
[code]
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
  val startupLogDstream: DataStream[StartupLog] = dstream.map { jsonString =>
    JSON.parseObject(jsonString, classOf[StartupLog])
  }
  val startupLogTable: Table = tableEnv.fromDataStream(startupLogDstream)
  val table: Table = startupLogTable.select("mid,ch").filter("ch = 'appstore'")

  val midchDataStream: DataStream[(String, String)] = table.toAppendStream[(String, String)]
  midchDataStream.print()
  env.execute()
}
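The `select("mid,ch").filter("ch = 'appstore'")` chain above is just a relational projection followed by a predicate. As a minimal plain-Scala sketch of the same logic on an ordinary collection (the `StartupLog` fields and sample values here are assumptions inferred from the query, not the real class):

```scala
// Sketch of what select("mid,ch").filter("ch = 'appstore'") computes,
// using a List instead of a Flink Table. Fields are assumed from the query.
case class StartupLog(mid: String, uid: String, ch: String)

val logs = List(
  StartupLog("mid_1", "u1", "appstore"),
  StartupLog("mid_2", "u2", "huawei"),
  StartupLog("mid_3", "u3", "appstore")
)

// projection to (mid, ch), then the channel predicate
val midCh: List[(String, String)] =
  logs.map(l => (l.mid, l.ch)).filter(_._2 == "appstore")
```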
Dynamic tables
If the data type in the stream is a case class, a table can be generated directly from the case class structure:
[code]tableEnv.fromDataStream(startupLogDstream)
Or the fields can be named individually, by position:
[code]tableEnv.fromDataStream(startupLogDstream, 'mid, 'uid .......)
Finally, the dynamic table can be converted to a stream for output:
[code]table.toAppendStream[(String,String)]
Fields
Put a single quote in front of a field name to identify it, e.g. 'name, 'mid, 'amount.
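In Scala 2, `'mid` is a Symbol literal; the Table API's expression DSL converts such symbols into field references through implicit conversions. A small sketch of what the literal itself is:

```scala
// 'mid is an ordinary Scala 2 Symbol literal; the Table API turns
// symbols like this into field references via implicit conversions.
val field = 'mid
val name: String = field.name  // the underlying field name as a String
```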
3. Understanding the Table API through an example
[code]
// Count, every 10 seconds, the number of records whose channel is "appstore"
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // switch the time characteristic to event time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
  val startupLogDstream: DataStream[StartupLog] = dstream.map { jsonString =>
    JSON.parseObject(jsonString, classOf[StartupLog])
  }

  // tell Flink how to extract the event time and generate watermarks
  val startupLogWithEventTimeDStream: DataStream[StartupLog] =
    startupLogDstream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[StartupLog](Time.seconds(0L)) {
        override def extractTimestamp(element: StartupLog): Long = element.ts
      }).setParallelism(1)

  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

  // convert the data stream into a Table, declaring 'ts as the event-time attribute
  val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream,
    'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)

  // Table API query: count per channel in 10-second tumbling windows
  // 1. group by  2. window  3. use event time to define the window
  val resultTable: Table = startupTable
    .window(Tumble over 10000.millis on 'ts as 'tt)
    .groupBy('ch, 'tt)
    .select('ch, 'ch.count)

  // convert the Table back into a data stream
  val resultDstream: DataStream[(Boolean, (String, Long))] = resultTable.toRetractStream[(String, Long)]
  resultDstream.filter(_._1).print()
  env.execute()
}
About group by
1. If group by is used, the table can only be converted to a stream with toRetractStream:
[code] val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)]
2. The first Boolean field returned by toRetractStream marks the freshness of each record: true means the latest data, false means retracted stale data.
[code]
val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String, Long)]
rDstream.filter(_._1).print()
3. If the API involves a time window, the time field must be included in the group by.
[code]
val table: Table = startupLogTable
  .filter("ch = 'appstore'")
  .window(Tumble over 10000.millis on 'ts as 'tt)
  .groupBy('ch, 'tt)
  .select("ch, ch.count")
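The retract-stream semantics above can be sketched with plain tuples: each update arrives as a (flag, record) pair, where true adds or updates a result and false retracts the previous value, so filtering on the flag keeps only the additions. A minimal simulation (the sample values are made up for illustration):

```scala
// Sketch of retract-stream semantics: true = insert/update, false = retraction
// of the previously emitted value. Filtering on _._1 keeps only insertions.
val retractStream: List[(Boolean, (String, Long))] = List(
  (true,  ("appstore", 1L)),  // first count for appstore
  (false, ("appstore", 1L)),  // retraction of the old count
  (true,  ("appstore", 2L))   // updated count
)

val latestInserts: List[(String, Long)] = retractStream.filter(_._1).map(_._2)
```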
About time windows
1. To use a time window, the time field must be declared in advance. For processing time, it can simply be appended when the dynamic table is created:
[code]
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,
  'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ps.proctime)
2. For event time, it must be declared when the dynamic table is created:
[code]
val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream,
  'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)
3. A tumbling window can be declared with Tumble over 10000.millis on ...:
[code]
val resultTable: Table = startupTable
  .window(Tumble over 10000.millis on 'ts as 'tt)
  .groupBy('ch, 'tt)
  .select('ch, 'ch.count)
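The way a 10 000 ms tumbling window buckets events reduces to simple arithmetic: every event lands in the window starting at its timestamp rounded down to a multiple of the window size. A small sketch of that assignment rule:

```scala
// Tumbling-window bucketing: an event with timestamp ts belongs to the
// window starting at ts - (ts % windowSize), covering [start, start + size).
val windowSize = 10000L

def windowStart(ts: Long): Long = ts - (ts % windowSize)
```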
4. How to write SQL
[code]
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // switch the time characteristic to event time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
  val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
  val startupLogDstream: DataStream[StartupLog] = dstream.map { jsonString =>
    JSON.parseObject(jsonString, classOf[StartupLog])
  }

  // tell Flink how to extract the event time and generate watermarks
  val startupLogWithEventTimeDStream: DataStream[StartupLog] =
    startupLogDstream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[StartupLog](Time.seconds(0L)) {
        override def extractTimestamp(element: StartupLog): Long = element.ts
      }).setParallelism(1)

  val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

  // convert the data stream into a Table, declaring 'ts as the event-time attribute
  val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream,
    'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs, 'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)

  // the same query expressed in SQL: count per channel in 10-second tumbling windows
  val resultSQLTable: Table = tableEnv.sqlQuery(
    "select ch, count(ch) from " + startupTable + " group by ch, TUMBLE(ts, INTERVAL '10' SECOND)")

  // convert the Table back into a data stream
  val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String, Long)]
  resultDstream.filter(_._1).print()
  env.execute()
}
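What the windowed group-by count computes can be simulated with ordinary collections: bucket each event by (channel, window start) and count the group sizes. A plain-Scala sketch, with made-up (ch, ts) pairs standing in for the StartupLog rows:

```scala
// Sketch of the tumbling-window SQL: count events per channel within
// each 10-second window. (ch, ts) pairs stand in for StartupLog rows.
val events: List[(String, Long)] = List(
  ("appstore", 1000L), ("huawei", 2000L),
  ("appstore", 9000L), ("appstore", 12000L)
)

val counts: Map[(String, Long), Int] =
  events
    .groupBy { case (ch, ts) => (ch, ts - ts % 10000L) } // key: (channel, window start)
    .map { case (key, rows) => key -> rows.size }
```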