说到mr的Shuffle,那么首先要明确shuffle属于哪个阶段。shuffle就是从Map端输出到Reduce输入的整个过程,这个过程广义上称之为shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示:
Map的shuffle过程包括了spill、collect、sort、和Marge
首先每个map任务不断地以键值对的形式,把数据输出到内存中构造的一个环形数据结构中,这样可以有效地使用内存空间,在内存中放置尽可能过得数据,之后把数据安排partition值和key进行升序排序,这里只是移动了索引数据,其次是spill线程,会为spill过程创建一个磁盘文件,只有会将partition的数据放到这个文件中,一直把所有的partition遍历完。如果我们map任务输出数据量太大的话,可能会进行好几次的spill,这个时候会把这些文件进行合并,也就是merge过程。
Reduce的shuffle过程包括了copy、merge sort
reduce任务通过http向各个map任务拖取所需要的数据,其中一项就是响应reduce拖取map数据,当有mapoutput的请求过来时,就会读取响应的map输出文件中对应的这个reduce部分的数据,输出给到reduce,之后的megre和map端的megre过程一样,因为map的输出数据已经是有序的,megre进行一次合并排序,当reduce输入文件已经定了,那么整个shuffle才最终结束
尽早尽量过滤数据,减少每个阶段的数据量
减少job数
解决数据倾斜问题
列剪裁
尽量不适用select * from 表名,而是根据我们所需要的字段去展示,例如我们的test表中有 a,b,c三个字段,我们只需要a与c的时候我们使用
select a,b from test -----建议使用 select * from test -----不建议使用
分区剪裁
在查询的过程中减少不必要的分区,即尽量指定分区
利用hive的优化机制减少job数
不论是什么关联,如果join的key相同,不管有多少表,都会合并为一个MapReducer任 务,所以在多表join时尽量用到相同的字段
善用union all
对于同一张表的union all要比多重的insert快的多,因为hive本身对这种union all做过优化,即可以只扫描一次源表
join前过滤掉不需要的数据
在我们进行join操作的时候可能会出现要join的字段为空或者说是无效,这样会产生倾斜问题,我们可以把空值的key变成一个字符串加上随机数,这样能把倾斜的数据分到不同的reduce上,解决数据倾斜问题
select * from A a left outer join B b on case when a.id is null then concat('随意字符串',rand()) else a.id=b.id
小表放前大表放后
在编写带有join的代码语句时,应该将条目少的表/子查询放在join操作符的前面
因为在Reduce阶段,位于join操作符左边的表会先被加载到内存,载入条目较少的表可以有效的防止内存溢出(OOM)。所以对于同一个key来说,对应的value值小的放前面,大的放后面
使用动态分区
更换hive底层引擎
hive的底层引擎是mr,我们可以将底层引擎换成tez或者是spark等,提升效率
数据倾斜对于分布式大数据系统来将是非常可怕的。
在理想的状态下,随着节点数量的增加,应用整体耗时线性下降。如果一台机器处理大量数据需要60分钟,那么当机器数量增加到三台时,最理想的耗时是20分钟。如果想要做到这些,就必须要保证每台机器的任务量相等,但很多时候任务的分配是不均匀的,这个就是分布式环境下最大的问题。那我们如何缓解解决数据倾斜呢?
1、过滤异常的数据
如果导致数据清洗的key是异常数据,那么简单的过滤掉就可以了。
首先要对key进行分析,判断是哪些key导致的数据倾斜,我们可以通过抽样统计kry的出现次数来验证。
def.select("key").smaple(false,0.1) //数据采样 .(k => (k,1)).reduceByKey(_ + _) //统计key出现的次数 .map(k => (k._2,k._1)).sortByKey(false) //根据key出现的次数进行排序 .take(N) //提取前N个
解决的方案则是对数据进行过滤即可
2、提高shuffle并行度
调整shuffle时的并行度,使得原本被分配到同一个Task的不同Key发配到不同的Task上处理,降低原Task所需处理的数据量,从而缓解数据倾斜造成的短板问题
3、自定义 Partitioner
使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task。
4、大表 key 加盐,小表扩大 N 倍 jion
如果出现数据倾斜的 Key 比较多,上一种方法将这些大量的倾斜 Key 分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。