Apache Calcite精简入门与学习指导
1 Apache Calcite基本介绍
Apache Calcite是一个动态数据管理框架,它包含了许多典型数据库管理系统的部分,但省略了一些关键功能:数据存储、数据处理算法和元数据存储。
基于Apache Calcite,我们可以为任何第三方存储引擎开发SQL查询引擎。
- 官网地址
- 项目地址
https://github.com/apache/calcite
2 Apache Calcite学习指导
要想了解Calcite,其实官方文档确实不妨一看。尽管官方文档会提及非常多你之前可能没有接触过的概念,但好在它文档内容不多,这样让你对SQL执行中可能涉及的一些关键术语留下一个印象,那对以后深入学习和使用Calcite还是有帮助的,毕竟如果真的想用好Calcite,或者说只是使用Calcite,这些关键术语都是需要掌握和理解的。
不过,仅仅看官方文档,那还是远远不够的。回过头再来看Calcite的文档时,你会发现,它完全是写给“高端玩家”的,它是对Calcite高度抽象总结的文档,并不是写给初学者来进行学习的,以至于你想通过官方文档来跑个QuickStart,那也是相当困难,个人觉得没有一定的折腾能力或者对SQL执行没有理解经验的话,确实不太容易达成。因此,不能仅仅只看官方文档,你还需要通过其它途径获取更多关于它的信息,关于初学者如何快速掌握Apache Calcite,以下是我个人的一些心得体会:
-
1.先简单用起来
Calcite作为一个数据管理框架,首先,你得把它用起来才能慢慢理解它到底是干嘛的。理论上,通过Calcite这个组件,你可以以SQL的方式来访问任何你想要访问的数据源,比如你的文件(不管你是什么格式)、Java对象、第三方存储引擎(Redis、Kafka、Elasticsearch)等,所以我是用了“任何”来说明它的能力,这是它实实在在存在的能力。
本文档会手把手教你,怎么样通过Calcite以SQL的方式来访问CSV文件、Java内部对象、Elasticsearch数据源。
-
2.生产使用与思考
所以一旦你知道Calcite可以通过SQL的方式来访问任何的数据源之后,我知道有想法的同学已经会考虑到:
(1)那假如在我的业务系统中,有各种各样不同的存储系统,是不是可以通过Calcite来构建一个统一的数据查询系统(一个查询入口,同时查询多种不同数据源)?
使用者不需要感知数据存储在哪里,在他们看来,这就是一个只提供SQL查询入口的查询系统,它屏蔽了它所接入的各种存储系统的差异;
- (2)假如业务存在一个流行的数据存储系统或者引擎,但它不支持SQL查询,我是不是可以借用Calcite来为它开发一个SQL引擎?
答案是肯定的,Calcite是一个组件,本质上也是一个框架,它提供了各种扩展点来让你实现这样的功能。
当然如果你想借用Calcite针对某个存储系统开发一个好的SQL引擎,还是需要相当大的努力的,比如VolcanoPlanner就需要好好理解下,比较可惜的是,直到现在我也没有精力去研究它,以至于我想为Elasticsearch开发一个SQL引擎的想法都迟迟未能实现。
所谓的“借用Calcite针对某个存储系统开发一个好的SQL引擎”,其实在Calcite里有一个专业的术语,叫做“数据源适配器”。
Calcite本身也提供了多个存储引擎的适配器,比如Druid、Elasticsearch、SparkSQL等等,当然开源的就并不一定得,前面之所以一直提及要重新写一个Elasticsearch的适配器,是因为我觉得Calcite本身提供的ES适配器能力比较弱,相信用过的同学都会有所体会。
3.深度使用与思考
实际上如果只是想知道Calcite怎么使用的,有哪些功能可以使用的,我们不妨站在巨人的肩膀上,看看业界的开源项目是怎么使用它的。
一个不错的参考是Apache Druid,其SQL引擎正是基于Apache Calcite来开发构建的,因此对于Calcite更多高级功能的使用,我们不妨去研究一下Apache Druid-SQL模块的源码,相信会有非常大的收获。
4.VolcanPlanner
有时间和精力研究一下其在Calcite的实现,个人觉得会非常不错。
本文档会手把手教你,怎么样通过Calcite以SQL的方式来访问CSV文件、Java内部对象、Elasticsearch数据源。
对于Calcite更多的实现细节,还是自己想办法根据实际应用场景,去思考一下它的各个模块功能,比如想了解某一个功能原理,就去看其源码结构和细节,我相信这本身对个人能力的提升都是极其有帮助的。
3 通过Apache Calcite接入不同数据源
先构建一个maven项目,然后引入Calcite的依赖:
<dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> <version>1.20.0</version> </dependency> <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-example-csv</artifactId> <version>1.20.0</version> </dependency> <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-elasticsearch</artifactId> <version>1.20.0</version> </dependency>
3.1 接入CSV数据源
先准备一个CSV文件:
EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date 100,"Fred",10,,,30,25,true,false,"1996-08-03" 110,"Eric",20,"M","San Francisco",3,80,,false,"2001-01-01" 110,"John",40,"M","Vancouver",2,,false,true,"2002-05-03" 120,"Wilma",20,"F",,1,5,,true,"2005-09-07" 130,"Alice",40,"F","Vancouver",2,,false,true,"2007-01-01"
Calcite会把每个csv文件映射成一个SQL表。csv文件表头指定该列数据类型,根据一定规则映射到对应的SQL类型。如没有指定,则统一映射成VARCHAR。
文件命名为depts.csv,Caclite会构建表名为文件名的table,即depts.
然后编写下面的代码通过Calcite以SQL方式访问数据:
// Author: xpleaf public class CsvDemo { public static void main(String[] args) throws Exception { // 0.获取csv文件的路径,注意获取到文件所在上层路径就可以了 String path = Objects.requireNonNull(CsvDemo.class.getClassLoader().getResource("csv").getPath()); // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等 CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE); // 2.构建Connection // 2.1 设置连接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 // 以实现查询不同数据源的目的 Sch 8000 emaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不同数据源schema挂载到RootSchema,这里添加CsvSchema rootSchema.add("csv", csvSchema); // 5.执行SQL查询,通过SQL方式访问csv文件 String sql = "select * from csv.depts"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
执行代码,其输出结果如下:
100, Fred, 10, , , 30, 25, true, false, 1996-08-03 110, Eric, 20, M, San Francisco, 3, 80, null, false, 2001-01-01 110, John, 40, M, Vancouver, 2, null, false, true, 2002-05-03 120, Wilma, 20, F, , 1, 5, null, true, 2005-09-07 130, Alice, 40, F, Vancouver, 2, null, false, true, 2007-01-01
思考:
csv是官方文档有提及的一个例子,在整体上如果需要对Calcite源码的使用有一个认识(尤其是如何开发适配器),可以基于这个demo,对照文档提及的各个概念、类,通过分析源码来进行理解,比如:
- 1.Schema是怎么构建的,在Calcite的位置和具体作用是什么;
- 2.Table是怎么构建的,在Calcite的位置和具体作用是什么;
- 3.在执行查询时是如何做SQL Parse、Validate、Optimize和执行的;
你都可以通过这个demo来一探究竟,当然,虽然我这里短短几句话带过,实际上如果你想研究这个过程,可能需要花费你较多时间,我建议不急着步子一下跨得太大,慢慢来,不急的。
另外,其实通过官方文档的介绍,对于怎么去开发一个Caclite的数据源适配器,应该也是有一定的体会的,其实如果只是实现一个简单的适配器(不考虑太多的SQL优化规则),那这个难度还是不大的。
我通过这个例子,包括后面的几个例子,其实都是想告诉你,如何快速使用Calcite(也就是相当给你写了一个QuickStart),从而对Calcite整体使用有一个认识,如果你想更深度使用Calcite,建议:
- 1.不妨看看Calcite源码里面的UT,里面提供了很好的参考案例;
- 2.但方式1可能会比较零散,你可以研读一下Apache Druid-SQL模块的源码,看一下整体上它是怎么使用的,它里面的许多高级使用技巧和方法还是十分有借鉴意义的,不妨看看。
3.2 接入Object对象数据源
3.2.1 SparkSQL接入Object对象
有用过SparkSQL的同学会知道,在SparkSQL中,可以使用编程的方式来将对象实例转换为DataFrame,进而注册Table,以通过SQL来访问这些对象实例:
public class _01SparkRDD2DataFrame { public static void main(String[] args) { Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName(_01SparkRDD2DataFrame.class.getSimpleName()) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{Person.class}); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(jsc); List<Person> persons = Arrays.asList( new Person(1, "name1", 25, 179), new Person(2, "name2", 22, 176), new Person(3, "name3", 27, 178), new Person(1, "name4", 24, 175) ); DataFrame df = sqlContext.createDataFrame(persons, Person.class); // 构造方法有多个,使用personsRDD的方法也是可以的 // where age > 23 and height > 176 df.select(new Column("id"), new Column("name"), new Column("age"), new Column("height")) .where(new Column("age").gt(23).and(new Column("height").lt(179))) .show(); df.registerTempTable("person"); sqlContext.sql("select * from person where age > 23 and height < 179").show(); jsc.close(); } }
以上代码例子来自xpleaf的文章《Spark SQL笔记整理(二):DataFrame编程模型与操作案例》
注意这里给出的案例还是Spark 1.x的用法,Spark 2.x以及之后的版本则可能不推荐这种用法了,具体请参考Spark的官方文档。
3.2.2 Calcite接入Object对象
那么对应到Calcite,它也提供了类似的方式来通过SQL访问对象实例数据。
为了进行演示,我们先构建Object对象类:
public class HrSchema { public final Employee[] emps = { new Employee(100, 10, "Bill", 10000, 1000), new Employee(200, 20, "Eric", 8000, 500), new Employee(150, 10, "Sebastian", 7000, null), new Employee(110, 10, "Theodore", 11500, 250), }; @Override public String toString() { return "HrSchema"; } public static class Employee { public int empid; public int deptno; public String name; public float salary; public Integer commission; public Employee(int empid, int deptno, String name, float salary, Integer commission) { this.empid = empid; this.deptno = deptno; this.name = name; this.salary = salary; this.commission = commission; } @Override public String toString() { return "Employee [empid: " + empid + ", deptno: " + deptno + ", name: " + name + "]"; } @Override public boolean equals(Object obj) { return obj == this || obj instanceof Employee && empid == ((Employee) obj).empid; } } }
Calcite会将HrSchema的emps映射为一张表。
编写Calcite代码如下:
public class ObjectDemo { public static void main(String[] args) throws Exception { // 1.构建CsvSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等 ReflectiveSchema reflectiveSchema = new ReflectiveSchema(new HrSchema()); // 2.构建Connection // 2.1 设置连接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 // 以实现查询不同数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不同数据源schema挂载到RootSchema,这里添加ReflectiveSchema rootSchema.add("hr", reflectiveSchema); // 5.执行SQL查询,通过SQL方式访问object对象实例 String sql = "select * from hr.emps"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
执行代码,其输出结果如下:
100, 10, Bill, 10000.0, 1000 200, 20, Eric, 8000.0, 500 150, 10, Sebastian, 7000.0, null 110, 10, Theodore, 11500.0, 250
一般在使用Calcite构建统一查询系统时,Object对象表会被用于构建数据表的元数据信息表(即表有哪些字段、字段的类型、用于构建数据表的元数据信息)等,详情可以参考Apache Druid-SQL源码。
3.3 接入Elasticsearch数据源
3.3.1 Elasticsearch极快速入门
不用有压力,如果你之前完全没有接触过Elasticsearch,也不用担心学习成本的问题,你就完全可以把它简单理解为一个数据库就好了,不用想那么复杂,并且,它开箱即用,没有任何部署成本。
下载:
https://www.elastic.co/cn/downloads/elasticsearch
根据对应的操作系统下载相应的版本就可以。
下载完成后,解压,进入bin目录,执行
elasticsearch.bat或
elasticsearch(取决于你的操作系统)就可以启动Elasticsearch,在浏览器上面访问
localhost:9200,返回如下信息:
{ "name": "yeyonghaodeMacBook-Pro.local", "cluster_name": "elasticsearch", "cluster_uuid": "6sMhfd0fSgSnqk7M_CTmug", "version": { "number": "7.11.1", "build_flavor": "default", "build_type": "tar", "build_hash": "ff17057114c2199c9c1bbecc727003a907c0db7a", "build_date": "2021-02-15T13:44:09.394032Z", "build_snapshot": false, "lucene_version": "8.7.0", "minimum_wire_compatibility_version": "6.8.0", "minimum_index_compatibility_version": "6.0.0-beta1" }, "tagline": "You Know, for Search" }
则说明服务已经部署成功。
接下来我们通过postman来创建index(表)和写入数据到ES:
PUT http://localhost:9200/teachers/_doc/1 { "name":"xpleaf", "age":26, "rate":0.86, "percent":0.95, "join_time":1551058601000 }
数据写入成功后,通过postman来查询数据:
GET http://localhost:9200/teachers/_search { "took": 115, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "teachers", "_type": "_doc", "_id": "1", "_score": 1.0, "_source": { "name": "xpleaf", "age": 26, "rate": 0.86, "percent": 0.95, "join_time": 1551058601000 } } ] } }
3.3.2 Calcite接入Elasticsearch数据源
当然你可能会说,ES本身也提供了SQL的能力,但实际上它是属于x-pack组件的一部分,是商用的,因此使用需谨慎,并且我个人觉得,它提供的SQL能力比较弱。
当然Calcite的Elasticsearch适配器其实也写得一般。
有了前面的准备之后,我们编写如下Calcite代码:
public class ElasticsearchDemo { public static void main(String[] args) throws Exception { // 1.构建ElasticsearchSchema对象,在Calcite中,不同数据源对应不同Schema,比如CsvSchema、DruidSchema、ElasticsearchSchema等 RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build(); ElasticsearchSchema elasticsearchSchema = new ElasticsearchSchema(restClient, new ObjectMapper(), "teachers"); // 2.构建Connection // 2.1 设置连接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 // 以实现查询不同数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不同数据源schema挂载到RootSchema,这里添加ElasticsearchSchema rootSchema.add("es", elasticsearchSchema); // 5.执行SQL查询,通过SQL方式访问object对象实例 String sql = "select * from es.teachers"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
执行代码,其输出结果如下:
{name=xpleaf, age=26, rate=0.86, percent=0.95, join_time=1551058601000}
4 The Next
通过前面的基本介绍和QuickStart,相信你对Apache Calcite已经有了最基本的了解,当然如果想要在生产环境真正使用Calcite,使用它来定制化构建我们的统一查询系统,仅仅了解这些肯定是远远不够的,确实是路漫漫其修远兮,不过不急,没关系的,后面有机会我将会介绍更多Calcite的高级用法。
其实很多高级用法都是通过研读Apache Druid-SQL的源码得知的,所以我会一直强调,如果较多时间和精力,不妨阅读它的源码。
附录1:ResultSetUtil
public class ResultSetUtil { public static String resultString(ResultSet resultSet) throws SQLException { return resultString(resultSet, false); } public static String resultString(ResultSet resultSet, boolean printHeader) throws SQLException { List<List<Object>> resultList = resultList(resultSet, printHeader); return resultString(resultList); } public static List<List<Object>> resultList(ResultSet resultSet) throws SQLException { return resultList(resultSet, false); } public static String resultString(List<List<Object>> resultList) throws SQLException { StringBuilder builder = new StringBuilder(); resultList.forEach(row -> { String rowStr = row.stream() .map(columnValue -> columnValue + ", ") .collect(Collectors.joining()); rowStr = rowStr.substring(0, rowStr.lastIndexOf(", ")) + "\n"; builder.append(rowStr); }); return builder.toString(); } public static List<List<Object>> resultList(ResultSet resultSet, boolean printHeader) throws SQLException { ArrayList<List<Object>> results = new ArrayList<>(); final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); if (printHeader) { ArrayList<Object> header = new ArrayList<>(); for (int i = 1; i <= columnCount; i++) { header.add(metaData.getColumnName(i)); } results.add(header); } while (resultSet.next()) { ArrayList<Object> row = new ArrayList<>(); for (int i = 1; i <= columnCount; i++) { row.add(resultSet.getObject(i)); } results.add(row); } return results; } }
附录2:演示Demo源码地址
- Apache Thrift学习之一(入门及Java实例演示)
- IT技术学习指导之Linux系统入门的4个阶段
- JAVA入门学习指导
- hive入门学习线路指导
- 入门级学习指导:如何学习GIS?
- IT技术学习指导之Linux系统入门的4个阶段(纯干货带图)
- IT技术学习指导之Linux系统入门的4个阶段
- 新手入门指导:Vue 2.0 的建议学习顺序——尤雨溪
- 路在何方:JAVA入门学习指导方向
- 如何学习Javascript入门指导
- IT技术学习指导之Linux系统入门的4个阶段
- C++程序员学习发展方向分析和指导(C++入门学习指导建议必看)
- hive入门学习线路指导
- Javascript 入门学习指导
- JAVA 入门学习之路-apache-dbutils-BeanUtils使用详解。
- IT技术学习指导之Linux系统入门的4个阶段
- NVIDIA Jetson TK1学习与开发(二):入门指导
- java入门学习指导之最佳方向
- 竟有如此缤纷的 AIR 学习指导:推荐《Adobe AIR 完整入门与开发实录》
- 大数据入门学习指导