您的位置:首页 > 其它

spark中多表连接优化实例

2016-07-13 17:26 225 查看
环境信息:

hive1.2.1

spark1.6.1

hadoop2.6.0-cdh5.4.2

memory:1918752, vCores:506

表结构:

表名称表容量主键hive存储类型
temp_01_pc_order5GPC_ORDER_IDRCFile
TST_ORDER_RISK9.4G非 PC_ORDER_IDRCFile
TST_ORDER_VEHICLE36GPC_ORDER_VEHICLE_IDRCFile
TST_ORDER_ASSIST800M非ORDER_IDRCFile
TST_PC_ORDER90GPC_ORDER_IDRCFile
原hive sql:

(为求简便,所有字段以XX代替,最终有90个字段)

INSERT OVERWRITE TABLE H_DW_ORDER
SELECT A.PC_ORDER_ID, A.XX90
FROM TST_PC_ORDER A
LEFT JOIN (SELECT PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID
,XX
,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO
FROM TST_ORDER_RISK) K
WHERE ROWNO = 1) B ON A.PC_ORDER_ID = B.PC_ORDER_ID
LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID = C.PC_ORDER_VEHICLE_ID
LEFT JOIN (SELECT T.* FROM TEMP_01_PC_ORDER T WHERE FLAG_L = 1) D ON A.PC_ORDER_ID =
D.PC_ORDER_ID
LEFT JOIN (SELECT T1.* FROM TEMP_01_PC_ORDER T1 WHERE FLAG_F = 1) F ON A.PC_ORDER_ID =
F.PC_ORDER_ID
LEFT JOIN (SELECT H.ORDER_ID, XX
FROM (SELECT E.XX
,E.ORDER_ID
,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO
FROM TST_ORDER_ASSISTE) H
WHERE H.ROW_NO = 1) G ON A.ORDER_ID = G.ORDER_ID;


原sql,hive运行,具体现象为几个stage最后有一个reduce时间特别长,以上表关联基本是主键关联或做除重,不应存在数据倾斜;

优化开启数据倾斜优化,增加reduce数量

set hive.optimize.skewjoin=true;

set hive.exec.reducers.bytes.per.reducer=400000000;

没有效果,执行过程中现象一样。而且报内存超限错误(143);

继续增大container内存设置:set mapreduce.reduce.memory.mb=6144;

中间运行不报错,但是执行时间很久;最后报错:[Error 30001]: StatsPublisher cannot be initialized·····

改用spark运行:使用belline命令:

/appcom/spark/bin/beeline -u jdbc:hive2://10.xx.xx.xxx:88888 -n user0 --color=true --showHeader=true --verbose=true


直接运行原sql耗时2.3h;大量时间消耗在TST_PC_ORDER与各个表的关联,hdfs读写不断增加,60G左右,非常耗时

表名称表容量hdfs读取量扫描耗时
temp_01_pc_order5G5G5min
TST_ORDER_RISK9.4G5.3G4min
TST_ORDER_VEHICLE36G10G4min
TST_ORDER_ASSIST800M524M4min
TST_PC_ORDER90G53.8G12min
优化1:由于TST_ORDER_VEHICLE 是与 TST_ORDER_RISK 关联,可以将两者先做关联,之后与主表关联;

INSERT OVERWRITE TABLE H_DW_ORDER
SELECT A.PC_ORDER_ID, A.XX90
FROM TST_PC_ORDER A
LEFT JOIN (SELECT PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID
,XX
,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO
FROM TST_ORDER_RISK) K
WHERE ROWNO = 1) B
LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID =
C.PC_ORDER_VEHICLE_ID) M ON A.PC_ORDER_ID =
M.PC_ORDER_ID
LEFT JOIN (SELECT T.* FROM TEMP_01_PC_ORDER T WHERE FLAG_L = 1) D ON A.PC_ORDER_ID =
D.PC_ORDER_ID
LEFT JOIN (SELECT T1.* FROM TEMP_01_PC_ORDER T1 WHERE FLAG_F = 1) F ON A.PC_ORDER_ID =
F.PC_ORDER_ID
LEFT JOIN (SELECT H.ORDER_ID, XX
FROM (SELECT E.XX
,E.ORDER_ID
,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO
FROM TST_ORDER_ASSISTE) H
WHERE H.ROW_NO = 1) G ON A.ORDER_ID = G.ORDER_ID;


这样做之后发现,spark会多生成一个stage,将TST_ORDER_VEHICLE 与 TST_ORDER_RISK 的关联提前做掉;但是这个stage耗时2min;总体时间42min

优化2:

既然关键耗时在TST_PC_ORDER表的扫描上,可以让其他的stage先运行;修改如下

INSERT OVERWRITE TABLE H_DW_ORDER
SELECT A.PC_ORDER_ID, A.XX90
FROM TST_PC_ORDER A
LEFT JOIN (SELECT H.ORDER_ID, XX
FROM (SELECT E.XX
,E.ORDER_ID
,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO
FROM TST_ORDER_ASSISTE) H
WHERE H.ROW_NO = 1) G ON A.ORDER_ID = G.ORDER_ID
LEFT JOIN (SELECT R.PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID FROM TST_PC_ORDER) R
LEFT JOIN (SELECT PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID
,XX
,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO
FROM TST_ORDER_RISK) K
WHERE ROWNO = 1) B
LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID = C.PC_ORDER_VEHICLE_ID) M
ON R.PC_ORDER_ID = M.PC_ORDER_ID
LEFT JOIN (SELECT T.*
FROM TEMP_01_PC_ORDER T
WHERE FLAG_L = 1) D ON R.PC_ORDER_ID =
D.PC_ORDER_ID
LEFT JOIN (SELECT T1.*
FROM TEMP_01_PC_ORDER T1
WHERE FLAG_F = 1) F ON R.PC_ORDER_ID = F.PC_ORDER_ID) P
ON A.PC_ORDER_ID =P.PC_ORDER_ID


如此多扫描TST_PC_ORDER表一次,但因为只扫描一列,速度较快;但是受制于A表与G表关联时间较长,且整体P需要A与G的关联结果出来之后才能运行;总耗时38min;

优化3:如果能在TST_PC_ORDER扫描的时候将所有其他的关联做完就完美了;修改表连接顺序如下:

INSERT OVERWRITE TABLE H_DW_ORDER
SELECT A.PC_ORDER_ID, A.XX90
FROM TST_PC_ORDER A
LEFT JOIN (SELECT R.PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID ,ORDER_ID FROM TST_PC_ORDER) R
LEFT JOIN (SELECT T.*
FROM TEMP_01_PC_ORDER T
WHERE FLAG_L = 1) D ON R.PC_ORDER_ID =
D.PC_ORDER_ID
LEFT JOIN (SELECT T1.*
FROM TEMP_01_PC_ORDER T1
WHERE FLAG_F = 1) F ON R.PC_ORDER_ID =
F.PC_ORDER_ID
LEFT JOIN (SELECT PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID, XX
FROM (SELECT PC_ORDER_ID
,XX
,ROW_NUMBER() OVER(PARTITION BY PC_ORDER_ID ORDER BY UPDATED_DATE DESC) AS ROWNO
FROM TST_ORDER_RISK) K
WHERE ROWNO = 1) B
LEFT JOIN TST_ORDER_VEHICLE C ON B.CLIENT_ID =
C.PC_ORDER_VEHICLE_ID) M ON R.PC_ORDER_ID =
M.PC_ORDER_ID
LEFT JOIN (SELECT H.ORDER_ID, XX
FROM (SELECT E.XX
,E.ORDER_ID
,ROW_NUMBER() OVER(PARTITION BY E.ORDER_ID ORDER BY E.DATE_UPDATED DESC) AS ROW_NO
FROM TST_ORDER_ASSISTE) H
WHERE H.ROW_NO = 1) G ON R.ORDER_ID = G.ORDER_ID) P ON A.PC_ORDER_ID = P.PC_ORDER_ID


这样A表扫描需要大约12分钟,在扫描的同时,P中的关联差不多执行完成;总耗时25min;

综上,优化考虑以下几点:

1、不需要的字段不要查出来,尽量减小结果集的数据量;

2、能够并行执行的stage,尽量在写法上将逻辑隔离开来,降低耦合度,让代码充分并行执行。

3、根据执行的关键路径进行优化,修改表的关联顺序,缩短关键路径执行时间。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 优化