大家好,我是大圣,很高兴又和大家见面。
上篇文章,我们详细说了 Flink keyBy 算子的源码和设计理念,今天结合 keyBy 算子的底层给大家说一种解决 Flink 数据倾斜的方案。
话不多说,咱们现在就开始。
当我们使用 keyBy 算子指定 key 的时候,底层是用你指定的 key ,然后去计算这个key 所对应的 keyGroupId,然后再利用 keyGroupId 来计算这个 key 被分配到哪个并行子任务上面。
关键的就是下面这两行代码。
keyGroupId = MathUtils.murmurHash(keyHash) % maxParallelism; keyGroupId * parallelism / maxParallelism;
在这种分配机制的情况下,如果某个 key 的数据量特别大的话,可能就会出现数据倾斜。
keyBy 是 Flink 中的数据分区操作,它根据 key 把数据分到不同的子任务。但当某 key 数据过多,它的子任务会处理更大负担,导致性能下降,而其他子任务可能闲置。这种不平衡导致整体效率降低,这样就会产生数据倾斜,一句话总结:旱的旱死,涝的涝死。
如下图所示:
设想一个超市的收银区,有多个收银台。为了加快结账速度,超市规定,根据顾客购买的商品种类(如:蔬菜、水果、零食等)来决定他们应到哪个收银台结账。比如,买蔬菜的顾客去第一个收银台,买水果的顾客去第二个收银台,以此类推。
然而,某一天,由于某种促销活动,购买零食的顾客数量急剧增加,这导致负责零食的收银台排起了长队,而其他的收银台则相对清闲。
如下图所示:
与 Flink 的映射:
在此例中,超市的每个收银台对应于 Flink 中的一个子任务。顾客购买的商品种类则代表数据的 key。当某个 key(如“零食”)的数据量突增时,对应的子任务(即处理零食的收银台)面临巨大的压力,而其他子任务(其他收银台)则轻松许多。这就是 Flink 中的数据倾斜问题。
既然多个并行子任务处理的数据量有很大的差异,我们手动干预让每个并行子任务处理的数据尽可能的均匀,这样应该就可以在一定程度上避免数据倾斜。
public class RebalanceKeyCreator { private int defaultParallelism; private int parallelism; public RebalanceKeyCreator(int parallelism) { this.parallelism = parallelism; } public Integer[] generateRebalanceKeys() { int maxParallel = KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism); int maxKey = parallelism * 12; Map<Integer, Integer> subIndexKeyMap = new HashMap<>(); for (int key = 0; key < maxKey; key++) { int subtaskIdx = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallel, parallelism); if (!subIndexKeyMap.containsKey(subtaskIdx)) { subIndexKeyMap.put(subtaskIdx, key); } } return subIndexKeyMap.values().toArray(new Integer[0]); } }
大家可以先看一下上面这段代码的 generateRebalanceKeys() 方法,比较晦涩难懂。
我先举个例子给大家说一下这段代码的设计理念,然后再给大家讲解这个代码怎么解决数据倾斜的。
如下图:
想象一个课堂里有5个小组,每个小组都需要选择一个代表来参加一个特别的活动。但是,选择代表的方式有些特殊:老师会随机叫出一个学生的名字,然后查看这个学生属于哪个小组。
如果这个小组还没有代表,那么这个学生就成为这个小组的代表。但是,如果这个小组已经有了代表,那么老师就继续随机叫名。
这个方法的目标是确保每个小组都有一个代表,而不是某个小组有多个代表。
记住上面这个例子,我下面把上面的代码和这个例子对应上来讲一遍。
课堂里的5个小组:
这可以对应于一个并行任务或“subtask”。
选择代表的特殊方式:这与如何为每个并行任务或subtask分配一个唯一的key相对应。
老师随机叫出学生的名字:这与在一个范围内随机选择一个key相对应。
查看这个学生属于哪个小组:这与确定给定的key应该分配给哪个并行任务或subtask相对应。
maxParallel: 计算可能的最大并行性。这是所有subtask的最大数量。
maxKey: 定义了key的最大范围,它是默认并行性与12的乘积。
subIndexKeyMap: 用于存储每个subtask及其对应的key。
循环遍历所有可能的key,从0到maxKey - 1。
subtaskIdx: 对于每个key,它使用KeyGroupRangeAssignment.assignKeyToParallelOperator()方法来确定key应该分配给哪个subtask。
使用一个条件判断来检查是否已为该subtask分配了一个key。如果没有,则将key添加到subIndexKeyMap中。
最后,代码返回subIndexKeyMap中所有的key值,确保每个subtask都有一个唯一的key。
在我们的课堂例子中,subIndexKeyMap确保每个小组(或subtask)只有一个代表(或key)。这与老师确保每个小组都有一个代表的方式相似。这段代码的主要目的是为每个并行任务或subtask分配一个唯一的key,确保均衡的key分配,从而实现均衡的数据分布。
大家记住一点,上面的代码是保证每个并行子任务里面都有key。
如果是传统的 keyBy 操作的话,如下例子:
假设你有一个流,其中包含一个用户ID作为键,并且我们有3个并行任务来处理它。如果用户活跃度差异很大,那么数据可能如下
UserIDs: 1, 2, 3, 1, 1, 3, 2, 1, 1, 3, 3, 3, ...
当你使用keyBy操作时,用户数据会根据其ID被发送到相应的并行任务。假设用户1非常活跃,而用户2则相对不活跃,你可能会得到以下的分配:
Task 1: 1, 1, 1, 1, 1, ... Task2 Task 3: 3, 3, 3, 3, 2,2...
可以看到,Task 1处理的数据量比其他任务多得多,因此可能会成为一个瓶颈。
而我们上面的代码,如下
Map<Integer, Integer> subIndexKeyMap = new HashMap<>(); for (int key = 0; key < maxKey; key++) { int subtaskIdx = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallel, parallelism); if (!subIndexKeyMap.containsKey(subtaskIdx)) { subIndexKeyMap.put(subtaskIdx, key); } }
现在,我们希望重新分配数据以避免上述情况。我们将从一系列随机键中选择一些代表性的键,这些键将代表我们的并行任务。在上述方法中,我们尝试为每个任务找到一个最佳的键。
首先,为了使事情变得简单,假设我们的maxKey是6(在真实场景中,这个值会更大,如parallelism * 12),那么我们的随机键集合可能是:[0, 1, 2, 3, 4, 5]。
我们试图为每个任务分配一个独特的键(这确保数据均匀地分布),例如:
Task 1: New Key 0 Task 2: New Key 2 Task 3: New Key 4
现在,当数据流入时,我们不是直接使用用户ID作为键,而是使用我们为每个任务分配的键。这样,即使某个用户非常活跃,我们也会将他们的数据均匀地分布在所有任务中。
不使用generateRebalanceKeys方法的keyBy可能会导致某些任务处理的数据量比其他任务多,因为数据是基于其原始键(如用户ID)进行分组的。
使用generateRebalanceKeys方法可以帮助我们重新分配数据,从而更均匀地在任务之间分配它,以避免某些任务过载。
大家切记一点generateRebalanceKeys方法里面的操作是和我们的真实的数据没有任何关系的,它就是定义了 int maxKey = parallelism * 12;
因为假如我们Flink 的并行度是 5 的话,我们的并行子任务 subtaskIdx 的值就是 0 ~ 4 , 然后从 0 到 maxKey 去循环,然后 maxKey 作为 key 去计算 maxKey 所在的并行子任务的 subtaskIdx 的值。
因为我们如果传入的是数据里面真实的key 的话,比如上面那个 传统的keyBy操作 例子中,
我们把 UserIDs: 1, 2, 3, 1, 1, 3, 2, 1, 1, 3, 3, 3, … 去计算的话,task2 对应的并行子任务的id,也就是我们程序里面的subtaskIdx 是没有值的,导致task2 没有数据处理。
然后把 subtaskIdx 作为 subIndexKeyMap 的key,value 就是 maxKey。当 subtaskIdx 计算过了,就 continue 跳过,计算下一个 subtaskIdx 的值,直到 subtaskIdx 中 0~ 4 中都有对应的 maxKey,这样保证每个并行度上面都有key 去计算。
这样后面执行 keyBy 算子的时候就不会像我们没有优化的keyBy 那样出现某个并行子任务上面没有数据。
为什么要 int maxKey = parallelism * 12; 乘以 12,这是为了循环更多次,找到更多模拟的 maxKey,就比如如果 int maxKey = parallelism 的话,如果 parallelism = 5,那就循环 5 次,可能还会出现 上面那个 传统的keyBy操作 例子中的情况。
这篇只讲了怎么让Flink 的每个并行子任务都有对应的 key ,那么这和我们真的的数据还没联系在一起。还有这种方法的优势和弊端,在什么场合下面使用等等,等下篇文章我们接着说。
要理解上面代码的设计理念和思想的话,我觉得还是有点难度的,我也理解了好久,才彻底理解。