您的位置:首页 > 数据库

Spark-Sql源码解析之二 Sqlparser: sql -> unresolved logical plan

2016-08-11 16:18 351 查看
前面章节讲解了Spark-SQL中的核心流程,接下来主要讲解如何将sql语句转化为UnResolved Logical Plan(包含UnresolvedRelation、 UnresolvedFunction、 UnresolvedAttribute)。

// Entry point of the parsing phase: turns a raw SQL string into an
// (unresolved) logical plan. DDL statements are handled by `ddlParser`
// directly; passing `false` makes it fall back to the plain SQL parser
// for everything that is not DDL instead of raising an error.
protected[sql] def parseSql(sql: String): LogicalPlan =
  ddlParser.parse(sql, exceptionOnError = false)
// Parser for DDL statements. `parseQuery` is the fallback parser invoked
// for any input that does not look like DDL at all.
private[sql] class DDLParser(
parseQuery: String => LogicalPlan)
extends AbstractSparkSQLParser with DataTypeParser with Logging {

// Try to parse `input` as a DDL statement first. If that fails and
// `exceptionOnError` is false, delegate to the fallback query parser;
// otherwise rethrow. NOTE: the order of the catch cases matters —
// a DDLException means the input *was* DDL but malformed, so it must
// never be swallowed by the fallback case below it.
def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
// First see whether the input is a DDL statement.
parse(input)
} catch {
case ddlException: DDLException => throw ddlException
case _ if !exceptionOnError => parseQuery(input)// Not DDL: hand off to `parseQuery`, the constructor argument of DDLParser.
case x: Throwable => throw x
}
}
}
// DDL parser whose non-DDL fallback delegates to the SparkSQLParser below.
protected[sql] val ddlParser = new DDLParser(s => sqlParser.parse(s))

// SparkSQLParser handles CACHE/UNCACHE/SET/SHOW itself and falls back to
// the current dialect's parser (getSQLDialect().parse) for everything else.
protected[sql] val sqlParser = new SparkSQLParser(s => getSQLDialect().parse(s))

// Top-level parser for Spark-specific commands (CACHE/UNCACHE/SET/SHOW).
// Anything that matches none of those alternatives is routed to
// `fallback`, which parses ordinary SQL (e.g. SELECT statements).
private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {
// NOTE: alternatives are tried left to right; `others` must stay last
// so the command parsers get a chance before the generic fallback.
override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | others
// CACHE [LAZY] TABLE name [AS <query>] — the optional query text after
// AS is parsed with the fallback parser.
private lazy val cache: Parser[LogicalPlan] =
CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
case isLazy ~ tableName ~ plan =>
CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)
}
// UNCACHE TABLE name | CLEAR CACHE
private lazy val uncache: Parser[LogicalPlan] =
( UNCACHE ~ TABLE ~> ident ^^ {
case tableName => UncacheTableCommand(tableName)
}
| CLEAR ~ CACHE ^^^ ClearCacheCommand
)
// SET key=value — the rest of the input is handed to SetCommandParser.
private lazy val set: Parser[LogicalPlan] =
SET ~> restInput ^^ {
case input => SetCommandParser(input)
}

// SHOW TABLES [IN db]
private lazy val show: Parser[LogicalPlan] =
SHOW ~> TABLES ~ (IN ~> ident).? ^^ {
case _ ~ dbName => ShowTablesCommand(dbName)
}

private lazy val others: Parser[LogicalPlan] =
wholeInput ^^ {
case input => fallback(input)// Ordinary statements (e.g. SELECT) are parsed by the fallback parser.
}
}
//继续往下追踪getSQLDialect().parse(_)就是DefaultParserDialect.parse(_)
// Default SQL dialect: a thin wrapper that delegates all parsing to
// Catalyst's SqlParser.
private[spark] class DefaultParserDialect extends ParserDialect {
  @transient
  protected val sqlParser = new SqlParser

  // Parse `sqlText` into an unresolved logical plan via SqlParser.
  override def parse(sqlText: String): LogicalPlan = sqlParser.parse(sqlText)
}
关键就是SqlParser

// Catalyst's SQL parser. The `select` production below shows how a SELECT
// statement is assembled into a tree of unresolved logical operators.
class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
// Grammar: SELECT [DISTINCT] proj,... [FROM rel] [WHERE e] [GROUP BY e,...]
//          [HAVING e] [ORDER/SORT BY ...] [LIMIT e]
protected lazy val select: Parser[LogicalPlan] =
SELECT ~> DISTINCT.? ~
repsep(projection, ",") ~
(FROM   ~> relations).? ~
(WHERE  ~> expression).? ~
(GROUP  ~  BY ~> rep1sep(expression, ",")).? ~
(HAVING ~> expression).? ~
sortType.? ~
(LIMIT  ~> expression).? ^^ {
case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l =>// the plan is built bottom-up in the order r, f, g, p, d, h, o, l
// Relation first; a SELECT without FROM reads from OneRowRelation.
val base = r.getOrElse(OneRowRelation)
// WHERE becomes a Filter directly above the relation.
val withFilter = f.map(Filter(_, base)).getOrElse(base)
// GROUP BY yields an Aggregate; otherwise a plain Project.
val withProjection = g
.map(Aggregate(_, assignAliases(p), withFilter))
.getOrElse(Project(assignAliases(p), withFilter))
val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
// HAVING is another Filter, placed above the aggregation/projection.
val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct)
// ORDER/SORT BY wraps the plan built so far, then LIMIT tops it off.
val withOrder = o.map(_(withHaving)).getOrElse(withHaving)
val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder)
withLimit
}
}


比方说:

Sql语句为:

SELECT id,dev_chnid,dev_chnname,car_num,car_speed,car_direct from test where id > 1 group by dev_chnid sort by car_num

unresolved logical plan为:

['Sort ['car_num ASC], false//最后是o

 'Aggregate ['dev_chnid],['id,'dev_chnid,'dev_chnname,'car_num,'car_speed,'car_direct]//接着是g

  'Filter ('id > 1)//然后f

   'UnresolvedRelation [test],None//先解析r

]

可见其unresolved logical plan的语法树是根据select语句的解析顺序生成的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 源码