Spark源码解析之SparkSql
2017-09-18 16:55
537 查看
首先我们回顾一下使用SparkSql的一般步骤:
1. 从数据源或者RDD读取数据,构造出一个DataFrame
2. 使用DataFrame的registerTempTable方法根据刚才读取的数据创建一个临时表
3. 调用sqlContext的sql方法执行sql语句
那么在这里我们就从sql语句的调用开始:
parseSql的返回类型是一个LogicalPlan(代表一个逻辑计划),这里就是使用sqlContext和调用parseSql方法返回的Logical Plan封装为一个DataFrame
在parseSql中我们看到会调用ddlParser方法的parse方法:
在此我们先需要知道下面的类之间的关系
那么这里调用的parse方法其实是会调用AbstractSparkSQLParser的parse方法:
在这里我们可以看到根据scala的柯里化,我们知道会调用start方法,这个start方法就是在我们的DDLParser中
调用start方法其实就是调用createTable,describeTable,refreshTable等方法,至此我们可以知道其实DDLParser的主要目的是处理DDL语句,主要就是建表的一些语句,那么对于那些不是ddl的sql语句该如何处理呢,显然DDLParser是无法处理的。然后它会抛出异常
当然这个类里面也会包含这些方法的实现,具体对诸如cache方法的实现我们就不深入了,重点我们看一下others的实现
对于一些没有办法处理的sql关键字,我们又会调用fallback方法,那么这个fallback方法是从哪里来的呢?
很明显这是其构造函数的参数,也就是在创建类对象的时候传过来的,我们创建这个类的对象的代码如下
那么很明显这个fallback方法就是getSQLDialect().parse(_)方法返回的那个方法
传入的参数是getSQLDialect().parse方法
在这里我们可以看到getSQLDialect()方法返回的是一个DefaultParserDialect类,那么它传进去的就是DefaultParserDialect.parse方法
因此fallback其实就是sqlParse.parse,还是跟之前一样,这边还是会调用AbstarctSparkSQLParser的parse方法,最后还是SqlParser的start方法:
这里要处理的就是诸如select,insert的关键字
当上面的步骤都执行完了以后,我们就对传进去的sql语句做好了初步的解析,即将调用parser这个组件将sql解析成了Unresolved Logical Plan,这个类继承了QueryPlan,而QueryPlan又继承自TreeNode,所以其实到这里就是根据语法生成了一棵树,该树一直在内存里维护,不会以某种格式保存到磁盘中,且无论是Analyzer分析过的逻计划还是Optimizer优化过的逻辑计划,树的修改是以替换现有节点的方式进行的。
其实dataFrame也是具有懒加载的特性的,也就意味着只有当遇到show,collect等需要正真的返回结果的地方才会真正地去执行sparkSql后面的流程
新建一个QueryExecution的对象的代码如下
在创建这个对象的时候:
1. 调用analyzer组件去对上一步得到的Unresolved Logical Plan进行解析得到resolved Logical Plan,在Analyzer中会定义一系列的Rules用来解析这个Logical Plan
2. 调用optimizer组件对上一步得到的Resolved Logical Plan进行优化,这一步非常重要,了解其中的一些规则我们就可以按照它的优化策略写出合适的sql语句,减少sql优化的过程,提高性能
3. 根据上一步中创建的Optimized Logical Plan创建一个SparkPlan即物理计划,这边就是可以正真执行的了
4. 调用SparkPlan的execute方法去执行物理计划
1. 从数据源或者RDD读取数据,构造出一个DataFrame
2. 使用DataFrame的registerTempTable方法根据刚才读取的数据创建一个临时表
3. 调用sqlContext的sql方法执行sql语句
那么在这里我们就从sql语句的调用开始:
def sql(sqlText: String): DataFrame = { DataFrame(this, parseSql(sqlText)) }
parseSql的返回类型是一个LogicalPlan(代表一个逻辑计划),这里就是使用sqlContext和调用parseSql方法返回的Logical Plan封装为一个DataFrame
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_)) protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
在parseSql中我们看到会调用ddlParser方法的parse方法:
def parse(input: String, exceptionOnError: Boolean): LogicalPlan = { try { parse(input) } catch { case ddlException: DDLException => throw ddlException case _ if !exceptionOnError => parseQuery(input) case x: Throwable => throw x } }
在此我们先需要知道下面的类之间的关系
那么这里调用的parse方法其实是会调用AbstractSparkSQLParser的parse方法:
def parse(input: String): LogicalPlan = synchronized { // Initialize the Keywords. initLexical //用lexical.Scanner针对sql语句来进行语法检查和分析,满足语法检查结果的话就是用sql解析器针对sql进行解析 //包括词法解析,将sql语句解析成一个个的token,最后生成一个UnResolved LogicPlan //该LogicPlan仅仅针对SQL语句本身生成,不涉及任何关联的数据源等信息 phrase(start)(new lexical.Scanner(input)) match { case Success(plan, _) => plan case failureOrError => sys.error(failureOrError.toString) } }
在这里我们可以看到根据scala的柯里化,我们知道会调用start方法,这个start方法就是在我们的DDLParser中
protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable protected def start: Parser[LogicalPlan] = ddl protected lazy val describeTable: Parser[LogicalPlan] = (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ { case e ~ tableIdent => DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined) } createTable... ...
调用start方法其实就是调用createTable,describeTable,refreshTable等方法,至此我们可以知道其实DDLParser的主要目的是处理DDL语句,主要就是建表的一些语句,那么对于那些不是ddl的sql语句该如何处理呢,显然DDLParser是无法处理的。然后它会抛出异常
case _ if !exceptionOnError => parseQuery(input)那么此时会调用parseQuer方法进行处理,这个parseQuery就是在创建DDLParser时传递过来的参数,根据我们创建DDLParser的代码可以看出这个parseQuery就是SparkSQLParser.parse,根据上面的类的继承关系,可以知道这个方法也是调用的其父类的方法即AbstractSparkSQLParser类的parse方法,在这个方法中最后还是调用SparkSQLParser的start方法
override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | desc | others
当然这个类里面也会包含这些方法的实现,具体对诸如cache方法的实现我们就不深入了,重点我们看一下others的实现
private lazy val others: Parser[LogicalPlan] = wholeInput ^^ { case input => fallback(input) }
对于一些没有办法处理的sql关键字,我们又会调用fallback方法,那么这个fallback方法是从哪里来的呢?
private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {}
很明显这是其构造函数的参数,也就是在创建类对象的时候传过来的,我们创建这个类的对象的代码如下
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect().parse(_))
那么很明显这个fallback方法就是getSQLDialect().parse(_)方法返回的那个方法
传入的参数是getSQLDialect().parse方法
protected[sql] def dialectClassName = if (conf.dialect == "sql") { classOf[DefaultParserDialect].getCanonicalName } else { conf.dialect } protected[sql] def getSQLDialect(): ParserDialect = { try { val clazz = Utils.classForName(dialectClassName) clazz.newInstance().asInstanceOf[ParserDialect] } catch { ... } }
在这里我们可以看到getSQLDialect()方法返回的是一个DefaultParserDialect类,那么它传进去的就是DefaultParserDialect.parse方法
private[spark] class DefaultParserDialect extends ParserDialect { @transient protected val sqlParser = SqlParser override def parse(sqlText: String): LogicalPlan = { sqlParser.parse(sqlText) } }
因此fallback其实就是sqlParse.parse,还是跟之前一样,这边还是会调用AbstarctSparkSQLParser的parse方法,最后还是SqlParser的start方法:
protected lazy val start: Parser[LogicalPlan] = start1 | insert | cte c1ac protected lazy val start1: Parser[LogicalPlan] = (select | ("(" ~> select <~ ")")) * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } )
这里要处理的就是诸如select,insert的关键字
当上面的步骤都执行完了以后,我们就对传进去的sql语句做好了初步的解析,即将调用parser这个组件将sql解析成了Unresolved Logical Plan,这个类继承了QueryPlan,而QueryPlan又继承自TreeNode,所以其实到这里就是根据语法生成了一棵树,该树一直在内存里维护,不会以某种格式保存到磁盘中,且无论是Analyzer分析过的逻计划还是Optimizer优化过的逻辑计划,树的修改是以替换现有节点的方式进行的。
其实dataFrame也是具有懒加载的特性的,也就意味着只有当遇到show,collect等需要正真的返回结果的地方才会真正地去执行sparkSql后面的流程
//实际上在后面真正对DataFrame操作的时候需要真正的去执行sql语句的时候 // 就会触发sqlContext的executesql方法的执行 //该方法实际上会返回一个QueryExecution,这个QueryExecution就会触发后续的整个流程 protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql)) protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
新建一个QueryExecution的对象的代码如下
@DeveloperApi protected[sql] class QueryExecution(val logical: LogicalPlan) { def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) //使用一个UnResolved LogicPlan去构造一个QueryExecution的实例对象 //那么sql语句的设计执行就会立即一步一步的触发 //调用analyzer来生成一个resolved LogicPlan lazy val analyzed: LogicalPlan = analyzer.execute(logical) //如果当前的这个执行计划缓存中有,那么就从缓存中读取 lazy val withCachedData: LogicalPlan = { assertAnalyzed() cacheManager.useCachedData(analyzed) } //针对resolvedPlan调用optimizer的execute进行优化,得到优化后的optimized LogicalPlan //获得优化后的逻辑执行计划 lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) // TODO: Don't just pick the first one... //使用sparkPlanner根据刚刚创建的一个optimized LogicalPlan创建一个sparkplan lazy val sparkPlan: SparkPlan = { SparkPlan.currentContext.set(self) planner.plan(optimizedPlan).next() } /*在sparksql中,逻辑执行计划就是LogicalPlan,物理执行计划就是SparkPlan*/ // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. //生成一个可以执行的sparkplan,此时就是physicplan,此时就是物理执行计划 //此时的话就已经绑定好了数据源,知各个表如何join //如果进行join,默认spark内部是会对小表进行广播的 lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ //调用sparkPlan (封装了Physical plan)的execute方法,execute方法实际上就会执行物理执行计划 lazy val toRdd: RDD[InternalRow] = executedPlan.execute() protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } def simpleString: String = s"""== Physical Plan == |${stringOrError(executedPlan)} """.stripMargin.trim override def toString: String = { def output = analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") s"""== Parsed Logical Plan == |${stringOrError(logical)} |== Analyzed Logical Plan == |${stringOrError(output)} |${stringOrError(analyzed)} |== Optimized Logical Plan == |${stringOrError(optimizedPlan)} |== Physical Plan == |${stringOrError(executedPlan)} |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} """.stripMargin.trim } }
在创建这个对象的时候:
1. 调用analyzer组件去对上一步得到的Unresolved Logical Plan进行解析得到resolved Logical Plan,在Analyzer中会定义一系列的Rules用来解析这个Logical Plan
2. 调用optimizer组件对上一步得到的Resolved Logical Plan进行优化,这一步非常重要,了解其中的一些规则我们就可以按照它的优化策略写出合适的sql语句,减少sql优化的过程,提高性能
3. 根据上一步中创建的Optimized Logical Plan创建一个SparkPlan即物理计划,这边就是可以正真执行的了
4. 调用SparkPlan的execute方法去执行物理计划
相关文章推荐
- 第47课:spark中的新解析引擎catalyst源码sqlparser彻底详解
- Spark-Sql源码解析之七 Execute: executed Plan -> RDD[Row]
- Spark之SQL解析(源码阅读十)
- Spark-Sql源码解析之八 Codegen
- Spark-Sql源码解析之四 Optimizer: analyzed logical plan –> optimized logical plan
- Spark源码走读之 -- sql的解析与执行
- Spark-Sql源码解析之五 Spark Planner:optimized logical plan –> spark plan
- Spark-Sql源码解析之一 引言
- spark SQL源码阅读002——sql.core包核心类——002执行SQL语法2次解析SQL词(analyse)
- sparksql源码解析(执行计划)
- 47:Spark中的新解析引擎Catalyst源码SqlParser彻底详解
- Spark-Sql源码解析之三 Analyzer:Unresolved logical plan –> analyzed logical plan
- Spark源码系列(九)Spark SQL初体验之解析过程详解
- Spark源码系列(九)Spark SQL初体验之解析过程详解
- Apache Spark源码走读之11 -- sql的解析与执行
- 第51课:Spark中的新解析引擎Catalyst源码SQL最终转化为RDD具体实现
- 51:Spark中的新解析引擎Catalyst源码SQL最终转化为RDD具体实现
- Spark-Sql源码解析之六 PrepareForExecution: spark plan -> executed Plan
- Spark-Sql源码解析之二 Sqlparser:sql –> unresolved logical plan
- 第51课: Spark中的新解析引擎Catalyst源码SQL最终转化为RDD具体实现