C/C++教程

03_MapReduce框架原理_3.11 MapReduce 内核源码解析

本文主要是介绍03_MapReduce框架原理_3.11 MapReduce 内核源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1. MapTask 工作机制

  1.  流程图

      2.  流程说明

源码 2. 流程说明 1. Read阶段 2. Map阶段 3. Collect收集阶段 4. Split阶段(溢写阶段) 5. Merge阶段 MapTask 通过 InputFormat 获取 RecodeReader 并从 输入的切片对象(InputSplit)中 解析出一个个key,value 示例 HelloScala : key=1=行偏移量 value=HelloScala=行数据 将步骤1 中 读取到的key,value 交给 map()函数处理 ,并产生 新的key,value 将 map处理后的key,value 数据, 通过 OutputCollector.collect(key,value,partitioner) 写入到环形缓冲区 将key,value 通过Partitions的获取所在分区编号 1. 说明 当环形缓冲区满后,MapTask 会将数据写出到本地磁盘上,生成一个临时文件(溢写文件) 写出前,会将要写出的数据进行一次 排序、合并压缩等操作 2. 溢写详情 1. 对缓冲区要溢写的数据排序 排序算法 :快排 排序字段 : 先按 分区编号排序 再按 key排序 溢写文件 : 以分区为单位聚集,且同一分区所有数据按key排序 2. 将排序好的数据写出到临时文件 文件名称 : output/spillN.out(N 表示当前溢写次数) 合并 : 如果用户设置了 合并器(Combiner),则写入文件前 对每个分区中的数据再进行一次聚合操作 3. 将 记录分区信息 的元数据 存储到内存索引数据结构SplitRecord 分区元数据 : 1. 该分区在临时文件中的偏移量 2. 压缩前数据大小 3. 压缩后数据大小 如果内存索引大小超过1MB,将内存中会写到 output/spillN.out.index 中 1. 说明 当所有 数据处理完后,MapTask 对所有临时文件进行合并,确保只生成一个数据文件 2. 合并文件 数据文件 : 将 output/spillN.out(N 表示当前溢写次数) 合并成 一个大文件 output/file.out 索引文件 : 并生 output/file.out 的索引文件 output/file.out.index 3. 合并方式 分区为单位 进行合并, 对于某个分区,他将采取 多轮递归合并的方式 1. 每轮合并数 mapreduce.task.io.sort.factor(默认为10个文件) 2. 将合并后的文件 重新加入待合并列表中,对文件排序,重复合并 3. 生成最终的合并文件

      3.  输入&输出

      4.  MapTask 源码解析流程

MapTask 源码解析流程
=================== MapTask ===================
map方法
    context.write(text, intWritable) //自定义的 map 方法的写出,进入 
output.write(key, value);
MapTask 类

//MapTask727 行,收集方法,进入两次    (1. 进入分区函数,返回分区编号 2.进入收集方法)

collector.collect(key, value,partitioner.getPartition(key, value, partitions));

HashPartitioner(); //默认分区器



collect() //MapTask1082 行 map 端所有的 kv 全部写出后会走下面的 close 方法

close() //MapTask732 行



collector.flush() // 溢出刷写方法,MapTask735 行,提前打个断点,进入

sortAndSpill() //溢写排序,MapTask1505 行,进入

sorter.sort() QuickSort //溢写排序方法,MapTask1625 行,进入



mergeParts(); //合并文件,MapTask1527 行,进入

/tmp/hadoop/mapred/local/localRunner/jobcache/job_local386912288_0001/attempt_local386912288_0001_m_000000_0/output/

file.out

file.out.index
collector.close(); //MapTask739 行,收集器关闭,即将进入 ReduceTask

 

2. ReduceTask 工作机

  1. 流程图

       

 

      2. 流程说明

      3. reduceTask并行度(个数) 决定机制 

 

 4. ReduceTask 源码解析流程

ReduceTask 源码解析流程
=================== ReduceTask =================== 
if (isMapOrReduce()) //reduceTask324 行,提前打断点

initialize(job, getJobID(), reporter, useNewApi) // reduceTask333 行,进入

shuffleConsumerPlugin.init(shuffleContext); // reduceTask375 行,走到这需要先给下面的打断点

    totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl 第 120 行,提前打断点 
    merger = createMergeManager(context); //合并方法,Shuffle 第 80 行
    // MergeManagerImpl 第 232 235 行,提前打断点 
    this.inMemoryMerger = createInMemoryMerger(); //内存合并 
    this.onDiskMerger = new OnDiskMerger(this); //磁盘合并

rIter = shuffleConsumerPlugin.run(); // 第 377 行
    eventFetcher.start(); //开始抓取数据,Shuffle 第 107 行,提前打断点 
    eventFetcher.shutDown(); //抓取结束,Shuffle 第 141 行,提前打断点 
    copyPhase.complete(); //copy 阶段完成,Shuffle 第 151 行 
    taskStatus.setPhase(TaskStatus.Phase.SORT); //开始排序阶段,Shuffle 第 152 行

sortPhase.complete(); //排序阶段完成,即将进入 reduce 阶段 reduceTask382 行 

reduce();  //reduce 阶段调用的就是我们自定义的 reduce 方法,会被调用多次

cleanup(context); //reduce 完成之前,会最后调用一次 Reducer 里面的 cleanup 方法

 

 

 

 

 

 

 

这篇关于03_MapReduce框架原理_3.11 MapReduce 内核源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!