您的位置:首页 > 数据库 > MySQL

云星数据---Apache Flink实战系列(精品版)】:Flink流处理API详解与编程实战007-DataStream与MySql自定义sink和source(Scala版)002

2017-11-21 09:11 1046 查看

三、自定义source

1.source主程序

package code.book.stream.customsinkandsource.jdbc.scala

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class StudentSourceFromMysql extends RichSourceFunction[Student] {
private var connection: Connection = null
private var ps: PreparedStatement = null
/**
* 一、open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。
*/
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:m
c417
ysql://qingcheng11:3306/flinktest"
val username = "root"
val password = "qingcheng"
//1.加载驱动
Class.forName(driver)
//2.创建连接
connection = DriverManager.getConnection(url, username, password)
//3.获得执行语句
val sql = "select stuid,stuname,stuaddr,stusex from Student;"
ps = connection.prepareStatement(sql)
}
/**
* 二、DataStream调用一次run()方法用来获取数据
*/
override def run(sourceContext: SourceContext[Student]): Unit = {
try {
//4.执行查询,封装数据
val resultSet = ps.executeQuery()
while (resultSet.next()) {
val student = Student(resultSet.getInt("stuid"), resultSet.getString("stuname").trim,
resultSet.getString("stuaddr").trim, resultSet.getString("stusex").trim)
sourceContext.collect(student)
}
} catch {
case e: Exception => println(e.getMessage)
}
}

override def cancel(): Unit = {
}

/**
* 三、 程序执行完毕就可以进行,关闭连接和释放资源的动作了
*/
override def close(): Unit = {
//5.关闭连接和释放资源
super.close()
if (connection != null) {
connection.close()
}
if (ps != null) {
ps.close()
}
}
}


2.source测试程序

package code.book.stream.customsinkandsource.jdbc.scala

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
object StudentSourceFromMysqlTest {
def main(args: Array[String]): Unit = {
//1.创建流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2.从自定义source中读取数据
val dataStream: DataStream[Student] = env.addSource(new StudentSourceFromMysql)

//3.显示结果
dataStream.print()

//4.触发流执行
env.execute()
}
}


3.source测试效果

能够正确查询出mysql中的数据。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐