Presto Source Code Analysis (Comparison with Hive Execution Plans)
2016-10-23 10:12
Aggregation comparison
1 presto groupby
2 hive groupby
Sort comparison
1 presto sort
2 hive sort
join
1 presto
2 hive
1 Aggregation comparison
1.1 presto groupby
explain select sum(totalprice),orderpriority from orders group by orderpriority;

- Output[_col0, orderpriority] => [sum:double, orderpriority:varchar(15)]
        _col0 := sum
    - RemoteExchange[GATHER] => sum:double, orderpriority:varchar(15)
        - Project => [sum:double, orderpriority:varchar(15)]
            - Aggregate(FINAL)[orderpriority] => [orderpriority:varchar(15), $hashvalue:bigint, sum:double] //final aggregation
                    sum := "sum"("sum_9")
                - RemoteExchange[REPARTITION] => orderpriority:varchar(15), sum_9:double, $hashvalue:bigint //pull the partial-aggregation results from remote nodes
                    - Aggregate(PARTIAL)[orderpriority] => [orderpriority:varchar(15), $hashvalue_11:bigint, sum_10:double]
                            sum_10 := "sum"("totalprice") //partial aggregation
                        - Project => [$hashvalue_11:bigint, orderpriority:varchar(15), totalprice:double]
                                $hashvalue_11 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderpriority"), 0))
                            - TableScan[tpch:tpch:orders:sf1.0, originalConstraint = true] => [totalprice:double, orderpriority:varchar(15)] //table scan
                                    totalprice := tpch:totalprice
                                    orderpriority := tpch:orderpriority
1.2 hive groupby
explain select sum(o_totalprice),o_orderpriority from orders where o_orderkey>100 group by o_orderpriority;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan //table scan
            alias: orders
            Statistics: Num rows: 6000000 Data size: 596779236 Basic stats: COMPLETE Column stats: NONE
            Filter Operator //row filter
              predicate: (o_orderkey > 100) (type: boolean)
              Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
              Select Operator //projection
                expressions: o_orderpriority (type: string), o_totalprice (type: double)
                outputColumnNames: o_orderpriority, o_totalprice
                Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
                Group By Operator //hash-mode partial group by over the scanned, filtered and projected rows
                  aggregations: sum(o_totalprice) //aggregate function
                  keys: o_orderpriority (type: string)
                  mode: hash
                  outputColumnNames: _col0, _col1
                  Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
                  Reduce Output Operator //emit the partial group-by result as reduce input; the shuffle key is _col0, i.e. o_orderpriority
                    key expressions: _col0 (type: string)
                    sort order: +
                    Map-reduce partition columns: _col0 (type: string)
                    Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
                    value expressions: _col1 (type: double)
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0) //aggregate function
          keys: KEY._col0 (type: string)
          mode: mergepartial //unlike the hash aggregation above, this merges the partial-aggregation results
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1000000 Data size: 99463206 Basic stats: COMPLETE Column stats: NONE
          Select Operator //project the aggregated output
            expressions: _col1 (type: double), _col0 (type: string)
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 1000000 Data size: 99463206 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: true
              Statistics: Num rows: 1000000 Data size: 99463206 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
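Both engines use the same two-phase pattern: a hash-mode partial aggregation close to the data (Presto's Aggregate(PARTIAL), Hive's mode: hash), then a merge of the partial results after the exchange/shuffle (Aggregate(FINAL), mode: mergepartial). A minimal Python sketch of that pattern, with made-up data, not engine code:

```python
from collections import defaultdict

def partial_agg(rows):
    # hash-mode partial aggregation on one worker / map task:
    # sum(totalprice) grouped by orderpriority
    acc = defaultdict(float)
    for priority, totalprice in rows:
        acc[priority] += totalprice
    return dict(acc)

def final_agg(partials):
    # final / mergepartial aggregation: merge the per-worker partial sums
    merged = defaultdict(float)
    for partial in partials:
        for priority, subtotal in partial.items():
            merged[priority] += subtotal
    return dict(merged)

# two workers each aggregate their own split; the coordinator merges the partials
split1 = [("1-URGENT", 10.0), ("2-HIGH", 5.0), ("1-URGENT", 2.5)]
split2 = [("2-HIGH", 7.5)]
result = final_agg([partial_agg(split1), partial_agg(split2)])
# result == {"1-URGENT": 12.5, "2-HIGH": 12.5}
```

Note why two phases are safe here: sum is decomposable, so summing per-worker subtotals gives the same answer as one global pass, while shipping far fewer rows across the network.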
2 Sort comparison
2.1 presto sort
explain select * from nation where nationkey>10 order by regionkey;

- Output[nationkey, name, regionkey, comment] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
    - Project => [name:varchar(25), nationkey:bigint, comment:varchar(152), regionkey:bigint] //projection
        - Sort[regionkey ASC_NULLS_LAST] => [regionkey:bigint, nationkey:bigint, name:varchar(25), comment:varchar(152)] //global sort, executed on a single node
            - RemoteExchange[GATHER] => regionkey:bigint, nationkey:bigint, name:varchar(25), comment:varchar(152) //gather the unsorted results from the other nodes
                - Project => [name:varchar(25), nationkey:bigint, comment:varchar(152), regionkey:bigint]
                    - Filter[("nationkey" > BIGINT '10')] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)] //filter
                        - TableScan[tpch:tpch:nation:sf1.0, originalConstraint = ("nationkey" > BIGINT '10')] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:va //table scan
                                nationkey := tpch:nationkey
                                name := tpch:name
                                regionkey := tpch:regionkey
                                comment := tpch:comment
2.2 hive sort
(Open question: where does the global sort happen? Is it simply that the data is sorted by default once it reaches the reducer? That appears to be the case: order by forces a single reducer, and the MapReduce shuffle sorts the map output by key, so the reduce input arrives globally ordered.)

explain select * from nation where n_nationkey>10 order by n_regionkey;
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan //table scan
            alias: nation
            Statistics: Num rows: 10 Data size: 2224 Basic stats: COMPLETE Column stats: NONE
            Filter Operator //row filter
              predicate: (n_nationkey > 10) (type: boolean)
              Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
              Select Operator //projection
                expressions: n_nationkey (type: int), n_name (type: string), n_regionkey (type: int), n_comment (type: string)
                outputColumnNames: _col0, _col1, _col2, _col3
                Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator //map output; the output key is n_regionkey, and this key is sorted by default
                  key expressions: _col2 (type: int)
                  sort order: +
                  Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: int), _col1 (type: string), _col3 (type: string)
      Reduce Operator Tree:
        Select Operator //project the map output
          expressions: VALUE._col0 (type: int), VALUE._col1 (type: string), KEY.reducesinkkey0 (type: int), VALUE._col2 (type: string)
          outputColumnNames: _col0, _col1, _col2, _col3
          Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
          File Output Operator //just write out: by the time the map output has been pulled to the reducer, it is already globally sorted
            compressed: true
            Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
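The global sort in the Hive plan comes for free from the shuffle: order by forces a single reducer, and the framework sorts map output by key before handing it to the reducer. A toy Python sketch of that behavior (illustrative names and data, not Hive code):

```python
def map_phase(rows):
    # Reduce Output Operator analogue: key = n_regionkey (sort order '+'),
    # value = the remaining columns; Filter Operator: n_nationkey > 10
    return [(regionkey, (nationkey, name, comment))
            for nationkey, name, regionkey, comment in rows
            if nationkey > 10]

def shuffle_to_single_reducer(kv_pairs):
    # the framework sorts by key before handing records to the (single) reducer,
    # so the reduce input is already globally ordered
    return sorted(kv_pairs, key=lambda kv: kv[0])

rows = [
    (13, "JORDAN", 4, "..."),
    (11, "IRAQ", 4, "..."),
    (12, "JAPAN", 2, "..."),
]
reduce_input = shuffle_to_single_reducer(map_phase(rows))
# regionkeys arrive as [2, 4, 4]: the reducer can write them out as-is
```

This is also why Hive distinguishes order by (one reducer, total order, slow on big data) from sort by (per-reducer order only).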
3 join
3.1 presto
explain select l.orderkey, sum(extendedprice) from orders o join lineitem l on l.orderkey = o.orderkey where o.orderkey>100 group by l.orderkey order by l.orderkey limit 10;
Output[orderkey, _col1] => [orderkey_0:bigint, sum:double]
        orderkey := orderkey_0
        _col1 := sum
    - TopN[10 by (orderkey_0 ASC_NULLS_LAST)] => [orderkey_0:bigint, sum:double]
        - RemoteExchange[GATHER] => orderkey_0:bigint, sum:double
            - TopN[10 by (orderkey_0 ASC_NULLS_LAST)] => [orderkey_0:bigint, sum:double]
                - Aggregate(FINAL)[orderkey_0] => [orderkey_0:bigint, sum:double]
                        sum := "sum"("sum_22")
                    - RemoteExchange[REPARTITION] => orderkey_0:bigint, sum_22:double, $hashvalue:bigint
                        - Project => [orderkey_0:bigint, sum_23:double, $hashvalue_27:bigint]
                                $hashvalue_27 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderkey_0"), 0))
                            - Aggregate(PARTIAL)[orderkey_0] => [orderkey_0:bigint, sum_23:double]
                                    sum_23 := "sum"("extendedprice")
                                - Project => [extendedprice:double, orderkey_0:bigint]
                                    - InnerJoin[("orderkey" = "orderkey_18")] => [orderkey:bigint, $hashvalue_24:bigint, orderkey_0:bigint, extendedprice:double, orderkey_18:bigint, $hashvalue_25:bigint]
                                        - Project => [orderkey:bigint, $hashvalue_24:bigint]
                                                $hashvalue_24 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderkey"), 0))
                                            - Filter[("orderkey" > BIGINT '100')] => [orderkey:bigint]
                                                - TableScan[tpch:tpch:orders:sf1.0, originalConstraint = ("orderkey" > BIGINT '100')] => [orderkey:bigint]
                                                        orderkey := tpch:orderkey
                                        - RemoteExchange[REPARTITION] => orderkey_0:bigint, extendedprice:double, orderkey_18:bigint, $hashvalue_25:bigint
                                            - Project => [orderkey_0:bigint, extendedprice:double, $hashvalue_26:bigint]
                                                    $hashvalue_26 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderkey_0"), 0))
                                                - Filter[("orderkey_0" > BIGINT '100')] => [orderkey_0:bigint, extendedprice:double]
                                                    - TableScan[tpch:tpch:lineitem:sf1.0, originalConstraint = ("orderkey_0" > BIGINT '100')] => [orderkey_0:bigint, extendedprice:double]
                                                            orderkey_0 := tpch:orderkey
                                                            extendedprice := tpch:extendedprice
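Note that TopN runs twice in this plan: each worker keeps only its local top 10 before the GATHER exchange, and a final TopN merges the local lists on one node. A toy Python sketch of that pattern (made-up data, not Presto code):

```python
import heapq

def partial_topn(rows, n):
    # TopNOperator(partial:true) analogue: each worker keeps only its
    # n smallest rows by sort key, shrinking what crosses the network
    return heapq.nsmallest(n, rows)

def final_topn(partials, n):
    # TopNOperator(partial:false) analogue: after the GATHER exchange,
    # merge the per-worker candidate lists and keep the global top n
    return heapq.nsmallest(n, [row for partial in partials for row in partial])

# rows are (orderkey, sum) pairs; two workers, global top 3 by orderkey
worker1 = [(5, 1.0), (1, 2.0), (9, 3.0)]
worker2 = [(2, 4.0), (8, 5.0)]
top = final_topn([partial_topn(worker1, 3), partial_topn(worker2, 3)], 3)
# top == [(1, 2.0), (2, 4.0), (5, 1.0)]
```

The trick is the same as with two-phase aggregation: any row outside a worker's local top n can never be in the global top n, so discarding it early is safe.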
For a join like this, the overall idea is: scan both tables and redistribute one of them across the cluster; the join starts executing while the table that needs no redistribution is being scanned. After the join, if there is an aggregation, a partial aggregation runs first, because the aggregation key is not necessarily the join key. (If the aggregation key is the join key, two-step aggregation is unnecessary; the first aggregation already yields the final result.) After the partial aggregation, its results are redistributed and the final aggregation runs. If there is an order by, it executes last. The plan is split into 4 stages:
Stage0
OutputNode->TopNNode->RemoteSourceNode, with corresponding operators:
ExchangeOperator->TaskOutputOperator->TopNOperator(partial:false)
Stage1
TopNNode->AggregateNode(Final)->RemoteSourceNode(), with corresponding operators:
ExchangeOperator->HashAggregationOperator(Final)->TopNOperator(partial:true)->TaskOutputOperator //final aggregation
Stage2
ProjectNode->AggregateNode(Partial)->ProjectNode->JoinNode-> //partial aggregation after the join
- right->RemoteSourceNode(), with corresponding operators:
ExchangeOperator->HashBuilderOperator: receives the lineitem data redistributed from the other nodes. What is HashBuilderOperator for? Presumably, since a lookup join is used, it builds the hash table here so the hash join can run.
- left->ProjectNode->FilterNode->TableScanNode(orders), with corresponding operators:
ScanFilterAndProjectOperator(orders)->PartitionedOutputOperator: scans the orders table and redistributes it
Stage3
ProjectNode->FilterNode->TableScanNode(lineitem)
ScanFilterAndProjectOperator(lineitem) (the lineitem table scan happens first) ->LookupJoinOperator->FilterAndProjectOperator->HashAggregationOperator(Partial)->FilterAndProjectOperator->PartitionedOutputOperator
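The HashBuilderOperator/LookupJoinOperator pair amounts to a classic hash join: build a hash table on the join key from the redistributed side, then stream the other side through it and look up matches. A minimal Python sketch (illustrative names and data, not Presto code):

```python
from collections import defaultdict

def build(build_rows):
    # HashBuilderOperator analogue: hash the build side on the join key,
    # keeping a list per key since the key may repeat
    table = defaultdict(list)
    for orderkey, payload in build_rows:
        table[orderkey].append(payload)
    return table

def probe(hash_table, probe_rows):
    # LookupJoinOperator analogue: stream the probe side row by row
    # and emit one output row per matching build-side entry
    for orderkey, payload in probe_rows:
        for match in hash_table.get(orderkey, []):
            yield (orderkey, payload, match)

orders = [(101, "o101"), (102, "o102")]           # probe side (scanned locally)
lineitem = [(101, 10.0), (101, 20.0), (103, 5.0)]  # build side (redistributed)
joined = list(probe(build(lineitem), orders))
# joined == [(101, "o101", 10.0), (101, "o101", 20.0)]
```

Building on the smaller (or already redistributed) side keeps the hash table in memory, while the probe side can be streamed straight out of the scan, which matches the "join starts while the probe table is still being scanned" behavior described above.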
3.2 hive