
Code Analysis of intel stream-sql Features

2017-03-02 14:31


1. Implemented Features as Seen in the Code

Feature 1: Support create/drop stream via SQL queries

Feature 2: Support simple queries over various kinds of streaming inputs

Feature 3: Stream-to-table join


2. Analysis of the Implemented Features


2.1 Implementation of Feature 1


2.1.1 Code support in stream-sql

StreamHiveContext.streamHql.streamHiveql
    StreamQl.parseSql
        HiveQl.parseSql                 // normal Hive statement parsing, yielding a LogicalPlan
            HiveQl.nodeToPlan           // converts a Hive ASTNode into a stream-sql LogicalPlan
                Token("TOK_CREATETABLE", children)   // Token.unapply turns a Hive AST Node into Spark SQL's Token form
                    nodeToRelation
                        UnresolvedRelation           // at this point a table name in Hive ASTNode form has become a Spark SQL LogicalPlan
        HiveQl.getAst                   // connection point to Hive; if parsing into a LogicalPlan fails, only the parsed ASTNode is obtained
            ParseDriver.parse           // Hive QL parser module
        StreamQL.nodeToPlan
            Token("TOK_CREATESTREAM", children)      // Token.unapply turns a Hive AST Node into stream-sql's Token form
                CREATESTREAM            // at this point a table name in Hive ASTNode form has become a stream-sql LogicalPlan
            ...
    SchemaDStream.queryExecution
        StreamSQLContext.executePlan
            StreamHiveContext.QueryExecution         // actually runs the QueryExecution subclass defined in StreamHiveContext
                Analyzer                // analyzer extended with Hive functions
                    HiveFunctionRegistry
                SQLContext.optimizer
                StreamPlanner with StreamHiveStrategies   // runs the StreamPlanner defined in StreamHiveContext
                    StreamHiveScans
                        StreamHiveScan  // leaf node of the physical plan tree that scans the stream data
                            GenericStreamReader.makeDStreamForStream
                                GenericInputDStream.createStream
                                    GenericReceiver
                    StreamDDL
                        CreateStream
                            CreateStreamOperator
                                StreamMetastoreCatalog.createStream
                                    Hive.createTable

TextKafkaInputFormat
    TextKafkaRecordReader(KafkaInputFormat)
        KafkaRecordReader(KafkaInputSplit)
            Consumer.create             // connection point to the Kafka data source
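The Token("TOK_CREATESTREAM", children) step in the tree above hinges on Scala's extractor-object pattern. A minimal, self-contained sketch of how a Token.unapply lets nodeToPlan dispatch on the AST token text (the types here are stand-ins, not the project's actual classes):

```scala
// Hypothetical, simplified model of the ASTNode -> LogicalPlan dispatch.
sealed trait Node
case class AstNode(text: String, children: List[Node]) extends Node

// Token.unapply lets a plain AST node be pattern-matched by its token text,
// which is how nodeToPlan can recognize TOK_CREATESTREAM, TOK_CREATETABLE, etc.
object Token {
  def unapply(n: Node): Option[(String, List[Node])] = n match {
    case AstNode(text, children) => Some((text, children))
  }
}

sealed trait LogicalPlan
case class CreateStream(name: String) extends LogicalPlan
case class UnresolvedRelation(name: String) extends LogicalPlan

def nodeToPlan(node: Node): LogicalPlan = node match {
  // a custom token produced by the patched Hive grammar
  case Token("TOK_CREATESTREAM", AstNode(name, _) :: _) => CreateStream(name)
  // an ordinary table reference
  case Token("TOK_TABREF", AstNode(name, _) :: _)       => UnresolvedRelation(name)
  case _ => sys.error("unhandled token")
}
```

The same structural trick is what lets the stream-sql parser fall through to HiveQl.parseSql for tokens it does not recognize.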


2.1.2 Hive grammar support

Under sql/hive/lib there is hive-exec-stream-0.12.0.jar, for which no source code is provided. Decompilation shows that the grammar file HiveParser.g was modified, and the changes are visible in HiveParser.class. As a result, when StreamHiveContext calls Hive's parser to obtain the ASTNode, it can receive custom tokens such as TOK_CREATESTREAM, and stream-sql can then perform the corresponding DDL operations based on the parsed nodes. Statements like SELECT STREAM are not yet implemented and do not appear in the grammar file; likewise, the ALTER variants of the DDL exist only partially in the stream-sql code and are not yet reflected in the Hive grammar file.


2.2 Implementation of Feature 2

It implements queries of the following form:

SELECT select_expr, ... FROM stream_reference

[WHERE where_condition]

[LIMIT number]

This is Hive SQL compatible and queries data from a stream.

In the implementation, the grammar needs no changes, but the operators do.

StreamSQLContext.streamSql
    SchemaDStream.queryExecution
    SQLContext.analyzer
        Analyzer
            EmptyFunctionRegistry
    SQLContext.optimizer
    StreamPlanner.apply                 // applies the planning strategies
        Strategy.apply
            BasicOperators              // basic operators such as projection and filtering
            HashJoin                    // logic similar to Spark SQL's; joins a created stream with a regular table
            BroadcastNestedLoopJoin
            CartesianProduct
            ...
    RuleExecutor
        AddExchange                     // distributes the query for partitioned regular tables or streams
            StreamExchange
    StreamPlan.executetoDStream
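The planning flow above ultimately applies ordinary batch operators (projection, filter, limit) to each micro-batch of the stream. A toy model of that idea in plain Scala, with Row/Batch as stand-ins and no Spark dependency:

```scala
// Each micro-batch goes through filter, limit and projection -- the same
// operators BasicOperators would plan for a static table.
type Row = Map[String, Any]
type Batch = List[Row]

def runQuery(stream: Iterator[Batch],
             projection: List[String],       // SELECT select_expr, ...
             predicate: Row => Boolean,      // WHERE where_condition
             limit: Int                      // LIMIT number
            ): Iterator[Batch] =
  stream.map { batch =>
    batch.filter(predicate)
         .take(limit)
         .map(row => row.filter { case (k, _) => projection.contains(k) })
  }
```

The point is that nothing stream-specific happens inside a batch; only the outer per-batch loop distinguishes the streaming case, which is why the grammar needs no changes.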

 

Because StreamExchange involves splitting the query, and its child plan is often a leaf node that performs the physical scan of the stream data, it is worth a closer look.

-------------
case class StreamExchange(newPartitioning: Partitioning, child: StreamPlan) extends UnaryNode {
  def output = child.output

  lazy val sparkPlan = execution.Exchange(newPartitioning, child.sparkPlan)

  override def execute = attachTree(this, "execute") {
    child.execute().transform(_ => sparkPlan.execute())
  }
}
-------------

In this code, the actual object behind child is StreamHiveScan, and the actual object behind child.sparkPlan is ExistingRDD.

After child.execute runs, child.sparkPlan.rdd becomes the RDD converted from the stream (see the last line of StreamHiveScan's execute method). So when sparkPlan.execute() runs, it actually invokes the execute method of the Exchange object, and the child.execute().mapPartitions call inside Exchange.execute is in fact a mapPartitions operation on that "RDD converted from the stream". This is how the StreamSQL module's code is wired into the Spark SQL Catalyst code.
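The interplay here -- child.execute() publishing each batch's RDD so that sparkPlan.execute() can pick it up even though the transform closure ignores its argument -- can be modeled in a few lines of plain Scala (all names below are stand-ins, not the project's classes):

```scala
// A toy model of the StreamExchange execute pattern: the scan publishes each
// micro-batch into a mutable slot (mirroring how StreamHiveScan's execute sets
// the rdd behind ExistingRDD), and the batch-level plan reads from that slot.
object ExchangeSketch {
  var currentBatchRdd: List[Int] = Nil        // stands in for ExistingRDD's rdd

  def sparkPlanExecute(): List[Int] =         // stands in for Exchange.execute
    currentBatchRdd.map(_ * 2)                // some per-batch computation

  def execute(stream: List[List[Int]]): List[List[Int]] =
    stream.map { batch =>                     // DStream.transform, once per batch
      currentBatchRdd = batch                 // side effect of child.execute()
      sparkPlanExecute()                      // the `_ => sparkPlan.execute()` closure
    }
}
```

This explains why the closure in the real code can safely discard its argument: by the time it runs for a given batch, the child has already rebound sparkPlan's input to that batch's RDD.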


2.3 Implementation of Feature 3

Building on Feature 2, the HashJoin, BroadcastNestedLoopJoin, and CartesianProduct strategies are implemented in the stream package, replacing the corresponding Join operations when their conditions match. The Join implementations are currently identical to the code in Spark SQL; presumably the intent is to modify them later and add more optimizations, but judging from the code this has not been done yet.
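As a rough illustration of what the stream-side HashJoin strategy amounts to -- building a hash table over the static table side once, then probing it with every micro-batch -- here is a sketch in plain Scala (names and shapes are illustrative, not the project's API):

```scala
// Stream-to-table hash join: `table` is the small static side, `streamBatches`
// the sequence of micro-batches; each batch row probes the prebuilt hash map.
def hashJoin[K, L, R](table: List[(K, R)],
                      streamBatches: Iterator[List[(K, L)]]): Iterator[List[(K, L, R)]] = {
  // build phase: done once, reused for every batch
  val built: Map[K, List[R]] =
    table.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
  // probe phase: per micro-batch
  streamBatches.map { batch =>
    for {
      (k, l) <- batch
      r      <- built.getOrElse(k, Nil)
    } yield (k, l, r)
  }
}
```

A streaming-specific optimization (which the current code does not yet exploit) would be to broadcast the built hash map once rather than rebuilding it per batch.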

 

References:

intel stream-sql: https://github.com/intel-spark/stream-sql