您的位置:首页 > 数据库

Spark源码系列(九)Spark SQL初体验之解析过程详解

2017-09-13 18:17 731 查看
首先声明一下这个版本的代码是1.1的,之前讲的都是1.0的。

Spark支持两种模式,一种是在spark里面直接写sql,可以通过sql来查询对象,类似.net的LINQ一样,另外一种支持hive的HQL。不管是哪种方式,下面提到的步骤都会有,不同的是具体的执行过程。下面就说一下这个过程。

Sql解析成LogicPlan

使用Idea的快捷键Ctrl + Shift + N打开SQLQuerySuite文件,进行调试吧。



def sql(sqlText: String): SchemaRDD = {
if (dialect == "sql") {
new SchemaRDD(this, parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: $dialect")
}
}




从这里可以看出来,第一步是解析sql,最后把它转换成一个SchemaRDD。点击进入parseSql函数,发现解析Sql的过程在SqlParser这个类里面。

在SqlParser的apply方法里面,我们可以看到else语句里面的这段代码。

//对input进行解析,符合query的模式的就返回Success
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x => sys.error(x.toString)
}


这里我们主要关注query就可以。



protected lazy val query: Parser[LogicalPlan] = (
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
| insert | cache
)




这里面有很多看不懂的操作符,请到下面这个网址里面去学习。这里可以看出来它目前支持的sql语句只是select和insert。

http://www.scala-lang.org/api/2.10.4/index.html#scala.util.parsing.combinator.Parsers$Parser

我们继续查看select。


View Code
可以看得出来它对sql的解析是和我们常用的sql写法是一致的,这里面再深入下去还有递归,并不是看起来那么好理解。这里就不继续讲下去了,在解析hive的时候我会重点讲一下,我认为目前大家使用得更多是仍然是来源于hive的数据集,毕竟hive那么稳定。

到这里我们可以知道第一步是通过Parser把sql解析成一个LogicPlan。

LogicPlan到RDD的转换过程

好,下面我们回到刚才的代码,接着我们应该看SchemaRDD。



override def compute(split: Partition, context: TaskContext): Iterator[Row] =
firstParent[Row].compute(split, context).map(_.copy())

override def getPartitions: Array[Partition] = firstParent[Row].partitions

override protected def getDependencies: Seq[Dependency[_]] =
List(new OneToOneDependency(queryExecution.toRdd))




SchemaRDD是一个RDD的话,那么它最重要的3个属性:compute函数,分区,依赖全在这里面,其它的函数我们就不看了。

挺奇怪的是,我们new出来的RDD,怎么会有依赖呢,这个queryExecution是啥,点击进去看看吧,代码跳转到SchemaRDD继承的SchemaRDDLike里面。

lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }


把这两段很短的代码都放一起了,executePlan方法就是new了一个QueryExecution出来,那我们继续看看QueryExecution这个类吧。



lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
lazy val optimizedPlan = optimizer(analyzed)
lazy val sparkPlan = {
SparkPlan.currentContext.set(self)
planner(optimizedPlan).next()
}
// 在需要的时候加入Shuffle操作
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
lazy val toRdd: RDD[Row] = executedPlan.execute()




从这里可以看出来LogicPlan是经过了5个步骤的转换,要被analyzer和optimizer的处理,然后转换成SparkPlan,在执行之前还要被prepareForExecution处理一下,最后调用execute方法转成RDD.

下面我们分步讲这些个东东到底是干啥了。

首先我们看看Anayzer,它是继承自RuleExecutor的,这里插句题外话,Spark sql的作者Michael Armbrust在2013年的Spark Submit上介绍Catalyst的时候,就说到要从整体地去优化一个sql的执行是很困难的,所有设计成这种基于一个一个小规则的这种优化方式,既简单又方便维护。

好,我们接下来看看RuleExecutor的apply方法。


View Code
看完了RuleExecutor,我们继续看Analyzer,下面我只贴出来batches这块的代码,剩下的要自己去看了哦。


View Code
可以看得出来Analyzer是把Unresolved的LogicPlan解析成resolved的,解析里面的表名、字段、函数、别名什么的。

我们接着看Optimizer, 从单词上看它是用来做优化的,但是从代码上来看它更多的是为了过滤我们写的一些垃圾语句,并没有做什么实际的优化。


View Code
真是用心良苦啊,看来我们写sql的时候还是要注意一点的,你看人家花多大的功夫来优化我们的烂sql。。。要是我肯定不优化。。。写得烂就慢去吧!

接下来,就改看这一句了planner(optimizedPlan).next() 我们先看看SparkPlanner吧。


View Code
这一步是把逻辑计划转换成物理计划,或者说是执行计划了,里面有很多概念是我以前没听过的,网上查了一下才知道,原来数据库的执行计划还有那么多的说法,这一块需要是专门研究数据库的人比较了解了。剩下的两步就是prepareForExecution和execute操作。

prepareForExecution操作是检查物理计划当中的Distribution是否满足Partitioning的要求,如果不满足的话,需要重新弄做分区,添加shuffle操作,这块暂时没咋看懂,以后还需要仔细研究。最后调用SparkPlan的execute方法,这里面稍微讲讲这块的树型结构。



sql解析出来就是一个二叉树的结构,不管是逻辑计划还是物理计划,都是这种结构,所以在代码里面可以看到LogicPlan和SparkPlan的具体实现类都是有继承上面图中的三种类型的节点的。

非LeafNode的SparkPlan的execute方法都会有这么一句child.execute(),因为它需要先执行子节点的execute来返回数据,执行的过程是一个先序遍历。

最后把这个过程也用一个图来表示吧,方便记忆。



(1)通过一个Parser来把sql语句转换成Unresolved LogicPlan,目前有两种Parser,SqlParser和HiveQl。

(2)通过Analyzer把LogicPlan当中的Unresolved的内容给解析成resolved的,这里面包括表名、函数、字段、别名等。

(3)通过Optimizer过滤掉一些垃圾的sql语句。

(4)通过Strategies把逻辑计划转换成可以具体执行的物理计划,具体的类有SparkStrategies和HiveStrategies。

(5)在执行前用prepareForExecution方法先检查一下。

(6)先序遍历,调用执行计划树的execute方法。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: