FlinkSQL/Table API通过JDBCAppendTableSink方式来实现存入到MySQL
2019-06-13 18:42
2823 查看
这种方式实现存到MySQL,需要注意一些问题。
另外,需要添加 flink-jdbc 这个依赖。
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; public class MainDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(5000); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); Kafka kafka = new Kafka() .version("0.10") .topic("kafka") .property("bootstrap.servers", "192.168.219.132:9092") .property("zookeeper.connect", "192.168.219.132:2181"); tableEnv.connect(kafka) .withFormat( new Json().failOnMissingField(true).deriveSchema() ) .withSchema( new Schema() .field("id", Types.INT) .field("iname", Types.STRING) .field("sex", Types.STRING) .field("score", Types.INT)//这里定义为float时,会重复消费一条数据,而且不会存到MySQL,像是程序一直死循环在那里了 ) .inAppendMode() .registerTableSource("test1"); String query = "select id,iname,sex,score from test1"; Table result = tableEnv.sqlQuery(query); JDBCAppendTableSink sink = JDBCAppendTableSink.builder() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/flink") .setUsername("root") .setPassword("123456") .setParameterTypes( new TypeInformation[] { BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO }) .setQuery("REPLACE INTO test2 (id,iname,sex,score) VALUES(?,?,?,?)") .build(); DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class); stream.print(); sink.emitDataStream(stream); env.execute(); } }
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/
相关文章推荐
- JSP通过JDBC连接各种数据库的连接方式总结。(MySql、Orcal、SqlServer、DB2等数据库)
- Java基础---“接口”实现时的另一种方式。通过Java JDK API 1.6.0文档实例发现并得出结论
- linux mysql新建用户及让该用户可以在另外一台电脑通过sqlyong,jdbc方式访问设置说明
- Mybatis Generator的model生成中文注释,支持oracle和mysql(通过修改源码的方式来实现)
- Java通过JsApi方式实现微信支付
- java通过JDBC获取MySQL的数据实现
- pl/sql通过修改配置文件的方式实现数据库的连接
- Java通过JsApi方式实现微信支付
- android(13)(sqlite的CRUD使用SQL和API两种方式实现以及Linearlayout的列表展示)
- mysql、ms sql、oracle3中连接jdbc的方式
- Spark SQL通过JDBC连接MySQL读写数据
- lua数据库链接--mysql完整实现和luasql驱动API说明
- 通过SQL Server Agent方式实现数据库自动备份!
- Oracle中实现MySQL show index from table命令SQL脚本分享
- 在vc中通过连接池操作mysql(api方式),附c++访问mysql的封装类
- Vert-x-通过异步的方式使用JDBC连接SQL
- mysql通过gitd方式实现多线程主从复制
- JDBC简介,JDBC API,,MySQL连接、SQL语句
- Spark SQL通过JDBC连接MySQL读写数据实例(比较过时了)
- Java通过JDBC实现对数据库的增删查改(预编译方式)