原文标题:奇思妙想的SQL|兼顾性能的数据倾斜处理新姿势
原文作者:阿里云开发者
冷月清谈:
怜星夜思:
2、文章提到的“热点Mapjoin+非热点Distmapjoin”方案,如何确定热点数据的阈值?
3、除了文章中提到的方案,还有哪些方法可以优化数据倾斜问题?
原文内容
阿里妹导读
文章导读
一、场景描述
如上图所示,数据重分发过程中,按照Join Key(即卖家ID)进行Shuffle,大部分交易数据记录分发至处理节点1,导致三个并发处理节点中,处理节点1需要处理的数据量远大于其他两个处理节点,从而造成数据处理的不均匀,即数据倾斜。
二、常见的优化方法
2.1.Mapjoin
SELECT /*+MAPJOIN(dim)*/ *
FROM (SELECT * FROM dwd_tbl) base
LEFT OUTER JOIN (SELECT * FROM dim_tbl) dim
ON base.dim_key = dim.dim_key
2.2.特殊值/空值打散
-
特殊值/空值场景也比较普遍,比如主表中有个属性字段在某些场景下为空或为一些无业务含义的特殊字符串(如DEFAULT),然后此属性字段本身对应了一张数据量较大的维表,需要关联打宽补全。此时做数据关联,由于两张表需要按照关联key进行shuffle,就会导致主表中该字段为空/相同特殊字符串的数据记录shuffle到同一节点上,从而导致数据倾斜。
-
此类场景也好解决,对特殊值/空值在关联时转为随机值就行。
SELECT *
FROM (SELECT * FROM dwd_tbl) base
LEFT OUTER JOIN (SELECT * FROM dim_tbl) dim
ON IF(COALESCE(base.dim_key,'')='',CONCAT('HIVE_',RAND()),base.dim_key) = dim.dim_key
2.3.热点值打散,副表呈倍数扩散
-
此类方法使用较少,核心在于对于主表附加一个随机值(比如1~10)字段,记为ext_a字段,然后对应被关联维表数据按照对应倍数进行复制膨胀,并依次赋予1~10的编号,记为ext_b字段,然后在关联两张表时把ext_a、ext_b两个字段也作为关联字段之一。
-
此方法适用于被关联表远比主表小,但又因数据大小超过内存容量而无法使用Mapjoin,且主表的数据倾斜程度不大(即极值对应的数据行数相较于值平均对应行数,倍数差距不太大)的情况下可以使用,但整体上此方案只能对数据热点成倍数的削弱一些。
SELECT *
FROM (
SELECT *,CAST(RAND()*10 AS BIGINT) AS ext_a
FROM dwd_tbl
) base
LEFT OUTER JOIN (
SELECT *
FROM dim_tbl
LATERAL VIEW EXPLODE(SPLIT('0;1;2;3;4;5;6;7;8;9',';')) tt AS ext_b
-- 或者Join一个用于倍数膨胀的小表
) dim
ON base.dim_key = dim.dim_key
AND base.ext_a = dim.ext_b
2.4.热点数据单独处理/SkewJoin
-
使用此方法通常也意味着被关联的维表数据大小较大,无法使用Mapjoin,只能走普通shuffle模式的join方案。此类场景最典型的案例就是双十一淘系交易大表关联商家维表,此时的商家维表因记录数和数据大小都较大而无法放入内存,再加上部分商家的交易单量远超大盘平均,此时的数据倾斜就得使用热点数据单独处理的方案了。
-
热点数据单独处理的方案的核心点在于将热点数据提取出来单独处理,热点数据可以用Mapjoin的方式完成关联维表热点记录行,非热点则使用普通的shuffle模式的join方案完成关联。
-
具体操作主要分三个部分:基于主表统计获得Top热点的属性值;用热点属性值将被关联维表拆成热点小表和非热点表,同时也将主表拆成热点主表和非热点主表;热点小表通过Mapjoin与热点主表join,非热点表与非热点主表join,最终两部分再Union到一起,完成数据关联。
-- Step01:热点数据记录提取 INSERT OVERWRITE TABLE tmp_hot_list PARTITION (dt = '${bizdate}') SELECT dim_shop_id AS hot_id FROM main_table WHERE dt = '${bizdate}' GROUP BY dim_shop_id HAVING COUNT(1) > 10000 ;
INSERT OVERWRITE TABLE final_result_table PARTITION (dt = ‘${bizdate}’)
– Step02:热点数据处理,使用MapJoin完成处理
SELECT /+MAPJOIN(a2,a3)/
a1.trade_no AS trade_no
,a1.dim_shop_id AS shop_id
,a3.shop_name AS shop_name
,a3.shop_type AS shop_type
FROM (SELECT * FROM main_table WHERE dt = ‘${bizdate}’) a1
– Step02-1:主表用JOIN关联热点表进行热点记录筛选
JOIN (SELECT * FROM tmp_hot_list WHERE dt = ‘${bizdate}’) a2 – 热点数据清单
ON a1.dim_shop_id = a2.dim_shop_id
– Step02-2:热点维度数据处理
LEFT OUTER JOIN (
SELECT /+MAPJOIN(b2)/ b1.*
FROM (SELECT * FROM dim_table_info WHERE dt = ‘${bizdate}’) b1
JOIN (SELECT * FROM tmp_hot_list WHERE dt = ‘${bizdate}’) b2 – 热点数据清单
ON b1.dim_shop_id = b2.dim_shop_id
) a3
ON a1.dim_shop_id = a3.dim_shop_id
UNION ALL
– Step03:非热点数据处理,使用普通Join完成处理,两张表均需要进行Shuffle
SELECT /+MAPJOIN(a12)/
a11.trade_no AS trade_no
,a11.dim_shop_id AS shop_id
,a13.shop_name AS shop_name
,a13.shop_type AS shop_type
FROM (SELECT * FROM main_table WHERE dt = ‘${bizdate}’) a11
– Step03-1:主表用ANTI JOIN关联热点表进行剔除
LEFT ANTI JOIN (SELECT * FROM tmp_hot_list WHERE dt = ‘${bizdate}’) a12
ON a11.dim_shop_id = a12.dim_shop_id
– Step03-2:非热点维度数据处理
LEFT OUTER JOIN (
SELECT /+MAPJOIN(b12)/ b11.*
FROM (SELECT * FROM dim_table_info WHERE dt = ‘${bizdate}’) b11
LEFT ANTI JOIN (SELECT * FROM tmp_hot_list WHERE dt = ‘${bizdate}’) b12
ON b11.dim_shop_id = b12.dim_shop_id
) a13
ON a11.dim_shop_id = a13.dim_shop_id
;
-
整个步骤稍有些复杂,这里也可以直接用平台的skewjoin参数完成倾斜处理,skew的核心思路就是上面提到的热点数据单独处理,只是做了平台级别的集成,方便用户一键解决数据倾斜问题。详细用法和详细原理可参考 《阿里云-SKEWJOIN HINT》[1] 。
INSERT OVERWRITE TABLE final_result_table PARTITION (dt = '${bizdate}')
SELECT /*+SKEWJOIN(a1)*/
a1.trade_no AS trade_no
,a1.dim_shop_id AS shop_id
,a2.shop_name AS shop_name
,a2.shop_type AS shop_type
FROM (SELECT * FROM main_table WHERE dt = '${bizdate}') a1
LEFT JOIN (SELECT * FROM dim_table_info WHERE dt = '${bizdate}') a2
ON a1.dim_shop_id = a2.dim_shop_id
;
2.5.方案总结
-
不难发现,上面几种方案核心都是在围绕解决数据重分发(即shuffle)导致的热点问题,一种是想方设法采用Mapjoin的方式避免热点数据重分发,一种是让数据重分发过程中尽可能得均匀。
-
不管是哪种思路,问题核心都还是在解决shuffle导致的数据分布不均匀的问题。所以,一切的“罪魁祸首”就是 shuffle 、 shuffle 、 shuffle ~
三、一种新的思路 WithDistmapjoin~
3.1.核心思路
3.2.代码实现
WITH -- STEP01:热点Key采集 tmp_hot_pid AS ( SELECT dim_shop_id,'Y' AS is_hot FROM main_table_detail WHERE dt = '${bizdate}' GROUP BY dim_shop_id HAVING COUNT(1) > 100000 ) -- STEP02:维表热点数据打标 ,tmp_dim_tbl AS ( SELECT /*+MAPJOIN(hot)*/ dim.* ,COALESCE(hot.is_hot,'N') AS is_hot FROM ( SELECT * FROM dim_table_info WHERE dt = '${bizdate}' ) dim LEFT OUTER JOIN tmp_hot_pid hot ON dim.dim_shop_id = hot.dim_shop_id ) -- STEP03:明细热点数据打标 ,tmp_dwd_tbl AS ( SELECT /*+MAPJOIN(hot)*/ base.* ,COALESCE(hot.is_hot,'N') AS is_hot FROM ( SELECT * FROM main_table_detail WHERE dt = '${bizdate}' ) base LEFT OUTER JOIN tmp_hot_pid hot ON base.dim_shop_id = hot.dim_shop_id )
– STEP04:数据合并处理,热点数据用Mapjoin,非热点数据用DISTMAPJOIN
INSERT OVERWRITE TABLE final_result_table PARTITION (dt = ‘${bizdate}’)
SELECT *
FROM (
– STEP04-1:非热点数据用DISTMAPJOIN
SELECT /+ DISTMAPJOIN(dim(shard_count=77)) /
dwd_tbl.trade_no AS trade_no
,dwd_tbl.trade_date AS trade_date
,dwd_tbl.shop_id AS shop_id
,dim.shop_name AS shop_name
,dim.shop_type AS shop_type
FROM (SELECT * FROM tmp_dwd_tbl WHERE is_hot = ‘N’) dwd_tbl
LEFT OUTER JOIN (SELECT * FROM tmp_dim_tbl WHERE is_hot = ‘N’) dim
ON dwd_tbl.partner_id = dim.partner_id
UNION ALL
– STEP04-1:热点数据用Mapjoin
SELECT /+MAPJOIN(dim)/
dwd_tbl.trade_no AS trade_no
,dwd_tbl.trade_date AS trade_date
,dwd_tbl.shop_id AS shop_id
,dim.shop_name AS shop_name
,dim.shop_type AS shop_type
FROM (SELECT *FROM tmp_dwd_tbl WHERE is_hot = ‘Y’) dwd_tbl
LEFT OUTER JOIN (SELECT *FROM tmp_dim_tbl WHERE is_hot = ‘Y’) dim
ON dwd_tbl.partner_id = dim.partner_id
) base
;
3.3.真实效果
四、方案总结
参考链接:
[2]https://help.aliyun.com/zh/maxcompute/user-guide/distributed-mapjoin?spm=a2c4g.11186623.0.i1#concept-2197457




