intel stream-sql功能代码分析
2017-03-02 14:31
866 查看
功能1:Support create/drop stream by SQL queries
功能2:Support simple queries for kinds of streaming inputs
功能3:stream-to-table join.
2. 分析上面已实现的功能
2.1 功能1实现
2.1.1 stream-sql中代码支持
StreamHiveContext.streamHql.streamHiveqlStreamQl.parseSql
HiveQl.parseSql//走正常的Hive语句解析并获得LogicalPlan
HiveQl.nodeToPlan//将Hive ASTNode转为stream-sql中的LogicalPlan
Token("TOK_CREATETABLE", children)//通过Token.unapply将一个Hive AST节点形式Node转为Spark SQL中Token形式
nodeToRelation
UnresolvedRelation//至此Hive中一个AstNode形式的表名转为了一个Spark SQL中的LogicalPlan形式。
HiveQl.getAst//此处和Hive连接起来。如果解析并获取LogicalPlan的过程中错误,就只获取解析到的AstNode,
ParseDriver.parse//Hive QL Parser模块
StreamQL.nodeToPlan
Token("TOK_CREATESTREAM", children)//通过Token.unapply将一个Hive AST节点形式Node转为stream-sql中Token形式
CREATESTREAM//至此Hive中一个AstNode形式的表名转为了一个stream-sql的LogicalPlan形式。
...
SchemaDStream.queryExecution
StreamSQLContext.executePlan
StreamHiveContext.QueryExecution//此处实际执行StreamHiveContext中的QueryExecution子类
Analyzer//添加了hive函数的分析器
HiveFunctionRegistry
SQLContext.optimzer
StreamPlanner with StreamHiveStrategies//执行StreamHiveContext中的StreamPlanner
StreamHiveScans
StreamHiveScan //执行计划树中执行流数据扫描功能,叶子节点
GenericStreamReader.makeDStreamForStream
GenericInputDStream.createStream
GenericReceiver
StreamDDL
CreateStream
CreateStreamOperator
StreamMetastoreCatalog.createStream
Hive.createTable
TextKafkaInputFormat
TextKafkaRecordReader(KafkaInputFormat)
KafkaRecordReader(KafkaInputSplit)
Consumer.create //和kafka数据源连接起来了
2.1.2 hive语法支持
sql/hive/lib下有hive-exec-stream-0.12.0.jar。没有提供源码。从反编译看主要是修改了HiveParser.g中文件,然后在HiveParser.class中可以看到改变,这样在StreamHiveContext中调用Hive的解析器获取Astnode的时候可以得到TOK_CREATESTREAM等定制的节点。然后就可根据解析出的节点在stream-sql中相应DDL操作。目前SELECT STREAM这样的语句还没实现,语法文件中也没有体现,修改DDL的Alter操作只是在stream-sql代码中有一些,Hive语法文件中还没体现。
2.2 功能2实现
实现了类似的功能SELECT select_expr, ...FROM stream_reference
[WHERE where_condition]
[LIMIT number]
和hive sql兼容,从流中查询数据。
实现上语法不需要修改。但操作符需要修改。
StreamSQLContext.streamSql
SchemaDStream.queryExecution
SQLContext.analyzer
Analyzer
EmptyFunctionRegistry
SQLContext.optimzer
StreamPlanner.apply //应用优化策略
Strategy.apply
BasicOperators //基本的投影,过滤等操作
HashJoin //实现逻辑与Spark SQL中类似,用于创建的流和普通表做Join
BroadcastNestedLoopJoin
CartesianProduct
...
RuleExecutor
AddExchange //为有partition的普通表或者流做查询分布
StreamExchange
StreamPlan.executetoDStream
因为StreamExchange涉及查询拆分,并且在其中的child查询计划经常是叶子节点,涉及物理流数据的扫描,所以重点分析一下。
-------------
case class StreamExchange(newPartitioning: Partitioning, child: StreamPlan) extends UnaryNode {
def output = child.output
lazy val sparkPlan = execution.Exchange(newPartitioning, child.sparkPlan)
override def execute = attachTree(this, "execute") {
child.execute().transform(_ => sparkPlan.execute())
}
}
-------------
代码中child对应的实际对象为StreamHiveScan,child.sparkPlan对应的实际对象为ExistingRDD
child.execute执行后child.sparkPlan.rdd变为从流转化过来的RDD(具体参照StreamHiveScan的execute方法最后一行),所以sparkPlan.execute()执行时实际调用Exchange对象的execute方法,Exchange.execute函数中调用的child.execute().mapPartitions实际为"流转化过来的RDD"的mapPartitions操作。这样就把StreamSQL模块的代码和SQL SQL Catalyst的代码结合起来了。
2.3 功能3实现
在功能2的基础上实现HashJoin,BroadcastNestedLoopJoin,CartesianProduct三个stream包下的策略,并且替换对应条件下的Join操作,Join实现是目前和Spark SQL中的实现代码一样,猜测是准备修改代码,添加更多优化方法,但看代码似乎还没做。参考资料:
intel stream-sql: https://github.com/intel-spark/stream-sql
相关文章推荐
- 【SQL】分析函数功能-排序
- int 10H 显示字符串功能在《自己动手写OS》第五章代码中的分析
- 用SQL实现Oracle中的分析函数功能
- 偷懒日志 - 自动生成代码 - 第一步 分析SQL
- 代码分析错误查询SQL
- discuz论坛apache日志hadoop大数据分析项目:清洗数据核心功能解说及代码实现
- 交接工作不要只分析流程和看静态的看代码呀,一定要动手,增加一个功能,解决一个 BUG什么的,才能真正理解交接的工作内容呀!
- JAVA功能代码《5》----将Java中的util.Date转换成sql.Date
- CSS相册简单实现方法(功能分析及代码)
- DB Optimizer:数据库SQL代码分析调试优化,数据库存储过程优化,数据库负载测试
- 代码详解C#中SQL连接字符串的功能
- webrtc中的MethodCall0代码功能分析
- SQL Server 第四堂课,创建存储过程。存储过程是一组编译在单个执行计划中的transact-SQL语句。存储过程相当于C#函数,可以允许模块化程序设计,允许更快执行如果某操作需要大量transct-SQL代码或需要重复执行,将在创建存储过程中对其进行分析和优化。
- Intel 82599 ixgbe & ixgbevf CNA 卡驱动分析03——部分功能代码分析
- 【性能诊断】二、单功能场景的性能分析(fiddler、SQL Profiler)
- 怎样在VC中用代码操作SQL(1)?---注册帐号,修改密码,登录3个功能
- JAVA功能代码《5》----将Java中的util.Date转换成sql.Date
- etimer_process功能代码分析
- owncloud代码分析 剔除用户管理功能
- 触摸屏代码分析——实现滑动功能