FlinkSQL Examples: Writing to Kafka/ES/MySQL (Java)
2021-06-18 16:53
I. Background
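Flink's APIs are organized in four layers of abstraction. The top two, the Table API and SQL, have relatively simple syntax and let you solve small requirements quickly. Based on the official documentation and several online tutorials, this article collects source-side and sink-side code: it reads from a socket, Kafka and a text file as sources, and writes the streaming data to a text file, Kafka, Elasticsearch (ES) and MySQL, for later reference.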
II. Code
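Note: two styles are shown, connect and DDL. The connect style is the one available up to Flink 1.10 (TableEnvironment.connect() has been marked deprecated since Flink 1.11), while the current official documentation presents everything in DDL. For versions after 1.10, the DDL style is recommended because it is more general.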
1. Source side
1.1 Set up the basic environment (parallelism 1 and no checkpointing, to keep the demo simple)
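```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Create the stream execution environment with parallelism 1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// Create the table environment on top of it
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```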
1.2 Read data from a socket port and query it with both the Table API and SQL
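```java
import static org.apache.flink.table.api.Expressions.$;

// Read lines from port 9999 on hadoop102 and map each "id,ts,vc" line to a WaterSensor JavaBean
SingleOutputStreamOperator<WaterSensor> mapDS = env.socketTextStream("hadoop102", 9999)
        .map(value -> {
            String[] split = value.split(",");
            return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
        });

// Create a table: convert the stream into a dynamic table
Table table = tableEnv.fromDataStream(mapDS);

// Query the dynamic table with the Table API
Table selectResult = table.where($("id").isEqual("ws_001")).select($("id"), $("ts"), $("vc"));

// Query the dynamic table with SQL without registering it first: embedding the Table
// in the string calls its toString(), which registers it under a generated name
Table sqlResult = tableEnv.sqlQuery("select * from " + table);
```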
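The article does not show the WaterSensor class. Judging from the constructor call above and the Lombok dependency in the pom, it is probably a Lombok-annotated JavaBean; the sketch below is a plain-Java equivalent under that assumption. The `parse` method is a hypothetical helper that repeats the map() logic, with whitespace trimming added.

```java
// WaterSensor.java: plain-Java sketch of the JavaBean used above (assumed, not from the article).
// Flink maps id, ts and vc to table columns only if the class follows its POJO rules:
// public class, public no-arg constructor, public fields (or getters/setters).
public class WaterSensor {
    public String id;   // sensor id, e.g. "ws_001"
    public Long ts;     // timestamp
    public Integer vc;  // sensor reading

    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    // Hypothetical helper: same parsing as the map() lambda, for lines like "ws_001,1577844001,45"
    public static WaterSensor parse(String line) {
        String[] split = line.split(",");
        return new WaterSensor(split[0].trim(),
                Long.parseLong(split[1].trim()),
                Integer.parseInt(split[2].trim()));
    }

    @Override
    public String toString() {
        return "WaterSensor(id=" + id + ", ts=" + ts + ", vc=" + vc + ")";
    }
}
```

Because of the public-class requirement, this class belongs in its own file, WaterSensor.java; a non-public class would be treated as a generic type and `fromDataStream` could not expose id, ts and vc as separate columns.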
1.3 Read a text file (FileSystem) and query it with the Table API
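```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

// Flink 1.10-style connect(): read a txt file and register it as a temporary table
tableEnv.connect(new FileSystem().path("input/sensor.txt"))
        .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .createTemporaryTable("sensor");

// Get a Table object and query it (for the SQL variant, see the socket example above)
Table table = tableEnv.from("sensor");
// Count records per id
Table selectResult = table.groupBy($("id"))
        .select($("id"), $("id").count().as("id_count"));
```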
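Unlike the filter in 1.2, a grouped count is an updating query: each new record for an id that was already seen changes a result that has already been emitted. The class below is a simplified, pure-JDK model (not Flink's implementation) of the changelog such a query produces, using Flink's RowKind short strings: `+I` for the first result of a key, then a `-U`/`+U` pair retracting the old count and emitting the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the changelog of a continuous "GROUP BY id, COUNT(id)" query.
final class GroupCountChangelog {
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one incoming id and returns the change rows it causes downstream.
    List<String> onRecord(String id) {
        List<String> changes = new ArrayList<>();
        Long old = counts.get(id);
        if (old == null) {
            counts.put(id, 1L);
            changes.add("+I(" + id + ",1)");                 // first result for this key
        } else {
            long updated = old + 1;
            counts.put(id, updated);
            changes.add("-U(" + id + "," + old + ")");       // retract the previous count
            changes.add("+U(" + id + "," + updated + ")");   // emit the new count
        }
        return changes;
    }
}
```

Feeding ws_001, ws_002, ws_001 yields `+I(ws_001,1)`, `+I(ws_002,1)`, then `-U(ws_001,1)` and `+U(ws_001,2)`. This is why such a result cannot be written to append-only sinks like the file and Kafka sinks in section 2, while sinks declared with a PRIMARY KEY (JDBC, ES) can absorb the updates.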
1.4 Consume Kafka data and query it with the Table API, using both the connect and DDL styles
```java
import org.apache.flink.table.descriptors.Kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Option A, Flink 1.10-style connect(): consume the Kafka topic and register a temporary table
tableEnv.connect(new Kafka()
                .version("universal")
                .topic("sensor")
                .startFromLatest()
                .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
                .property(ConsumerConfig.GROUP_ID_CONFIG, "BD")) // consumer group
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .withFormat(new Csv())
        .createTemporaryTable("sensor");

// Option B, DDL (for Flink versions after 1.10); use either A or B, since both define "sensor"
tableEnv.executeSql("CREATE TABLE sensor (" +
        "  `id` STRING," +
        "  `ts` BIGINT," +
        "  `vc` INT" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'sensor'," +
        "  'properties.bootstrap.servers' = 'hadoop102:9092'," +
        "  'properties.group.id' = 'BD'," +
        "  'scan.startup.mode' = 'latest-offset'," +
        "  'format' = 'csv'" +
        ")");

// Get a Table object and query it (for the SQL variant, see the socket example above)
Table table = tableEnv.from("sensor");
Table selectResult = table.groupBy($("id"))
        .select($("id"), $("id").count().as("id_count"));
```
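The DDL above is built by hand-concatenating strings, so a missing comma or a misplaced quote only shows up as a parse error at runtime. As a sketch, a small hypothetical helper (`DdlBuilder`, not part of Flink, JDK only) can render the column list and the WITH options from ordered maps, doubling single quotes inside option values as in SQL string literals.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not a Flink API): renders a CREATE TABLE statement for
// tableEnv.executeSql(...) from column and option maps, so separators and quotes
// are generated consistently instead of concatenated by hand.
final class DdlBuilder {
    private DdlBuilder() {
    }

    static String createTable(String name, Map<String, String> columns, Map<String, String> options) {
        // Columns: `name` TYPE, one per line
        StringJoiner cols = new StringJoiner(",\n  ", "(\n  ", "\n)");
        columns.forEach((col, type) -> cols.add("`" + col + "` " + type));
        // Options: 'key' = 'value', with single quotes in values doubled
        StringJoiner opts = new StringJoiner(",\n  ", " WITH (\n  ", "\n)");
        options.forEach((key, value) -> opts.add("'" + key + "' = '" + value.replace("'", "''") + "'"));
        return "CREATE TABLE " + name + " " + cols + opts;
    }
}
```

Filling `LinkedHashMap`s with the columns (id STRING, ts BIGINT, vc INT) and the Kafka options shown above, `tableEnv.executeSql(DdlBuilder.createTable("sensor", columns, options))` would issue an equivalent statement; a `LinkedHashMap` keeps the column order stable, which matters for the table schema.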
2. Sink side
2.1 Write to a text file
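```java
// Create the output table with connect(); it is named sink_sensor so that it
// does not clash with the source table "sensor" registered above
tableEnv.connect(new FileSystem().path("out/sensor.txt"))
        .withFormat(new Csv())
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .createTemporaryTable("sink_sensor");

// Inserting into the output table performs the sink. selectResult is a source-side query
// result; it must have the columns (id, ts, vc) and be append-only, e.g. the filter query in 1.2
selectResult.executeInsert("sink_sensor");
```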
2.2 Write to Kafka
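```java
import org.apache.flink.table.descriptors.Json;
import org.apache.kafka.clients.producer.ProducerConfig;

// connect() style
tableEnv.connect(new Kafka()
                .version("universal")
                .topic("sensor")
                .sinkPartitionerRoundRobin() // write to the Kafka partitions round-robin
                .property(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"))
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .withFormat(new Json())
        .createTemporaryTable("sink_sensor");

// DDL style (use either this or the connect() version, since both define sink_sensor)
tableEnv.executeSql("CREATE TABLE sink_sensor (" +
        "  `id` STRING," +
        "  `ts` BIGINT," +
        "  `vc` INT" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'sensor'," +
        "  'properties.bootstrap.servers' = 'hadoop102:9092'," +
        "  'format' = 'json'" +
        ")");

// Inserting into the output table performs the sink. The Kafka sink accepts inserts only, so
// selectResult must be append-only with the columns (id, ts, vc), e.g. the filter query in 1.2
selectResult.executeInsert("sink_sensor");
```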
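With the json format (the `Json` descriptor or `'format' = 'json'`), each row becomes one Kafka message holding a JSON object keyed by column name. The sketch below hand-builds that message shape for the (id, ts, vc) schema, which makes it easy to recognize the output with a console consumer. It is an illustration only: `SensorJson` is a hypothetical helper, Flink's json format uses Jackson internally, and its exact output may differ in detail.

```java
// Hypothetical helper: builds a message shaped like {"id":"ws_001","ts":1577844001,"vc":45}
final class SensorJson {
    private SensorJson() {
    }

    static String toJson(String id, long ts, int vc) {
        return "{\"id\":" + quote(id) + ",\"ts\":" + ts + ",\"vc\":" + vc + "}";
    }

    // Minimal JSON string escaping: quotes, backslashes and control characters; null stays null
    private static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // hex escape for control chars
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```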
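2.3 Write to MySQL (via JDBC; the driver mysql-connector-java-5.1.9.jar was added to the project manually, and the 'jdbc' connector itself comes from the flink-connector-jdbc dependency)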
```java
// DDL: a JDBC sink table mapped to the MySQL table test.sink_test, which must already exist
tableEnv.executeSql("CREATE TABLE sink_sensor (" +
        "  id STRING," +
        "  ts BIGINT," +
        "  vc INT," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://hadoop102:3306/test?useSSL=false'," +
        "  'table-name' = 'sink_test'," +
        "  'username' = 'root'," +
        "  'password' = '123456'" +
        ")");

// Insert into the table defined above (sink_sensor); selectResult is the source-side query result
selectResult.executeInsert("sink_sensor");
```
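Because sink_sensor declares `PRIMARY KEY (id)`, the JDBC connector writes in upsert mode, and for MySQL that means an `INSERT ... ON DUPLICATE KEY UPDATE` statement. The sketch below builds a statement of that shape for the table and columns above; `MySqlUpsert` is a hypothetical helper and Flink generates its own statement internally, so the exact text may differ. For the upsert to take effect, test.sink_test itself needs a primary or unique key on id; otherwise every update is simply inserted as a new row.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper: the shape of a MySQL upsert used by a JDBC sink in upsert mode.
final class MySqlUpsert {
    private MySqlUpsert() {
    }

    private static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    static String statement(String table, String... columns) {
        String cols = Arrays.stream(columns).map(MySqlUpsert::quote).collect(Collectors.joining(", "));
        String params = Arrays.stream(columns).map(c -> "?").collect(Collectors.joining(", "));
        // On a duplicate key, overwrite every column with the newly supplied value
        String updates = Arrays.stream(columns)
                .map(c -> quote(c) + "=VALUES(" + quote(c) + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + cols + ") VALUES (" + params + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }
}
```

`MySqlUpsert.statement("sink_test", "id", "ts", "vc")` returns ``INSERT INTO `sink_test`(`id`, `ts`, `vc`) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `ts`=VALUES(`ts`), `vc`=VALUES(`vc`)``, which is also handy for checking the behavior directly in a MySQL client.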
2.4 Write to ES
```java
import org.apache.flink.table.descriptors.Elasticsearch;

// connect() style
tableEnv.connect(new Elasticsearch()
                .index("sensor")
                .documentType("_doc")
                .version("7")
                .host("localhost", 9200, "http")
                // Flush after every record so results show up immediately in the demo; do not use in production
                .bulkFlushMaxActions(1))
        .withSchema(new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT()))
        .withFormat(new Json())
        .inAppendMode()
        .createTemporaryTable("sink_sensor");

// DDL style (use either this or the connect() version, since both define sink_sensor)
tableEnv.executeSql("CREATE TABLE sink_sensor (" +
        "  id STRING," +
        "  ts BIGINT," +
        "  vc INT," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'elasticsearch-7'," +
        "  'hosts' = 'http://localhost:9200'," +
        "  'index' = 'sensor'," +
        "  'sink.bulk-flush.max-actions' = '1'" +
        ")");

// Inserting into the output table performs the sink; selectResult is the source-side query result
selectResult.executeInsert("sink_sensor");
```
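In the DDL version, `PRIMARY KEY (id)` puts the Elasticsearch sink in upsert mode: the document id is derived from the primary-key values, joined by the `document-id.key-delimiter` option (default `_`), so a later row with the same id overwrites the earlier document. Without a primary key the sink works in append mode and Elasticsearch generates the ids. The hypothetical helper below only mirrors that joining rule for illustration; Flink's own conversion of non-string key types may differ.

```java
import java.util.StringJoiner;

// Hypothetical helper: forms a document id by joining primary-key values with a delimiter.
final class EsDocumentId {
    private EsDocumentId() {
    }

    static String of(String delimiter, Object... keyValues) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object value : keyValues) {
            joiner.add(String.valueOf(value)); // each key value rendered as text
        }
        return joiner.toString();
    }
}
```

With the single key used here, `EsDocumentId.of("_", "ws_001")` is just `ws_001`, so every ws_001 row lands on the same document; a composite key such as (id, ts) would give ids like `ws_001_1577844001`.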
III. Additional notes
Dependencies (pom.xml)
```xml
<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
    <scala.version>2.12</scala.version>
    <hadoop.version>3.1.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8.0</version>
    </dependency>
    <!-- Elasticsearch client -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.8.0</version>
    </dependency>
    <!-- Elasticsearch depends on log4j 2.x -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.16</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Provides 'connector' = 'jdbc' for section 2.3; the MySQL driver jar was added manually -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

This article is for learning and discussion; if you spot any problems, please point them out in the comments.