您的位置:首页 > 运维架构 > Apache

Apache Calcite精简入门与学习指导

2021-02-27 02:41 916 查看

1 Apache Calcite基本介绍

Apache Calcite是一个动态数据管理框架,它包含了许多典型数据库管理系统的部分,但省略了一些关键功能:数据存储、数据处理算法和元数据存储。

基于Apache Calcite,我们可以为任何第三方存储引擎开发SQL查询引擎。

  • 官网地址

https://calcite.apache.org/

  • 项目地址

https://github.com/apache/calcite

2 Apache Calcite学习指导

要想了解Calcite,其实官方文档确实不妨一看。尽管官方文档会提及非常多你之前可能没有接触过的概念,但好在它文档内容不多,这样让你对SQL执行中可能涉及的一些关键术语留下一个印象,那对以后深入学习和使用Calcite还是有帮助的,毕竟如果真的想用好Calcite,或者说只是使用Calcite,这些关键术语都是需要掌握和理解的。

不过,仅仅看官方文档,那还是远远不够的。回过头再来看Calcite的文档时,你会发现,它完全是写给“高端玩家”的,它是对Calcite高度抽象总结的文档,并不是写给初学者来进行学习的,以至于你想通过官方文档来跑个QuickStart,那也是相当困难,个人觉得没有一定的折腾能力或者对SQL执行没有理解经验的话,确实不太容易达成。因此,不能仅仅只看官方文档,你还需要通过其它途径获取更多关于它的信息,关于初学者如何快速掌握Apache Calcite,以下是我个人的一些心得体会:

  • 1.先简单用起来

    Calcite作为一个数据管理框架,首先,你得把它用起来才能慢慢理解它到底是干嘛的。理论上,通过Calcite这个组件,你可以以SQL的方式来访问任何你想要访问的数据源,比如你的文件(不管你是什么格式)、Java对象、第三方存储引擎(Redis、Kafka、Elasticsearch)等,所以我是用了“任何”来说明它的能力,这是它实实在在存在的能力。

    本文档会手把手教你,怎么样通过Calcite以SQL的方式来访问CSV文件、Java内部对象、Elasticsearch数据源。

  • 2.生产使用与思考

    所以一旦你知道Calcite可以通过SQL的方式来访问任何的数据源之后,我知道有想法的同学已经会考虑到:

    (1)那假如在我的业务系统中,有各种各样不同的存储系统,是不是可以通过Calcite来构建一个统一的数据查询系统(一个查询入口,同时查询多种不同数据源)?

使用者不需要感知数据存储在哪里,在他们看来,这就是一个只提供SQL查询入口的查询系统,它屏蔽了它所接入的各种存储系统的差异;

  • (2)假如业务存在一个流行的数据存储系统或者引擎,但它不支持SQL查询,我是不是可以借用Calcite来为它开发一个SQL引擎?

答案是肯定的,Calcite是一个组件,本质上也是一个框架,它提供了各种扩展点来让你实现这样的功能。

当然如果你想借用Calcite针对某个存储系统开发一个好的SQL引擎,还是需要相当大的努力的,比如VolcanoPlanner就需要好好理解下,比较可惜的是,直到现在我也没有精力去研究它,以至于我想为Elasticsearch开发一个SQL引擎的想法都迟迟未能实现。

所谓的“借用Calcite针对某个存储系统开发一个好的SQL引擎”,其实在Calcite里有一个专业的术语,叫做“数据源适配器”。

Calcite本身也提供了多个存储引擎的适配器,比如Druid、Elasticsearch、SparkSQL等等,当然开源的就并不一定得,前面之所以一直提及要重新写一个Elasticsearch的适配器,是因为我觉得Calcite本身提供的ES适配器能力比较弱,相信用过的同学都会有所体会。

  • 3.深度使用与思考

    实际上如果只是想知道Calcite怎么使用的,有哪些功能可以使用的,我们不妨站在巨人的肩膀上,看看业界的开源项目是怎么使用它的。

    一个不错的参考是Apache Druid,其SQL引擎正是基于Apache Calcite来开发构建的,因此对于Calcite更多高级功能的使用,我们不妨去研究一下Apache Druid-SQL模块的源码,相信会有非常大的收获。

  • 4.VolcanPlanner

    有时间和精力研究一下其在Calcite的实现,个人觉得会非常不错。

  • 本文档会手把手教你,怎么样通过Calcite以SQL的方式来访问CSV文件、Java内部对象、Elasticsearch数据源。

    对于Calcite更多的实现细节,还是自己想办法根据实际应用场景,去思考一下它的各个模块功能,比如想了解某一个功能原理,就去看其源码结构和细节,我相信这本身对个人能力的提升都是极其有帮助的。

    3 通过Apache Calcite接入不同数据源

    先构建一个maven项目,然后引入Calcite的依赖:

    <dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <version>1.20.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-example-csv</artifactId>
    <version>1.20.0</version>
    </dependency>
    <dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-elasticsearch</artifactId>
    <version>1.20.0</version>
    </dependency>

    3.1 接入CSV数据源

    先准备一个CSV文件:

    EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date
    100,"Fred",10,,,30,25,true,false,"1996-08-03"
    110,"Eric",20,"M","San Francisco",3,80,,false,"2001-01-01"
    110,"John",40,"M","Vancouver",2,,false,true,"2002-05-03"
    120,"Wilma",20,"F",,1,5,,true,"2005-09-07"
    130,"Alice",40,"F","Vancouver",2,,false,true,"2007-01-01"

    Calcite会把每个csv文件映射成一个SQL表。csv文件表头指定该列数据类型,根据一定规则映射到对应的SQL类型。如没有指定,则统一映射成VARCHAR。

    文件命名为depts.csv,Caclite会构建表名为文件名的table,即depts.

    然后编写下面的代码通过Calcite以SQL方式访问数据:

    // Author: xpleaf
    public class CsvDemo {
    
    public static void main(String[] args) throws Exception {
    // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了
    String path = Objects.requireNonNull(CsvDemo.class.getClassLoader().getResource("csv").getPath());
    
    // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
    CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE);
    
    // 2.构建Connection
    // 2.1 设置连接参数
    Properties info = new Properties();
    // 不区分sql大小写
    info.setProperty("caseSensitive", "false");
    // 2.2 获取标准的JDBC Connection
    Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
    // 2.3 获取Calcite封装的Connection
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    
    // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
    // 以实现查询不同数据源的目的
    Sch
    8000
    emaPlus rootSchema = calciteConnection.getRootSchema();
    
    // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema
    rootSchema.add("csv", csvSchema);
    
    // 5.执行SQL查询,通过SQL方式访问csv文件
    String sql = "select * from csv.depts";
    Statement statement = calciteConnection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    
    // 6.遍历打印查询结果集
    System.out.println(ResultSetUtil.resultString(resultSet));
    }
    
    }

    执行代码,其输出结果如下:

    100, Fred, 10, , , 30, 25, true, false, 1996-08-03
    110, Eric, 20, M, San Francisco, 3, 80, null, false, 2001-01-01
    110, John, 40, M, Vancouver, 2, null, false, true, 2002-05-03
    120, Wilma, 20, F, , 1, 5, null, true, 2005-09-07
    130, Alice, 40, F, Vancouver, 2, null, false, true, 2007-01-01

    思考:

    csv是官方文档有提及的一个例子,在整体上如果需要对Calcite源码的使用有一个认识(尤其是如何开发适配器),可以基于这个demo,对照文档提及的各个概念、类,通过分析源码来进行理解,比如:

    • 1.Schema是怎么构建的,在Calcite的位置和具体作用是什么;
    • 2.Table是怎么构建的,在Calcite的位置和具体作用是什么;
    • 3.在执行查询时是如何做SQL Parse、Validate、Optimize和执行的;

    你都可以通过这个demo来一探究竟,当然,虽然我这里短短几句话带过,实际上如果你想研究这个过程,可能需要花费你较多时间,我建议不急着步子一下跨得太大,慢慢来,不急的。

    另外,其实通过官方文档的介绍,对于怎么去开发一个Caclite的数据源适配器,应该也是有一定的体会的,其实如果只是实现一个简单的适配器(不考虑太多的SQL优化规则),那这个难度还是不大的。

    我通过这个例子,包括后面的几个例子,其实都是想告诉你,如何快速使用Calcite(也就是相当给你写了一个QuickStart),从而对Calcite整体使用有一个认识,如果你想更深度使用Calcite,建议:

    • 1.不妨看看Calcite源码里面的UT,里面提供了很好的参考案例;
    • 2.但方式1可能会比较零散,你可以研读一下Apache Druid-SQL模块的源码,看一下整体上它是怎么使用的,它里面的许多高级使用技巧和方法还是十分有借鉴意义的,不妨看看。

    3.2 接入Object对象数据源

    3.2.1 SparkSQL接入Object对象

    有用过SparkSQL的同学会知道,在SparkSQL中,可以使用编程的方式来将对象实例转换为DataFrame,进而注册Table,以通过SQL来访问这些对象实例:

    public class _01SparkRDD2DataFrame {
    public static void main(String[] args) {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
    SparkConf conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName(_01SparkRDD2DataFrame.class.getSimpleName())
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(new Class[]{Person.class});
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(jsc);
    List<Person> persons = Arrays.asList(
    new Person(1, "name1", 25, 179),
    new Person(2, "name2", 22, 176),
    new Person(3, "name3", 27, 178),
    new Person(1, "name4", 24, 175)
    );
    
    DataFrame df = sqlContext.createDataFrame(persons, Person.class);   // 构造方法有多个,使用personsRDD的方法也是可以的
    
    // where age > 23 and height > 176
    df.select(new Column("id"),
    new Column("name"),
    new Column("age"),
    new Column("height"))
    .where(new Column("age").gt(23).and(new Column("height").lt(179)))
    .show();
    
    df.registerTempTable("person");
    
    sqlContext.sql("select * from person where age > 23 and height < 179").show();
    
    jsc.close();
    
    }
    }

    以上代码例子来自xpleaf的文章《Spark SQL笔记整理(二):DataFrame编程模型与操作案例》

    注意这里给出的案例还是Spark 1.x的用法,Spark 2.x以及之后的版本则可能不推荐这种用法了,具体请参考Spark的官方文档。

    3.2.2 Calcite接入Object对象

    那么对应到Calcite,它也提供了类似的方式来通过SQL访问对象实例数据。

    为了进行演示,我们先构建Object对象类:

    public class HrSchema {
    public final Employee[] emps = {
    new Employee(100, 10, "Bill", 10000, 1000),
    new Employee(200, 20, "Eric", 8000, 500),
    new Employee(150, 10, "Sebastian", 7000, null),
    new Employee(110, 10, "Theodore", 11500, 250),
    };
    
    @Override
    public String toString() {
    return "HrSchema";
    }
    
    public static class Employee {
    public int empid;
    public int deptno;
    public String name;
    public float salary;
    public Integer commission;
    
    public Employee(int empid, int deptno, String name, float salary,
    Integer commission) {
    this.empid = empid;
    this.deptno = deptno;
    this.name = name;
    this.salary = salary;
    this.commission = commission;
    }
    
    @Override
    public String toString() {
    return "Employee [empid: " + empid + ", deptno: " + deptno
    + ", name: " + name + "]";
    }
    
    @Override
    public boolean equals(Object obj) {
    return obj == this
    || obj instanceof Employee
    && empid == ((Employee) obj).empid;
    }
    }
    }

    Calcite会将HrSchema的emps映射为一张表。

    编写Calcite代码如下:

    public class ObjectDemo {
    
    public static void main(String[] args) throws Exception {
    // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
    ReflectiveSchema reflectiveSchema = new ReflectiveSchema(new HrSchema());
    
    // 2.构建Connection
    // 2.1 设置连接参数
    Properties info = new Properties();
    // 不区分sql大小写
    info.setProperty("caseSensitive", "false");
    // 2.2 获取标准的JDBC Connection
    Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
    // 2.3 获取Calcite封装的Connection
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    
    // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
    // 以实现查询不同数据源的目的
    SchemaPlus rootSchema = calciteConnection.getRootSchema();
    
    // 4.将不同数据源schema挂载到RootSchema,这里添加ReflectiveSchema
    rootSchema.add("hr", reflectiveSchema);
    
    // 5.执行SQL查询,通过SQL方式访问object对象实例
    String sql = "select * from hr.emps";
    Statement statement = calciteConnection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    
    // 6.遍历打印查询结果集
    System.out.println(ResultSetUtil.resultString(resultSet));
    }
    
    }

    执行代码,其输出结果如下:

    100, 10, Bill, 10000.0, 1000
    200, 20, Eric, 8000.0, 500
    150, 10, Sebastian, 7000.0, null
    110, 10, Theodore, 11500.0, 250

    一般在使用Calcite构建统一查询系统时,Object对象表会被用于构建数据表的元数据信息表(即表有哪些字段、字段的类型、用于构建数据表的元数据信息)等,详情可以参考Apache Druid-SQL源码。

    3.3 接入Elasticsearch数据源

    3.3.1 Elasticsearch极快速入门

    不用有压力,如果你之前完全没有接触过Elasticsearch,也不用担心学习成本的问题,你就完全可以把它简单理解为一个数据库就好了,不用想那么复杂,并且,它开箱即用,没有任何部署成本。

    下载:

    https://www.elastic.co/cn/downloads/elasticsearch

    根据对应的操作系统下载相应的版本就可以。

    下载完成后,解压,进入bin目录,执行

    elasticsearch.bat
    elasticsearch
    (取决于你的操作系统)就可以启动Elasticsearch,在浏览器上面访问
    localhost:9200
    ,返回如下信息:

    {
    "name": "yeyonghaodeMacBook-Pro.local",
    "cluster_name": "elasticsearch",
    "cluster_uuid": "6sMhfd0fSgSnqk7M_CTmug",
    "version": {
    "number": "7.11.1",
    "build_flavor": "default",
    "build_type": "tar",
    "build_hash": "ff17057114c2199c9c1bbecc727003a907c0db7a",
    "build_date": "2021-02-15T13:44:09.394032Z",
    "build_snapshot": false,
    "lucene_version": "8.7.0",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
    },
    "tagline": "You Know, for Search"
    }

    则说明服务已经部署成功。

    接下来我们通过postman来创建index(表)和写入数据到ES:

    PUT http://localhost:9200/teachers/_doc/1
    {
    "name":"xpleaf",
    "age":26,
    "rate":0.86,
    "percent":0.95,
    "join_time":1551058601000
    }

    数据写入成功后,通过postman来查询数据:

    GET http://localhost:9200/teachers/_search
    {
    "took": 115,
    "timed_out": false,
    "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
    },
    "hits": {
    "total": {
    "value": 1,
    "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
    {
    "_index": "teachers",
    "_type": "_doc",
    "_id": "1",
    "_score": 1.0,
    "_source": {
    "name": "xpleaf",
    "age": 26,
    "rate": 0.86,
    "percent": 0.95,
    "join_time": 1551058601000
    }
    }
    ]
    }
    }

    3.3.2 Calcite接入Elasticsearch数据源

    当然你可能会说,ES本身也提供了SQL的能力,但实际上它是属于x-pack组件的一部分,是商用的,因此使用需谨慎,并且我个人觉得,它提供的SQL能力比较弱。

    当然Calcite的Elasticsearch适配器其实也写得一般。

    有了前面的准备之后,我们编写如下Calcite代码:

    public class ElasticsearchDemo {
    
    public static void main(String[] args) throws Exception {
    // 1.构建ElasticsearchSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等
    RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
    ElasticsearchSchema elasticsearchSchema = new ElasticsearchSchema(restClient, new ObjectMapper(), "teachers");
    
    // 2.构建Connection
    // 2.1 设置连接参数
    Properties info = new Properties();
    // 不区分sql大小写
    info.setProperty("caseSensitive", "false");
    // 2.2 获取标准的JDBC Connection
    Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
    // 2.3 获取Calcite封装的Connection
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    
    // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下
    // 以实现查询不同数据源的目的
    SchemaPlus rootSchema = calciteConnection.getRootSchema();
    
    // 4.将不同数据源schema挂载到RootSchema,这里添加ElasticsearchSchema
    rootSchema.add("es", elasticsearchSchema);
    
    // 5.执行SQL查询,通过SQL方式访问object对象实例
    String sql = "select * from es.teachers";
    Statement statement = calciteConnection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    
    // 6.遍历打印查询结果集
    System.out.println(ResultSetUtil.resultString(resultSet));
    }
    
    }

    执行代码,其输出结果如下:

    {name=xpleaf, age=26, rate=0.86, percent=0.95, join_time=1551058601000}

    4 The Next

    通过前面的基本介绍和QuickStart,相信你对Apache Calcite已经有了最基本的了解,当然如果想要在生产环境真正使用Calcite,使用它来定制化构建我们的统一查询系统,仅仅了解这些肯定是远远不够的,确实是路漫漫其修远兮,不过不急,没关系的,后面有机会我将会介绍更多Calcite的高级用法。

    其实很多高级用法都是通过研读Apache Druid-SQL的源码得知的,所以我会一直强调,如果较多时间和精力,不妨阅读它的源码。

    附录1:ResultSetUtil

    public class ResultSetUtil {
    
    public static String resultString(ResultSet resultSet) throws SQLException {
    return resultString(resultSet, false);
    }
    
    public static String resultString(ResultSet resultSet, boolean printHeader) throws SQLException {
    List<List<Object>> resultList = resultList(resultSet, printHeader);
    return resultString(resultList);
    }
    
    public static List<List<Object>> resultList(ResultSet resultSet) throws SQLException {
    return resultList(resultSet, false);
    }
    
    public static String resultString(List<List<Object>> resultList) throws SQLException {
    StringBuilder builder = new StringBuilder();
    resultList.forEach(row -> {
    String rowStr = row.stream()
    .map(columnValue -> columnValue + ", ")
    .collect(Collectors.joining());
    rowStr = rowStr.substring(0, rowStr.lastIndexOf(", ")) + "\n";
    builder.append(rowStr);
    });
    return builder.toString();
    }
    
    public static List<List<Object>> resultList(ResultSet resultSet, boolean printHeader) throws SQLException {
    ArrayList<List<Object>> results = new ArrayList<>();
    final ResultSetMetaData metaData = resultSet.getMetaData();
    final int columnCount = metaData.getColumnCount();
    if (printHeader) {
    ArrayList<Object> header = new ArrayList<>();
    for (int i = 1; i <= columnCount; i++) {
    header.add(metaData.getColumnName(i));
    }
    results.add(header);
    }
    while (resultSet.next()) {
    ArrayList<Object> row = new ArrayList<>();
    for (int i = 1; i <= columnCount; i++) {
    row.add(resultSet.getObject(i));
    }
    results.add(row);
    }
    return results;
    }
    
    }

    附录2:演示Demo源码地址

    https://github.com/xpleaf/calcite-tutorial

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: