
Flink SQL / Table API: writing results to MySQL via JDBCAppendTableSink


There are a few things to watch out for when persisting to MySQL this way.
You also need to add the flink-jdbc dependency.
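A minimal Maven sketch of the dependencies, assuming Flink 1.7.x with Scala 2.11 and a MySQL 5.x driver; the exact artifact name and versions are assumptions and should be aligned with your own setup (some Flink releases publish the artifact as flink-jdbc without a Scala suffix). Because the job reads JSON from Kafka, the matching Kafka connector and JSON format dependencies are needed on the source side as well.

<!-- Versions below are assumptions; align them with your Flink/Scala/MySQL versions. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>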

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class MainDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Register a Kafka topic as a source table via the table descriptor API.
        Kafka kafka = new Kafka()
                .version("0.10")
                .topic("kafka")
                .property("bootstrap.servers", "192.168.219.132:9092")
                .property("zookeeper.connect", "192.168.219.132:2181");
        tableEnv.connect(kafka)
                .withFormat(
                        new Json().failOnMissingField(true).deriveSchema()
                )
                .withSchema(
                        new Schema()
                                .field("id", Types.INT)
                                .field("iname", Types.STRING)
                                .field("sex", Types.STRING)
                                // Declaring this field as FLOAT caused the job to consume the same
                                // record over and over and write nothing to MySQL, as if it were
                                // stuck in an endless loop, so it is declared as INT here.
                                .field("score", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("test1");

        String query = "select id,iname,sex,score from test1";
        Table result = tableEnv.sqlQuery(query);

        // JDBC append sink: the parameter types must match the order of the
        // placeholders in the REPLACE INTO statement.
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/flink")
                .setUsername("root")
                .setPassword("123456")
                .setParameterTypes(
                        new TypeInformation[] {
                                BasicTypeInfo.INT_TYPE_INFO,
                                BasicTypeInfo.STRING_TYPE_INFO,
                                BasicTypeInfo.STRING_TYPE_INFO,
                                BasicTypeInfo.INT_TYPE_INFO
                        })
                .setQuery("REPLACE INTO test2 (id,iname,sex,score) VALUES(?,?,?,?)")
                .build();

        // Convert the Table back to a DataStream<Row> and emit it to the JDBC sink.
        DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);

        stream.print();
        sink.emitDataStream(stream);

        env.execute();
    }
}
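
Because the sink statement is REPLACE INTO, the target table test2 needs a primary (or unique) key; otherwise REPLACE behaves like a plain INSERT and duplicates accumulate. A minimal sketch of the table, with column types matched to the sink's parameter types (INT, STRING, STRING, INT); the VARCHAR lengths are assumptions:

-- Column lengths are assumptions; adjust them to your data.
CREATE TABLE test2 (
    id    INT NOT NULL,
    iname VARCHAR(64),
    sex   VARCHAR(8),
    score INT,
    PRIMARY KEY (id)
);

With failOnMissingField(true) and deriveSchema(), each Kafka record is expected to be a JSON object matching the declared fields, for example {"id":1,"iname":"tom","sex":"male","score":90}.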

Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/
