
Presto Source Code Analysis (Comparing Execution Plans with Hive)

2016-10-23

1 Aggregation comparison

1.1 presto groupby

explain select sum(totalprice),orderpriority from orders group by orderpriority;

- Output[_col0, orderpriority] => [sum:double, orderpriority:varchar(15)]
        _col0 := sum
    - RemoteExchange[GATHER] => sum:double, orderpriority:varchar(15)
        - Project => [sum:double, orderpriority:varchar(15)]
            - Aggregate(FINAL)[orderpriority] => [orderpriority:varchar(15), $hashvalue:bigint, sum:double] // final aggregation
                    sum := "sum"("sum_9")
                - RemoteExchange[REPARTITION] => orderpriority:varchar(15), sum_9:double, $hashvalue:bigint // remote exchange pulling the results from the partial-aggregation nodes
                    - Aggregate(PARTIAL)[orderpriority] => [orderpriority:varchar(15), $hashvalue_11:bigint, sum_10:double]
                            sum_10 := "sum"("totalprice") // partial aggregation
                        - Project => [$hashvalue_11:bigint, orderpriority:varchar(15), totalprice:double]
                                $hashvalue_11 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderpriority"), 0))
                            - TableScan[tpch:tpch:orders:sf1.0, originalConstraint = true] => [totalprice:double, orderpriority:varchar(15)] // table scan
                                    totalprice := tpch:totalprice
                                    orderpriority := tpch:orderpriority
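
The RemoteExchange[GATHER] and RemoteExchange[REPARTITION] nodes are where this plan is cut into fragments, i.e. the stage boundaries. To see the fragments spelled out per stage rather than as one tree, Presto's EXPLAIN also accepts a distributed type (standard Presto syntax; the exact fragment headers vary by version):

explain (type distributed) select sum(totalprice),orderpriority from orders group by orderpriority;
-- prints one "Fragment" block per stage, each annotated with its partitioning
-- (SINGLE for the coordinator gather, HASH for the repartitioned aggregation, SOURCE for the scan)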


1.2 hive groupby

explain select sum(o_totalprice),o_orderpriority from orders where o_orderkey>100 group by o_orderpriority;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan // table scan
            alias: orders
            Statistics: Num rows: 6000000 Data size: 596779236 Basic stats: COMPLETE Column stats: NONE
            Filter Operator // filter rows
              predicate: (o_orderkey > 100) (type: boolean)
              Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
              Select Operator // projection
                expressions: o_orderpriority (type: string), o_totalprice (type: double)
                outputColumnNames: o_orderpriority, o_totalprice
                Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
                Group By Operator // local (map-side) group by, in hash mode, over the scanned, filtered and projected rows
                  aggregations: sum(o_totalprice) // aggregate function
                  keys: o_orderpriority (type: string)
                  mode: hash
                  outputColumnNames: _col0, _col1
                  Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
                  Reduce Output Operator // emit the local group-by result as the reduce input; the shuffle key is _col0, which should be o_orderpriority
                    key expressions: _col0 (type: string)
                    sort order: +
                    Map-reduce partition columns: _col0 (type: string)
                    Statistics: Num rows: 2000000 Data size: 198926412 Basic stats: COMPLETE Column stats: NONE
                    value expressions: _col1 (type: double)
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0) // aggregate function
          keys: KEY._col0 (type: string)
          mode: mergepartial // unlike the hash aggregation above, this runs a mergepartial aggregation, i.e. it merges the partial aggregation results
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1000000 Data size: 99463206 Basic stats: COMPLETE Column stats: NONE
          Select Operator // projection over the map output
            expressions: _col1 (type: double), _col0 (type: string)
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 1000000 Data size: 99463206 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: true
              Statistics: Num rows: 1000000 Data size: 99463206 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
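
The map-side Group By Operator in hash mode appears because Hive's map-side aggregation is switched on. A quick way to verify this from the shell (hive.map.aggr is a standard Hive setting, true by default):

set hive.map.aggr=false;
explain select sum(o_totalprice),o_orderpriority from orders where o_orderkey>100 group by o_orderpriority;
-- the map-side hash Group By Operator disappears: rows are shuffled unaggregated,
-- and the reduce-side Group By Operator should run in mode: complete instead of mergepartial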


2 Sort comparison

2.1 presto sort

explain select * from nation where nationkey>10 order by regionkey;

- Output[nationkey, name, regionkey, comment] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)]
    - Project => [name:varchar(25), nationkey:bigint, comment:varchar(152), regionkey:bigint] // projection
        - Sort[regionkey ASC_NULLS_LAST] => [regionkey:bigint, nationkey:bigint, name:varchar(25), comment:varchar(152)] // global sort, executed on a single node
            - RemoteExchange[GATHER] => regionkey:bigint, nationkey:bigint, name:varchar(25), comment:varchar(152) // pull the not-yet-sorted results from the other nodes
                - Project => [name:varchar(25), nationkey:bigint, comment:varchar(152), regionkey:bigint]
                    - Filter[("nationkey" > BIGINT '10')] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)] // filter
                        - TableScan[tpch:tpch:nation:sf1.0, originalConstraint = ("nationkey" > BIGINT '10')] => [nationkey:bigint, name:varchar(25), regionkey:bigint, comment:varchar(152)] // table scan
                                nationkey := tpch:nationkey
                                name := tpch:name
                                regionkey := tpch:regionkey
                                comment := tpch:comment
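
Since the Sort above runs on a single node after the GATHER, a bare ORDER BY over a large table bottlenecks on that node. With a LIMIT added, Presto switches to a two-phase TopN instead, partial on each node and final after the gather, the same pattern that shows up in the join plan in section 3.1 (a sketch; exact node labels vary by version):

explain select * from nation where nationkey>10 order by regionkey limit 5;
-- the single-node Sort is replaced by TopN[5 by (regionkey ASC_NULLS_LAST)]
-- appearing both below and above the RemoteExchange[GATHER]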


2.2 hive sort

(Open question: where does the global sort actually happen? Is it simply that the data arriving at the reducer is sorted by default? See the note after the plan below.)

explain select * from nation where n_nationkey>10 order by n_regionkey;

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan // table scan
            alias: nation
            Statistics: Num rows: 10 Data size: 2224 Basic stats: COMPLETE Column stats: NONE
            Filter Operator // row filter
              predicate: (n_nationkey > 10) (type: boolean)
              Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
              Select Operator // projection
                expressions: n_nationkey (type: int), n_name (type: string), n_regionkey (type: int), n_comment (type: string)
                outputColumnNames: _col0, _col1, _col2, _col3
                Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator // map output; the key is n_regionkey, and the shuffle sorts by this key by default
                  key expressions: _col2 (type: int)
                  sort order: +
                  Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: int), _col1 (type: string), _col3 (type: string)
      Reduce Operator Tree:
        Select Operator // projection over the map output
          expressions: VALUE._col0 (type: int), VALUE._col1 (type: string), KEY.reducesinkkey0 (type: int), VALUE._col2 (type: string)
          outputColumnNames: _col0, _col1, _col2, _col3
          Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
          File Output Operator // just write the rows out; by the time the map output has been pulled to the reducer it is already globally sorted
            compressed: true
            Statistics: Num rows: 3 Data size: 667 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
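
The answer to the open question above: Hive forces a single reducer for ORDER BY. The shuffle sorts each reducer's input by key, and with exactly one reducer that per-reducer order is the global order. For contrast, SORT BY (standard HiveQL) keeps multiple reducers and therefore only sorts within each reducer:

explain select * from nation where n_nationkey>10 sort by n_regionkey;
-- the plan shape stays the same, but several reducers may run, so the output
-- is ordered within each reducer's output file rather than globally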


3 join

3.1 presto

explain
select
    l.orderkey, sum(extendedprice)
from
    orders o join lineitem l
    on l.orderkey = o.orderkey
where
    o.orderkey > 100
group by l.orderkey
order by l.orderkey
limit 10;


- Output[orderkey, _col1] => [orderkey_0:bigint, sum:double]
        orderkey := orderkey_0
        _col1 := sum
    - TopN[10 by (orderkey_0 ASC_NULLS_LAST)] => [orderkey_0:bigint, sum:double]
        - RemoteExchange[GATHER] => orderkey_0:bigint, sum:double
            - TopN[10 by (orderkey_0 ASC_NULLS_LAST)] => [orderkey_0:bigint, sum:double]
                - Aggregate(FINAL)[orderkey_0] => [orderkey_0:bigint, sum:double]
                        sum := "sum"("sum_22")
                    - RemoteExchange[REPARTITION] => orderkey_0:bigint, sum_22:double, $hashvalue:bigint
                        - Project => [orderkey_0:bigint, sum_23:double, $hashvalue_27:bigint]
                                $hashvalue_27 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderkey_0"), 0))
                            - Aggregate(PARTIAL)[orderkey_0] => [orderkey_0:bigint, sum_23:double]
                                    sum_23 := "sum"("extendedprice")
                                - Project => [extendedprice:double, orderkey_0:bigint]
                                    - InnerJoin[("orderkey" = "orderkey_18")] => [orderkey:bigint, $hashvalue_24:bigint, orderkey_0:bigint, extendedprice:double, orderkey_18:bigint, $hashvalue_25:bigint]
                                        - Project => [orderkey:bigint, $hashvalue_24:bigint]
                                                $hashvalue_24 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderkey"), 0))
                                            - Filter[("orderkey" > BIGINT '100')] => [orderkey:bigint]
                                                - TableScan[tpch:tpch:orders:sf1.0, originalConstraint = ("orderkey" > BIGINT '100')] => [orderkey:bigint]
                                                        orderkey := tpch:orderkey
                                        - RemoteExchange[REPARTITION] => orderkey_0:bigint, extendedprice:double, orderkey_18:bigint, $hashvalue_25:bigint
                                            - Project => [orderkey_0:bigint, extendedprice:double, $hashvalue_26:bigint]
                                                    $hashvalue_26 := "combine_hash"(BIGINT '0', COALESCE("$operator$hash_code"("orderkey_0"), 0))
                                                - Filter[("orderkey_0" > BIGINT '100')] => [orderkey_0:bigint, extendedprice:double]
                                                    - TableScan[tpch:tpch:lineitem:sf1.0, originalConstraint = ("orderkey_0" > BIGINT '100')] => [orderkey_0:bigint, extendedprice:double]
                                                            orderkey_0 := tpch:orderkey
                                                            extendedprice := tpch:extendedprice


For a join like this, the overall approach is: scan both tables and redistribute one of them across the cluster; the join starts executing while the table that needs no redistribution is being scanned. If an aggregation follows the join, a partial aggregation runs first, because the aggregation key is not necessarily the join key; when the aggregation key is the join key, two-step aggregation is unnecessary and the first aggregation already produces the final result. After the partial aggregation its results are redistributed and the global aggregation runs, and an ORDER BY, if present, executes after that. The plan is divided into 4 stages (a sketch of how to switch the join distribution strategy follows the stage breakdown):

Stage0

OutputNode->TopNNode->RemoteSourceNode, corresponding operators:

ExchangeOperator->TaskOutputOperator->TopNOperator(partial:false)

Stage1

TopNNode->AggregateNode(Final)->RemoteSourceNode, corresponding operators:

ExchangeOperator->HashAggregationOperator(Final)->TopNOperator(partial:true)->TaskOutputOperator // performs the final aggregation

Stage2

ProjectNode->AggregateNode(Partial)->ProjectNode->JoinNode-> // partial aggregation after the join

- right->RemoteSourceNode, corresponding operators:

ExchangeOperator->HashBuilderOperator fetches the lineitem data redistributed from the other nodes. What is HashBuilderOperator for? Presumably, since a lookup join is used, it builds the hash table here so the hash join can run.

- left->ProjectNode->FilterNode->TableScanNode(orders), corresponding operators:

ScanFilterAndProjectOperator(orders)->PartitionedOutputOperator scans the orders table and redistributes it

Stage3

ProjectNode->FilterNode->TableScanNode(lineitem)

ScanFilterAndProjectOperator(lineitem) (this is where the lineitem table is scanned)->LookupJoinOperator->FilterAndProjectOperator->HashAggregationOperator(Partial)->FilterAndProjectOperator->PartitionedOutputOperator
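
Whether the build side of the join is repartitioned, as in this plan, or broadcast to every worker is controlled by a session property. In Presto releases from around the time of this plan that property was the boolean distributed_join (newer releases replaced it with join_distribution_type), so, as a sketch assuming such a release:

set session distributed_join = false;
-- re-running the EXPLAIN should now show the build side behind a broadcast
-- (REPLICATE) exchange instead of RemoteExchange[REPARTITION]: every worker
-- receives a full copy of the build table, trading memory for one less repartitioning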

3.2 hive




                                            