以下说明各种Id存在的必要性以及设计方法
MapTaskId:MapTask随着程序的运行,其状态也会相应变化(未分配、已分配、已完成),MapTaskId可以标识MapTask,同时,还可以为临时文件、中间文件、输出文件的命名提供方便。由于实验内容比较特殊(每个单独的文件作为一个task),可以将输入文件名的数组下标作为MaskId。如有8个输入文件,MapTaskId为0~7(含)
ReduceTaskId/Bucket_num:用于标识ReduceTask,同时,还可以为输出文件的命名提供方便。如有10个桶,ReduceTaskId为0~9(含)
以上两个可以合并为TaskId,并通过TaskKind加以区分
WorkerId:用于标识worker,同时,还可以为临时文件的命名提供方便,在我的实现中,临时文件的命名方式为tmp-${workerId}-${MapTaskId}-${bucket_num}-*.txt
.WorkerId在worker第一次询问任务时由coordinator生成,并由worker保留。WorkerId不能是pid,因为在分布式系统中,多台机器上的worker可能有相同的pid
以后每次worker使用RPC来调用coordinator中的函数时,都应该出示自己的身份信息:自己是谁(WorkerId)、自己的task是什么(TaskId),coordinator根据worker的身份信息以及自己维护的一些统计信息来决定下一步怎么做
type Coordinator struct { // 0表示未分配任务 // 1表示已分配任务但没有收到成功的回复 // 2表示已完成 // input_files map[string]int // 存输入文件以及完成情况 input_files []string // 存输入文件 complete_condition []int // MapTask完成情况 workerId_generator int // workerId生成器 map_MtaskId_WId map[int]int // Map task与workerId之间的对应关系 map_RtaskId_WId map[int]int // Reduce task与workerId之间的对应关系 bucket []int // bucket以及完成情况 nReduce int // 桶的数量 map_done bool // map工作全部完成 reduce_done bool // reduce工作全部完成 mux sync.Mutex // 这里直接用一把大锁 }
对一些字段做简单说明:
map_MtaskId_WId
:coordinator任务分配后,将MapTaskId和WorkerId建立联系,方便验收
map_RtaskId_WId
:建立ReduceTaskId和WorkerId的联系
只有当map_done为真时,才能开始reduce工作;当reduce_done为真,说明MapReduce job完成
worker执行的流程如下:
coordinator中的函数原型为:
func (c *Coordinator) AskForTask(args *AskForTaskArgs, reply *AskForTaskReply) error
由worker通过RPC调用,相关的数据结构为:
// rpc.go type AskForTaskArgs struct { WorkerId int } type AskForTaskReply struct { WorkerId int // 分配的workerId Task_kind int // task种类,0表示无任务、1表示Map、2表示Reduce Map_filename string // 需要执行Map的文件名,只在Task_kind=1时有效 MapTaskId int // Map task编号,只在Task_kind=1时有效 NReduce int // 桶的数量,只在Task_kind=1时有效 File_num int // 输入文件的数量,只在Task_kind=2时有效 Bucket_num int // 需要执行Reduce的桶号,只在Task_kind=2时有效 }
函数体中根据当前还剩下什么任务来分配,任务分配后,对coordinator中的相应字段做一些记录和修改,同时修改reply使worker知道自己的任务是什么
worker中函数原型为:
func Execute(task *AskForTaskReply, mapf func(string, string) []KeyValue, reducef func(string, []string) string)
这个阶段MapTask需要生成临时文件(temporary files)而不是中间文件(intermediate files),原因是:假设worker1(workerId=1)领取了task1(taskId=1),在函数返回之前,需要开启一个goroutine,监测worker1是否在规定时间内完成任务。如果没有完成,需要将这个任务分配给其他worker,例如worker2(workerId=2)。但此时worker1可能仍在运行,并且可能最终会完成它的任务。这就出现了一个问题:worker1和worker2都产生了若干份中间文件,并且两组文件的文件名相同(因为中间文件的命名方式是mr-X-Y,其中X表示Map task编号,Y表示Reduce task编号)理论上我们需要的worker2产生的中间文件,但是有可能被worker1重新写入。为了避免这种情况,可以使用临时文件,worker首先将内容写入临时文件,一旦某个worker完成了工作,先让coordinator验证,如果workerId与taskId对应,则采纳,并将临时文件重命名为标准格式;如果workerId与taskId不对应,则说明是废弃的worker进行了提交,忽略之。
ReduceTask也需要临时文件,原因同上。
coordinator中的函数原型为:
func (c *Coordinator) CommitTask(args *CommitTaskArgs, reply *CommitTaskReply) error
由worker通过RPC调用,相关的数据结构为:
type CommitTaskArgs struct { Task_kind int WorkerId int TaskId int } type CommitTaskReply struct { Ok bool // 验证是否通过 }
coordinator对TaskId和WorkerId进行验证,并返回验证结果
worker收到验证的结果后,如果结果是验证失败,则跳出循环,重新申请任务(在此之前,这里最好还要删除自己产生的所有临时文件);如果验证成功,则将原来的临时文件重命名为标准格式的中间文件(对于MapTask来说)或输出文件(对于ReduceTask来说)
完成任务:通过RPC来使coordinator知道任务已完成,并通知其修改相应的数据(如complete_condition、bucket等),同时做一些扩展工作,比如查看map task是否全部完成或reduce task是否全部完成
询问job已完成:查看reduce_done
比较重要的一点是对woker的监测。可以这么处理:在为一个worker分配完任务后,开启一个goroutine,该goroutine的作用是sleep十秒,然后查看该worker的任务的完成情况,如果完成了,goroutine返回,无事发生;如果没有完成,不管是什么原因,将该worker视为dead,并将原本分配给该worker的task状态重新修改为“未分配”,并取消该workerId与taskId的联系。
go func(taskId int, workerId int, c *Coordinator) { time.Sleep(time.Second * 10) c.mux.Lock() if c.complete_condition[taskId] != 2 { c.complete_condition[taskId] = 0 // 改为未分配 delete(c.map_MtaskId_WId, taskId) // 取消映射 fmt.Printf("worker %v failed! map_MtaskId_WId condition: %v\n", workerId, c.map_MtaskId_WId) } c.mux.Unlock() }(reply.MapTaskId, reply.WorkerId, c)
留给worker执行任务的时间是10秒,不要把这个时间改得太小(如1秒),否则worker可能永远无法在规定时间内完成任务。经检验,以我自己的电脑来跑,pgmetamorphosis.txt
和pg-sherlock_holmes.txt
两个文件是无法在1秒内跑完的(因为这两个文件最大,达到一万多行)。这样就会一直超时,一直重新分配任务,如此循环
临时文件和中间文件生成的目录。通过查看test-mr.sh
可以知道,工作目录是在mr-dmp下,并且在进行每个检查之前都会删除当前目录中的所有中间文件和输出文件。在代码中,最好将这些文件的生成位置设为当前目录,这样shell脚本会自动删除它们,从而不会对下一个检查造成影响