当 Shuffle 时出现了数据倾斜,我们一般的问题排查步骤
① 查看 WEB-UI
页面,查看各个 Job
的 Stage
中 Task
的执行情况,是否有明显执行时间过长的情况
② 如果任务报错,查看对应的日志异常堆栈信息,是否有内存溢出的情况
③ 抽样查看倾斜的 key
val result = rdd // withReplacement:表示抽出样本后是否在放回去,true表示会放回去,这也就意味着抽出的样本可能有重复 // fraction:抽取的数据比例 // seed:表示一个种子,根据这个seed随机抽取 // 抽取 50% 的数据 .sample(true, 0.5) .map((_, 1)) .reduceByKey(_ + _) // 查看倾斜 key 的 Top3 .take(3)
场景说明
导致数据倾斜的是 Hive
表,如果在 Hive
表中的数据本身分布不是均匀的(比如某个 key
对应的数据有 100w,其他的 key
对应的数据却只有几十条),此时我们需要通过 Spark
对数据做频繁的分析,那么就会导致数据倾斜的发生
方案说明
此时我们需要评估下,是否可以通过 Hive
做数据的预处理( 在 ETL 的时候或者预先和其他表 Join ),在接下来的 Spark 作业中,由于已经做过预处理了,我们就不需要使用原先的操作
方案的优缺点
① 优点
把 Spark 的倾斜转移到了 Hive 的 ETL 阶段
② 缺点
会导致 Hive 的 ETL 阶段发生倾斜,请查考 Hive SQL 优化
场景说明
少数的 key
导致了数据倾斜
方案说明
如果倾斜的 key 是无用的数据或者过滤掉倾斜的 key
对应的数据不影响结果,此时可以考虑直接将倾斜的 key 过滤掉
inputRdd.filter(_.equals("xxx"))
方案的优缺点
① 优点
直接避免了倾斜 key 的发生
② 缺点
这种场景比较少
场景说明
需要对倾斜的 key 做处理,此时提高并行度是优先的方案
方案说明
在执行 shuffle 算子时,直接对该算子增加并行度,该种设置的优先级最高。将原本执行倾斜 key 的 task 的数量增加,从而提高并行度,减少计算的时间
val result = rdd .map((_, 1)) // 将并行度提高至 500 .reduceByKey(_ + _,500)
方案的优缺点
① 优点
有效缓减和减轻数据倾斜的影响
② 缺点
没有彻底根除数据倾斜,只是缓解而已。可能会出现极端情况:无论怎样增加 task 的数量,最终倾斜的 key 仍然被分配到某个 task
一般结合其他方案使用
方案说明
局部聚合:先给每个 key 添加一个随机数,此时 key 就会发生变化
# 添加随机数前 (hello,1)(hello,1)(hello,1)(hello,1)(hello,1) # 添加随机数后 (1_hello,1)(1_hello,1)(2_hello,1)(2_hello,1)(3_hello,1) # 局部聚合后的结果,例如执行 reduceByKey 操作 (1_hello,2)(2_hello,2)(3_hello,1)
全局聚合:将随机值去掉,然后进行全局聚合操作
# 局部聚合后的结果,例如执行 reduceByKey 操作 (1_hello,2)(2_hello,2)(3_hello,1) # 全局聚合 (hello,5)
代码示例
def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[2]").getOrCreate() val sc: SparkContext = spark.sparkContext // 假设倾斜的 key 为 A val inputRdd: RDD[String] = sc.parallelize(Array( "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "A", "B", "B", "B", "B", "B", "B", "B", "B", "C", "D", "E", "F", "G", "B", "B", "B" )) val random = new Random(10) // 局部聚合 val mapRdd: RDD[(String, Int)] = inputRdd // 每个 key 增加随机值 .map(ele => (random.nextInt() + "_" + ele, 1)) // 聚合 .reduceByKey(_ + _, 1000) // 全局聚合 val resultRdd: RDD[(String, Int)] = mapRdd // 每个 key 去除随机值 .map(ele => (ele._1.split("_")(1), ele._2)) // 聚合 .reduceByKey(_ + _) println(resultRdd.collect().toBuffer) sc.stop() }
方案的优缺点
① 优点
对于聚合类的 shuffle ,可以直接解决数据倾斜,或者大幅度缓减数据倾斜的问题,将 Spark 的作业性能显著提升
② 缺点
适用的场景比较少,对于 Join
产生的 shuffle
数据倾斜无法解决
使用场景(大表关联小表)
在对 RDD 使用 Join 类操作,或者是在 Spark SQL 中使用 Join 语句时,其中的一个 RDD 或表的数据量比较小(比如几百 M 或者一两 G)
方案说明
不使用 Join 算子进行连接操作,使用 Broadcast 变量与 map 类算子实现 Join 操作,进而完全规避掉shuffle 类的操作,彻底避免数据倾斜的发生和出现
即:将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个Broadcast 变量,广播给其他 Executor 节点
代码示例
def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[2]").getOrCreate() val sc: SparkContext = spark.sparkContext // 小表 val student: List[(String, String)] = List( ("1", "Kyle"), ("2", "Jack"), ("3", "Lucy"), ("4", "Amy") ) // 大表 val score: RDD[(String, Int)] = sc.parallelize(List( ("1", 90), ("2", 80), ("3", 65), ("4", 77), ("5", 68), ("6", 69), ("7", 57), ("8", 99) )) // 广播 val broadcast: Broadcast[List[(String, String)]] = sc.broadcast(student) val result: RDD[(String, Int)] = score .map { case (id, score) => var temp = "" // 获取广播中的数据,手动进行关联匹配 for ((k, v) <- broadcast.value) { if (k.equals(id)) { // 根据 id 获取学生的姓名 temp = v } } (temp, score) } // 过滤掉没有关联到的数据 .filter(_._1.nonEmpty) println(result.collect().toBuffer) sc.stop() }
方案优缺点
① 优点
对于 Join 操作导致的数据倾斜效果非常好,因为根本没有发生 shuffle ,所以也就没有发生数据倾斜
② 缺点
适用的场景比较少,只适用于一个大表和一个大表的情况。当我们将小表数据进行广播,Driver
和 Executer
都会保留一份小 RDD
的数据,如果数据过大则会出现 OOM
的情况。因此,改种情况不符合大表 Join
大表的情况。
方案说明
两个 RDD/Hive表
进行 Join 的时候,如果两方的数据量都比较大,那么可以检查两个 RDD/Hive表
中的 key
的分布情况
如果出现数据倾斜,是因为其中某一个 RDD/Hive表
中的少数几个 key 的数据量过大,而另一个 RDD/Hive表
中的所有 key 分布都比较均匀,此时采用该方案
实现思路
注意:rdd1 和 rdd2 的数据量都很大,但是 rdd1 是倾斜的,rdd2 是均匀的
① 对包含少量倾斜 key
的 RDD
进行采样,从而获得发生倾斜的 key
val skewedKey: Array[String] = inputRdd .sample(false, 0.5) .map((_, 1)) .reduceByKey(_ + _) .sortBy(_._2, false) // 按照 key 对应的数据量排序,并获取 Top3 的倾斜 key .take(3) .map(_._1)
② 从包含少量倾斜 key
的 RDD
中过滤出倾斜 key
的数据,形成独立的 RDD
val sc: SparkContext = spark.sparkContext // 包含倾斜 key 的 RDD,假设它的数据量很大 val rdd1: RDD[(String, String)] = sc.parallelize(List( ("aa", "Kyle"), ("bb", "Jack"), ("cc", "Lucy"), ("aa", "Amy") )) // 筛选出倾斜 key 对应的数据 val skewedRdd: RDD[(String, String)] = rdd1.filter(ele => skewedKey.contains(ele._1))
③ 从包含少量倾斜 key
的 RDD
中过滤出非倾斜 key
的数据,形成独立的 RDD
// 筛选出非倾斜 key 对应的数据 val notSkewedRdd: RDD[(String, String)] = rdd1.filter(ele => !skewedKey.contains(ele._1))
④ 从非倾斜的 RDD
中过滤出包含倾斜的 key
的数据,扩大 n倍
并形成独立的 RDD
// 筛选出倾斜 key 对应的数据并扩大 val expandRdd: RDD[(String, String)] = rdd2 .filter(ele => skewedKey.contains(ele._1)) .flatMap(ele => { import scala.collection.mutable.ListBuffer val temp: ListBuffer[(String, String)] = ListBuffer() // 将均匀 RDD 中对应的倾斜 key 的数据扩大 100 倍 for (i <- 1 to 100) { temp += (i + "_" + ele._1, ele._2) } temp })
⑤ 将倾斜的 rdd1 中的数据和扩大后的数据进行关联
val joinRdd1: RDD[(String, String)] = skewedRdd .map(ele => (random.nextInt(100) + "_" + ele._1, ele._2)) .join(expandRdd) .map(ele => (ele._2._1.split("_")(1), ele._2._1))
⑥ 将 rdd1 中非倾斜的数据和 rdd2 进行关联
val joinRdd2: RDD[(String, String)] = notSkewedRdd.join(rdd2)
⑦ 将两个 join 的结果进行合并
val result: RDD[(String, String)] = joinRdd1.union(joinRdd2)
方案优缺点
① 优点
针对少部分倾斜的 key 可以使用此方案
② 缺点
如果倾斜的 key 的数量非常多,那么不适用该方案
方案说明
两个 RDD/Hive表
进行 Join 的时候,如果两方的数据量都比较大,其中一方包含多个倾斜的 key
,每个倾斜的 key
都可能对应超过 1w+
的数据,此时我们就需要使用该方案
实现思路
① 将分布比较均匀的 RDD
扩大 n
倍
// 筛选出倾斜 key 对应的数据并扩大 val expandRdd: RDD[(String, String)] = rdd1 .flatMap(ele => { import scala.collection.mutable.ListBuffer val temp: ListBuffer[(String, String)] = ListBuffer() // 将均匀 RDD 中对应的倾斜 key 的数据扩大 100 倍 for (i <- 1 to 100) { temp += (i + "_" + ele._1, ele._2) } temp })
② 将发生倾斜的 RDD
扩大每条数据都打上随机值
val skewedRdd: RDD[(String, String)] = rdd2 .map(ele => (random.nextInt(100) + "_" + ele._1, ele._2))
③ 将两个 RDD
进行关联
val result: RDD[(String, String)] = expandRdd.union(skewedRdd)
方案优缺点
① 优点
效果提升显著
② 缺点
扩容后,对资源的消耗比较大