您的位置:首页 > 数据库 > MySQL

Flink之异步I/O案例(二)异步查询MySQL数据库

2020-04-04 07:13 4978 查看

文章目录

  • 二、代码实现
  • 四、技术点
  • 一、需求思考

    1、通过异步查询Mysql中的以下数据:

    2、思考

    Mysql不支持异步查询,那该怎么办呢?

    • 创建线程池和数据库连接池,来实现异步的并发查询。
    • 这样异步查询中,一个请求就是一个线程,一个请求对应一个连接。

    二、代码实现

    1、添加依赖
    • 使用的是阿里的 druid 数据库连接池
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.20</version>
    </dependency>
    2、主线代码
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.concurrent.TimeUnit;
    
    /**
    * @date: 2020/3/11 22:51
    * @site: www.ianlou.cn
    * @author: lekko 六水
    * @qq: 496208110
    */
    public class AsyncQueryFromMySQL {
    public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);
    
    int capacity = 20;
    
    SingleOutputStreamOperator<Tuple2<String, String>> result =
    AsyncDataStream.orderedWait(
    lines,                              //要处理的数据流
    new MySQLAsyncFunction(capacity),   //异步查询函数的具体执行实例
    3000,                             //超时时间
    TimeUnit.MILLISECONDS,             //时间单位
    capacity                          //最大异步并发请求数量
    );
    
    result.print();
    
    env.execute("AsyncQueryFromMySQL");
    }
    }
    3、异步查询的MySql的Function
    import com.alibaba.druid.pool.DruidDataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Collections;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.function.Supplier;
    import static java.util.concurrent.Executors.newFixedThreadPool;
    
    /**
    * @date: 2020/3/11 22:57
    * @site: www.ianlou.cn
    * @author: lekko 六水
    * @qq: 496208110
    */
    public class MySQLAsyncFunction extends RichAsyncFunction<String, Tuple2<String, String>> {
    // 加上transient,不让其序列化
    private transient ExecutorService executorService = null;
    private transient DruidDataSource dataSource = null;
    
    private int maxConn;
    
    public MySQLAsyncFunction(int maxConn) {
    this.maxConn = maxConn;
    }
    
    //创建线程池、Mysql连接池
    @Override
    public void open(Configuration parameters) throws Exception {
    //创建线程池
    executorService = newFixedThreadPool(maxConn);
    
    //创建连接池(异步I/O 一个请求就是一个线程,一个请求对应一个连接)
    dataSource = new DruidDataSource();
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    dataSource.setUsername("root");
    dataSource.setPassword("123456");
    dataSource.setUrl("jdbc:mysql://localhost:3306/day01?characterEncoding=utf8");
    dataSource.setMaxActive(maxConn);
    }
    
    @Override
    public void asyncInvoke(String id, ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
    
    //将一个查询请求丢入到线程池中
    Future<String> future = executorService.submit(new Callable<String>() { // Callable是有返回值的submit
    @Override
    public String call() throws Exception {
    return queryFromMySql(id);
    }
    });
    
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
    try {
    return future.get();
    } catch (Exception e) {
    return null;
    }
    }
    });
    
    // 返回最终的resultFuture给方法
    cf.thenAccept((String result) -> {
    resultFuture.complete(Collections.singleton(Tuple2.of(id, result)));
    });
    }
    
    // 单独查询的方法
    private String queryFromMySql(String param) throws SQLException {
    String sql = "SELECT id, name FROM orde WHERE id = ?";
    String result = null;
    Connection connection = null;
    PreparedStatement stmt = null;
    ResultSet rs = null;
    try {
    connection = dataSource.getConnection();
    stmt = connection.prepareStatement(sql);
    stmt.setString(1, param);
    rs = stmt.executeQuery();
    while (rs.next()) {
    result = rs.getString("name");
    }
    } finally {
    if (rs != null) {
    rs.close();
    }
    if (stmt != null) {
    stmt.close();
    }
    if (connection != null) {
    connection.close();
    }
    }
    return result;
    }
    
    @Override
    public void close() throws Exception {
    dataSource.close();
    executorService.shutdown();
    }
    }

    四、技术点

    • Flink的异步I/O
    • Druid 连接池的使用
    • 从Mysql中查询数据
    • 点赞
    • 收藏
    • 分享
    • 文章举报
    IT_但丁 发布了29 篇原创文章 · 获赞 8 · 访问量 2436 私信 关注
    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: