Worker不断请求Map任务,Coordinator将Map Task分发给Worker(一个原始输入文件对应一个Map Task)
a) Worker处理输入文件,Map函数输入为(filename string,content string),其中filename为输入文件名,content为该文件的内容,输出为KV数组;
b) 我们需要将相同Key的二元组聚集到一起,然后根据Lab1提供的ihash函数将ihash(Key)%ReduceN相同的二元组写到同一中间文件;其中ReduceN为Lab1设定的Reduce job数目。如果我们输入文件为M,那么总的中间文件数目应小于等于M*ReduceN
Worker不断请求Rudce任务,Coordinator将Reduce Task分发给Worker(一个ReduceID对应一个Reduce Job,这里的Job我理解的是Master一次性发给Worker的批量数据,在下次请求Job前,Work需要先把这次的数据处理完)
a) Worker处理ReduceId对应的所有文件,由于一个中间文件中可能有不同的Key,我们需要先聚集相同Key的二元组,然后分别给Reduce处理
b) Reduce完成后,写入最终文件即可
2.1 Worker端
我们的Worker会不断的向Master请求任务,Master会将自己的状态(进行到哪一步)同步给Worker,Worker根据自己的状态决定请求Map Task还是Reduce Task
func DoMap(reduceMax int, mapDone *bool, mapf func(string, string) []KeyValue) { //get map task reply := GetMapFileReply{} getMapFile(&reply, mapDone) //get content if reply.MaptaskNumber >= 0 { file, err := os.Open(reply.Filename) if err != nil { log.Fatalf("cannot open %v", reply.Filename) } content, err := ioutil.ReadAll(file) if err != nil { log.Fatalf("cannot read %v", reply.Filename) } file.Close() //call application map function kva := mapf(reply.Filename, string(content)) sort.Sort(ByKey(kva)) i := 0 mapOutFileName := []string{} mapOutTmp2Final := make(map[string]string) mapOutFileContent := make(map[int][]MapOut) //split the content by key for i < len(kva) { j := i + 1 for j < len(kva) && kva[j].Key == kva[i].Key { j++ } mapout := MapOut{} for k := i; k < j; k++ { mapout.Value = append(mapout.Value, kva[k].Value) } mapout.Key = kva[i].Key reduceId := ihash(kva[i].Key) % reduceMax _, ok := mapOutFileContent[reduceId] if !ok { filename := "mr-" + strconv.Itoa(reply.MaptaskNumber) + "-" + strconv.Itoa(reduceId) //fmt.Println("file name is " + filename) mapOutFileName = append(mapOutFileName, filename) } mapOutFileContent[reduceId] = append(mapOutFileContent[reduceId], mapout) i = j } //generate the intermediate file for _, filename := range mapOutFileName { //get reduceId according to intermediate file name suffix := strings.Split(filename, "-") reduceId, _ := strconv.Atoi(suffix[2]) //file, _ = os.Create(filename) file, _ = ioutil.TempFile("", filename+"*") mapOutTmp2Final[file.Name()] = filename enc := json.NewEncoder(file) for _, content := range mapOutFileContent[reduceId] { err := enc.Encode(&content) if err != nil { fmt.Println("encode failed " + err.Error()) } } file.Close() } //send one map file done sendMapDone(reply.MaptaskNumber, reply.Filename, mapOutTmp2Final) } else if *mapDone == false && reply.MaptaskNumber < 0 { //wait for all map finish time.Sleep(1) } }
func DoReduce(reduceDone *bool, reducef func(string, []string) string) { reply := GetReduceFileReply{} //get reduce task getReduceFile(&reply, reduceDone) kva := []MapOut{} ReduceOutTmp2Final := make(map[string]string) if !*reduceDone && reply.ReduceId >= 0 { //create out file outFileName := "mr-out-" + strconv.Itoa(reply.ReduceId) outfile, _ := ioutil.TempFile("", outFileName+"*") ReduceOutTmp2Final[outfile.Name()] = outFileName defer outfile.Close() //read from intermediate file for _, filename := range reply.Filename { file, _ := os.Open(filename) defer file.Close() dec := json.NewDecoder(file) for { var kv MapOut if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) } } sort.Sort(MapOutByKey(kva)) //split content by key i := 0 for i < len(kva) { j := i + 1 for j < len(kva) && kva[j].Key == kva[i].Key { j++ } intermediate := []string{} for k := i; k < j; k++ { intermediate = append(intermediate, kva[k].Value...) } reduceRes := reducef(kva[i].Key, intermediate) fmt.Fprintf(outfile, "%v %v\n", kva[i].Key, reduceRes) i = j } //call application reduce sendReduceDone(reply.ReduceId, ReduceOutTmp2Final) //wait for all reduce done } else if !*reduceDone && reply.ReduceId < 0 { time.Sleep(1) } }
2.2 Coordinator端
type Coordinator struct { //map task : not started / running / finished mapTaskState map[string]int //used for timeout check mapTaskTime map[string]int64 //idicated map task mapTaskNumber int //if in map step mapDone bool //intermediate map out file mapOutFileArray map[int][]string //if in reduce step reduceDone bool //reduce task : not started / running / finished reduceTaskState map[int]int //used for timeout check reduceTaskTime map[int]int64 //given by caller,indicated the reduce job nReduce int //task state lock taskStatLock sync.Mutex //step state lock taskDone sync.Mutex }
func (c *Coordinator) GetMapInFile(args *GetMapFileArgs, reply *GetMapFileReply) error { //get map file c.taskDone.Lock() if !c.mapDone { c.taskDone.Unlock() c.taskStatLock.Lock() for task, _ := range c.mapTaskState { if c.mapTaskState[task] == -1 { reply.Filename = task reply.MapDone = false reply.MaptaskNumber = c.mapTaskNumber c.mapTaskNumber++ c.mapTaskState[task] = 0 c.mapTaskTime[task] = time.Now().Unix() c.taskStatLock.Unlock() return nil } } reply.MapDone = false } else { reply.MapDone = true } reply.MaptaskNumber = -1 return nil }
在收到一个Map Task完成后,记录任务状态并检查是否所有任务完成能进入下一状态
func (c *Coordinator) MapSingleFileDone(args *MapDoneArgs, reply *MapDoneReply) error { //set this map task done c.taskStatLock.Lock() c.mapTaskState[args.Filename] = 1 c.taskStatLock.Unlock() //c.mapTaskDoneCollection = append(c.mapTaskDoneCollection, args.MaptaskNumber) //record reduceid <-> intermediate file name for tmpfile, filename := range args.MapOutTmp2Final { os.Rename(tmpfile, filename) suffix := strings.Split(filename, "-") reduceN, _ := strconv.Atoi(suffix[2]) c.mapOutFileArray[reduceN] = append(c.mapOutFileArray[reduceN], filename) } //test if all map done reply.Y = args.MaptaskNumber for _, i := range c.mapTaskState { if i != 1 { return nil } } //if all map done,set reduce task state c.taskStatLock.Lock() for key := range c.mapOutFileArray { c.reduceTaskState[key] = -1 } c.taskStatLock.Unlock() c.taskDone.Lock() c.mapDone = true c.taskDone.Unlock() return nil }
//创建临时文件,临时文件不可见 file, _ = ioutil.TempFile("", filename+"*") //创建 临时文件 到 最终文件名的映射 mapOutTmp2Final[file.Name()] = filename ... //修改为最终文件,因为前两步在Worker中进行,这一步在Master进行,所以使用map来缓存临时文件的文件路径 os.Rename(ReduceOutTmp2Final[tmpName], filename)
在go run时,使用-race来判断是否有竞态,及时加锁