云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战007-DataStream与MySql自定义sink和source(Scala版)002
2017-11-21 09:11
1046 查看
三、自定义source
1.source主程序
package code.book.stream.customsinkandsource.jdbc.scala import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.source.RichSourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext class StudentSourceFromMysql extends RichSourceFunction[Student] { private var connection: Connection = null private var ps: PreparedStatement = null /** * 一、open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。 */ override def open(parameters: Configuration): Unit = { super.open(parameters) val driver = "com.mysql.jdbc.Driver" val url = "jdbc:m c417 ysql://qingcheng11:3306/flinktest" val username = "root" val password = "qingcheng" //1.加载驱动 Class.forName(driver) //2.创建连接 connection = DriverManager.getConnection(url, username, password) //3.获得执行语句 val sql = "select stuid,stuname,stuaddr,stusex from Student;" ps = connection.prepareStatement(sql) } /** * 二、DataStream调用一次run()方法用来获取数据 */ override def run(sourceContext: SourceContext[Student]): Unit = { try { //4.执行查询,封装数据 val resultSet = ps.executeQuery() while (resultSet.next()) { val student = Student(resultSet.getInt("stuid"), resultSet.getString("stuname").trim, resultSet.getString("stuaddr").trim, resultSet.getString("stusex").trim) sourceContext.collect(student) } } catch { case e: Exception => println(e.getMessage) } } override def cancel(): Unit = { } /** * 三、 程序执行完毕就可以进行,关闭连接和释放资源的动作了 */ override def close(): Unit = { //5.关闭连接和释放资源 super.close() if (connection != null) { connection.close() } if (ps != null) { ps.close() } } }
2.source测试程序
package code.book.stream.customsinkandsource.jdbc.scala import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} object StudentSourceFromMysqlTest { def main(args: Array[String]): Unit = { //1.创建流执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.从自定义source中读取数据 val dataStream: DataStream[Student] = env.addSource(new StudentSourceFromMysql) //3.显示结果 dataStream.print() //4.触发流执行 env.execute() } }
3.source测试效果
能够正确查询出mysql中的数据。
相关文章推荐
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战008-DataStream与MySql自定义sink和source(Scala版)003
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战006-DataStream与MySql自定义sink和source(Scala版)001
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战010-DataStream与MySql自定义sink和source(Java版)002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战011-DataStream与MySql自定义sink和source(Java版)003
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战009-DataStream与MySql自定义sink和source(Java版)001
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战013-Flink在流处理中常见的sink和source002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战012-Flink在流处理中常见的sink和source001
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战004-Flink基于流的window操作002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战003-Flink基于流的window操作001
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战007--DateSet实用API详解007
- 【云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战001-Flink基于流的wordcount示例001
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战005-Flink基于流的window操作003
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战002--DateSet实用API详解002
- 云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战002-Flink基于流的wordcount示例002
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战027--DateSet实用API详解027
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战017--DateSet实用API详解017
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战023--DateSet实用API详解023
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战018--DateSet实用API详解018
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战008--DateSet实用API详解008
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战014--DateSet实用API详解014