Flink之异步I/O案例(二)异步查询MySQL数据库
2020-04-04 07:13
4978 查看
文章目录
一、需求思考
1、通过异步查询Mysql中的以下数据:
2、思考
Mysql不支持异步查询,那该怎么办呢?
- 创建线程池和数据库连接池,来实现异步的并发查询。
- 这样异步查询中,一个请求就是一个线程,一个请求对应一个连接。
二、代码实现
1、添加依赖
- 使用的是阿里的 druid 数据库连接池
<dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.20</version> </dependency>
2、主线代码
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.concurrent.TimeUnit; /** * @date: 2020/3/11 22:51 * @site: www.ianlou.cn * @author: lekko 六水 * @qq: 496208110 */ public class AsyncQueryFromMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("linux01", 8888); int capacity = 20; SingleOutputStreamOperator<Tuple2<String, String>> result = AsyncDataStream.orderedWait( lines, //要处理的数据流 new MySQLAsyncFunction(capacity), //异步查询函数的具体执行实例 3000, //超时时间 TimeUnit.MILLISECONDS, //时间单位 capacity //最大异步并发请求数量 ); result.print(); env.execute("AsyncQueryFromMySQL"); } }
3、异步查询的MySql的Function
import com.alibaba.druid.pool.DruidDataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.function.Supplier; import static java.util.concurrent.Executors.newFixedThreadPool; /** * @date: 2020/3/11 22:57 * @site: www.ianlou.cn * @author: lekko 六水 * @qq: 496208110 */ public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> { // 加上transient,不让其序列化 private transient ExecutorService executorService = null; private transient DruidDataSource dataSource = null; private int maxConn; public MySQLAsyncFunction(int maxConn) { this.maxConn = maxConn; } //创建线程池、Mysql连接池 @Override public void open(Configuration parameters) throws Exception { //创建线程池 executorService = newFixedThreadPool(maxConn); //创建连接池(异步I/O 一个请求就是一个线程,一个请求对应一个连接) dataSource = new DruidDataSource(); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); dataSource.setUsername("root"); dataSource.setPassword("123456"); dataSource.setUrl("jdbc:mysql://localhost:3306/day01?characterEncoding=utf8"); dataSource.setMaxActive(maxConn); } @Override public void asyncInvoke(String id, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { //将一个查询请求丢入到线程池中 Future<String> future = executorService.submit(new Callable<String>() { // Callable是有返回值的submit @Override public String call() throws Exception { return queryFromMySql(id); } }); CompletableFuture<String> cf = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return future.get(); } catch (Exception e) { return null; } } }); // 返回最终的resultFuture给方法 cf.thenAccept((String result) -> { resultFuture.complete(Collections.singleton(Tuple2.of(id, result))); }); } // 单独查询的方法 private String queryFromMySql(String param) throws SQLException { String sql = "SELECT id, name FROM orde WHERE id = ?"; String result = null; Connection connection = null; PreparedStatement stmt = null; ResultSet rs = null; try { connection = dataSource.getConnection(); stmt = connection.prepareStatement(sql); stmt.setString(1, param); rs = stmt.executeQuery(); while (rs.next()) { result = rs.getString("name"); } } finally { if (rs != null) { rs.close(); } if (stmt != null) { stmt.close(); } if (connection != null) { connection.close(); } } return result; } @Override public void close() throws Exception { dataSource.close(); executorService.shutdown(); } }
四、技术点
- Flink的异步I/O
- Druid 连接池的使用
- 从Mysql中查询数据
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- JSP案例_利用JBDC连接Mysql数据库,并查询数据
- php中mysql数据库异步查询实现
- C/C++连接MySQL数据库和查询操作案例!!
- php中mysql数据库异步查询实现
- 使用JDBC连接MySQL数据库--典型案例分析(八)----实现员工数据的分页查询
- Mybatis+MySQL动态分页查询数据经典案例(含代码以及测试)
- KendoUI Grid 前后端(Java) 完整案例之查询翻页
- 韩顺平 Mysql数据库优化(三) 慢查询
- Oracle数据库经典案例之学生选课四表联合查询
- 如何提高MYSQL数据库的查询统计速度 select 索引应用
- mysql数据库 查询 比较 日期时间段的方法 多条件查找判断
- SqlServer--查询案例
- php查询mysql数据库并将结果保存到数组的方法
- mysql数据库查询某个时间段内数据的方法
- Mysql数据库对varchar类型字段进行条件查询时结果相关问题
- MySql数据库,对varchar类型字段str进行where str=0条件查询时,查询结果是什么
- 一个通过dblink查询的优化案例(去掉filter)
- Java对MySQL数据库进行连接、查询和修改【转载】
- AsyncTask的案例 异步下载图片
- java程序员第十六课 -MySQL数据库(多表的查询)