一、数据倾斜产生的原因
spark job中绝大多数task执行得非常快,但个别task执行缓慢。或者原本线上运行的job是正常,但在某天由于特殊原因报出OOM的异常,观察发现是因为代码本身造成的。
一般来说,发生数据倾斜是在程序进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的task来进行处理,此时如果某个key的数据量远远大于其他的key,则会发生数据倾斜;
在程序中定位数据倾斜的代码块:倾斜只会发生在shuffle的过程中,比如:distinct,reduceByKey,groupByKey,join,cogroup,repartition等算子;
二、Spark中数据倾斜的解决方案
1)上游预处理:一般我们处理的数据存在hive里面,对于存在热key可以在hive里面进行预处理,但这种方法仍然在hive里面会引起数据倾斜,请看hive中数据倾斜优化;
2)过滤导致倾斜的key:如果热key只是少数几个,对于计算本身没有影响的话,可以选择过滤掉该key;
3)提高shuffle的并行度:在使用shuffle算子时,可以增加算子的并行度,reduceByKey(200), 200即为算子在执行时的reduce task的数量,在spark sql中可以设置参数:spark.sql.shuffle.partitions,改值默认是200;当然此方案只是能够缓解数据倾斜,但最终对于热key还是无法解决,热key仍然只会发送到一个task里面;
4)两阶段聚合:加盐,第一次局部聚合,给每个key打上一个随机数,对含有随机数key的数据进行reduceByKey操作,再将前缀随机数去掉,在进行全局聚合,这种方法只能用于聚合类算子;
5)map join
6)采样倾斜key并分拆join操作:sample算子