spark中多表连接优化实例
2016-07-13 17:26
225 查看
环境信息:
hive1.2.1
spark1.6.1
hadoop2.6.0-cdh5.4.2
memory:1918752, vCores:506
表结构:
原hive sql:
(为求简便,所有字段以XX代替,最终有90个字段)
原sql,hive运行,具体现象为几个stage最后有一个reduce时间特别长,以上表关联基本是主键关联或做除重,不应存在数据倾斜;
优化开启数据倾斜优化,增加reduce数量
set hive.optimize.skewjoin=true;
set hive.exec.reducers.bytes.per.reducer=400000000;
没有效果,执行过程中现象一样。而且报内存超限错误(143);
继续增大container内存设置:set mapreduce.reduce.memory.mb=6144;
中间运行不报错,但是执行时间很久;最后报错:[Error 30001]: StatsPublisher cannot be initialized·····
改用spark运行:使用belline命令:
直接运行原sql耗时2.3h;大量时间消耗在TST_PC_ORDER与各个表的关联,hdfs读写不断增加,60G左右,非常耗时
优化1:由于TST_ORDER_VEHICLE 是与 TST_ORDER_RISK 关联,可以将两者先做关联,之后与主表关联;
这样做之后发现,spark会多生成一个stage,将TST_ORDER_VEHICLE 与 TST_ORDER_RISK 的关联提前做掉;但是这个stage耗时2min;总体时间42min
优化2:
既然关键耗时在TST_PC_ORDER表的扫描上,可以让其他的stage先运行;修改如下
如此多扫描TST_PC_ORDER表一次,但因为只扫描一列,速度较快;但是受制于A表与G表关联时间较长,且整体P需要A与G的关联结果出来之后才能运行;总耗时38min;
优化3:如果能在TST_PC_ORDER扫描的时候将所有其他的关联做完就完美了;修改表连接顺序如下:
这样A表扫描需要大约12分钟,在扫描的同时,P中的关联差不多执行完成;总耗时25min;
综上,优化考虑以下几点:
1、不需要的字段不要查出来,尽量减小结果集的数据量;
2、能够并行执行的stage,尽量在写法上将逻辑隔离开来,降低耦合度,让代码充分并行执行。
3、根据执行的关键路径进行优化,修改表的关联顺序,缩短关键路径执行时间。
hive1.2.1
spark1.6.1
hadoop2.6.0-cdh5.4.2
memory:1918752, vCores:506
表结构:
表名称 | 表容量 | 主键 | hive存储类型 |
---|---|---|---|
temp_01_pc_order | 5G | PC_ORDER_ID | RCFile |
TST_ORDER_RISK | 9.4G | 非 PC_ORDER_ID | RCFile |
TST_ORDER_VEHICLE | 36G | PC_ORDER_VEHICLE_ID | RCFile |
TST_ORDER_ASSIST | 800M | 非ORDER_ID | RCFile |
TST_PC_ORDER | 90G | PC_ORDER_ID | RCFile |
(为求简便,所有字段以XX代替,最终有90个字段)
INSERT OVERWRITE TABLE H_DW_ORDER SELECT A.PC_ORDER_ID, A.XX90 FROM TST_PC_ORDER A LEFT JOIN (SELECT PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID ,XX ,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO FROM TST_ORDER_RISK) K WHERE ROWNO = 1) B ON A.PC_ORDER_ID = B.PC_ORDER_ID LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID = C.PC_ORDER_VEHICLE_ID LEFT JOIN (SELECT T.* FROM TEMP_01_PC_ORDER T WHERE FLAG_L = 1) D ON A.PC_ORDER_ID = D.PC_ORDER_ID LEFT JOIN (SELECT T1.* FROM TEMP_01_PC_ORDER T1 WHERE FLAG_F = 1) F ON A.PC_ORDER_ID = F.PC_ORDER_ID LEFT JOIN (SELECT H.ORDER_ID, XX FROM (SELECT E.XX ,E.ORDER_ID ,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO FROM TST_ORDER_ASSISTE) H WHERE H.ROW_NO = 1) G ON A.ORDER_ID = G.ORDER_ID;
原sql,hive运行,具体现象为几个stage最后有一个reduce时间特别长,以上表关联基本是主键关联或做除重,不应存在数据倾斜;
优化开启数据倾斜优化,增加reduce数量
set hive.optimize.skewjoin=true;
set hive.exec.reducers.bytes.per.reducer=400000000;
没有效果,执行过程中现象一样。而且报内存超限错误(143);
继续增大container内存设置:set mapreduce.reduce.memory.mb=6144;
中间运行不报错,但是执行时间很久;最后报错:[Error 30001]: StatsPublisher cannot be initialized·····
改用spark运行:使用belline命令:
/appcom/spark/bin/beeline -u jdbc:hive2://10.xx.xx.xxx:88888 -n user0 --color=true --showHeader=true --verbose=true
直接运行原sql耗时2.3h;大量时间消耗在TST_PC_ORDER与各个表的关联,hdfs读写不断增加,60G左右,非常耗时
表名称 | 表容量 | hdfs读取量 | 扫描耗时 |
---|---|---|---|
temp_01_pc_order | 5G | 5G | 5min |
TST_ORDER_RISK | 9.4G | 5.3G | 4min |
TST_ORDER_VEHICLE | 36G | 10G | 4min |
TST_ORDER_ASSIST | 800M | 524M | 4min |
TST_PC_ORDER | 90G | 53.8G | 12min |
INSERT OVERWRITE TABLE H_DW_ORDER SELECT A.PC_ORDER_ID, A.XX90 FROM TST_PC_ORDER A LEFT JOIN (SELECT PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID ,XX ,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO FROM TST_ORDER_RISK) K WHERE ROWNO = 1) B LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID = C.PC_ORDER_VEHICLE_ID) M ON A.PC_ORDER_ID = M.PC_ORDER_ID LEFT JOIN (SELECT T.* FROM TEMP_01_PC_ORDER T WHERE FLAG_L = 1) D ON A.PC_ORDER_ID = D.PC_ORDER_ID LEFT JOIN (SELECT T1.* FROM TEMP_01_PC_ORDER T1 WHERE FLAG_F = 1) F ON A.PC_ORDER_ID = F.PC_ORDER_ID LEFT JOIN (SELECT H.ORDER_ID, XX FROM (SELECT E.XX ,E.ORDER_ID ,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO FROM TST_ORDER_ASSISTE) H WHERE H.ROW_NO = 1) G ON A.ORDER_ID = G.ORDER_ID;
这样做之后发现,spark会多生成一个stage,将TST_ORDER_VEHICLE 与 TST_ORDER_RISK 的关联提前做掉;但是这个stage耗时2min;总体时间42min
优化2:
既然关键耗时在TST_PC_ORDER表的扫描上,可以让其他的stage先运行;修改如下
INSERT OVERWRITE TABLE H_DW_ORDER SELECT A.PC_ORDER_ID, A.XX90 FROM TST_PC_ORDER A LEFT JOIN (SELECT H.ORDER_ID, XX FROM (SELECT E.XX ,E.ORDER_ID ,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO FROM TST_ORDER_ASSISTE) H WHERE H.ROW_NO = 1) G ON A.ORDER_ID = G.ORDER_ID LEFT JOIN (SELECT R.PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID FROM TST_PC_ORDER) R LEFT JOIN (SELECT PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID ,XX ,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO FROM TST_ORDER_RISK) K WHERE ROWNO = 1) B LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID = C.PC_ORDER_VEHICLE_ID) M ON R.PC_ORDER_ID = M.PC_ORDER_ID LEFT JOIN (SELECT T.* FROM TEMP_01_PC_ORDER T WHERE FLAG_L = 1) D ON R.PC_ORDER_ID = D.PC_ORDER_ID LEFT JOIN (SELECT T1.* FROM TEMP_01_PC_ORDER T1 WHERE FLAG_F = 1) F ON R.PC_ORDER_ID = F.PC_ORDER_ID) P ON A.PC_ORDER_ID =P.PC_ORDER_ID
如此多扫描TST_PC_ORDER表一次,但因为只扫描一列,速度较快;但是受制于A表与G表关联时间较长,且整体P需要A与G的关联结果出来之后才能运行;总耗时38min;
优化3:如果能在TST_PC_ORDER扫描的时候将所有其他的关联做完就完美了;修改表连接顺序如下:
INSERT OVERWRITE TABLE H_DW_ORDER SELECT A.PC_ORDER_ID, A.XX90 FROM TST_PC_ORDER A LEFT JOIN (SELECT R.PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID ,ORDER_ID FROM TST_PC_ORDER) R LEFT JOIN (SELECT T.* FROM TEMP_01_PC_ORDER T WHERE FLAG_L = 1) D ON R.PC_ORDER_ID = D.PC_ORDER_ID LEFT JOIN (SELECT T1.* FROM TEMP_01_PC_ORDER T1 WHERE FLAG_F = 1) F ON R.PC_ORDER_ID = F.PC_ORDER_ID LEFT JOIN (SELECT PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID, XX FROM (SELECT PC_ORDER_ID ,XX ,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO FROM TST_ORDER_RISK) K WHERE ROWNO = 1) B LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID = C.PC_ORDER_VEHICLE_ID) M ON R.PC_ORDER_ID = M.PC_ORDER_ID LEFT JOIN (SELECT H.ORDER_ID, XX FROM (SELECT E.XX ,E.ORDER_ID ,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO FROM TST_ORDER_ASSISTE) H WHERE H.ROW_NO = 1) G ON R.ORDER_ID = G.ORDER_ID) P ON A.PC_ORDER_ID = P.PC_ORDER_ID
这样A表扫描需要大约12分钟,在扫描的同时,P中的关联差不多执行完成;总耗时25min;
综上,优化考虑以下几点:
1、不需要的字段不要查出来,尽量减小结果集的数据量;
2、能够并行执行的stage,尽量在写法上将逻辑隔离开来,降低耦合度,让代码充分并行执行。
3、根据执行的关键路径进行优化,修改表的关联顺序,缩短关键路径执行时间。
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Spark随谈——开发指南(译)
- MySQL 优化
- Spark,一种快速数据分析替代方案
- Google排名优化的几个影响因素
- DB2优化(简易版)
- Mysql limit 优化,百万至千万级快速分页 复合索引的引用并应用于轻量级框架
- C#中尾递归的使用、优化及编译器优化
- 对优化Ruby on Rails性能的一些办法的探究
- 优化Ruby脚本效率实例分享
- Asp编码优化技巧
- 如何监测和优化OLAP数据库
- mysql -参数thread_cache_size优化方法 小结
- 深入学习SQL Server聚合函数算法优化技巧
- MySQL常见的底层优化操作教程及相关建议
- 详解mysql的limit经典用法及优化实例
- 数据库学习建议之提高数据库速度的十条建议
- oracle数据库sql的优化总结
- SQL语句性能优化(续)