先进到defaultPartitioner里,self接收得是父RDD。
这是defaultPartitioner函数:
这一段代码的主要逻辑是分区器的选取问题,是用上游RDD的分区器还是新建一个分区器。
评判标准就是:
如果现有的最大分区器是合格的,或者其分区数大于或等于默认分区数,请使用现有分区器,关键就是看默认分区数的问题,下图就是代码逻辑。
那么如何找到最大分区器?
它会调用hasMaxPartitioner函数,先用hasPartitioner在上游的RDD集中过滤出分区数大于0的分区,然后在这里面找出最大分区数的分区器作为当前分区器。
那么默认的分区数(也就是并行度)哪来?
调用defaultNumPartitions函数,去判断父RDD是否有默认分区数。如果有就直接拿过来,如果没有就拿之前找到的最大分区器的分区数。
现在分区器怎么选的问题解决了,我们再来看看新建分区,new HashPartitioner 的问题。
分区器决定的是在shuffle的时候上游的数据会被下游的哪个task拉取
getPartition函数决定数据会被拉到哪个分区。如果key是null的,那么就直接甩到0号分区,其他的根据key的hashcode值/下游的分区数,具体决定它会被丢到哪个分区。
下图就是计算分区函数
那么我们能不能自定义一个分区器呢,我们进到本文首部的groupByKey
函数中去看一下
下图是代码逻辑
首先声明一下groupbyKey不是在map端进行全局聚合,map端只是各自分区里面分组来局部聚合。
其中主要就是三个函数
createCombiner:新建一个CompactBuffer集合(类似于ArrayBuffer)它把同一个分组的第一个value塞进去
mergeValue:把同一个分组内剩余的其他value一个个的塞进CompactBuffer
mergeCombiners:进行全局分组,把上游多个分区的相同value的分组的CompactBuffer集合加起来
可以看到combineByKeyWithClassTag函数把createCombiner,mergeValue,mergeCombiners,分区器,mapSideCombine不在map端全局聚合,这五个全丢进去了。
进到函数里面,其实就是新建了一个shuffledRdd 来把那三个函数塞进去。
让我们最后看一下groupbykey的逻辑执行流程
groupbukey也是一个算子针对的也是task对应的分区,上游有四个分区,每个分区有各自的数据,比如在Map端的0号分区,(只列出spark)groupbykey会针对它进行局部分组形成 (spark,[1,1])
在一号分区会形成(spark,[1,1]),然后shuffule,下游task来拉取,spark这个key被hashcode计算应该被丢到一号分区,所以shuffle端一号分区的task拉取这俩个分区的数据,进行全局分组在它这边聚合成(spark,[1,1,1,1])(图上写错了,应该是4个)