一步一步完成 MIT-6.824-Lab1 : MapReduce 之三

2023-10-09 09:38

本文主要是介绍一步一步完成 MIT-6.824-Lab1 : MapReduce 之三,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一步一步完成 MIT-6.824-Lab1 : MapReduce 之三

GitHub代码仓库:Mit-6.824-Lab1-MapReduce

接上文
一步一步完成 MIT-6.824-Lab1 : MapReduce 之二

步骤四

目标

定期的,这些内存中的中间值会经过一个用户自定义的 Partition 分割函数,分成 N 份,(即 reduce task 的数量)。然后写到本地的磁盘中。这些文件的存放位置需要发送给 Master, 以保证能够被正确找到,进行 reduce 任务。

我们的实现

中间值经过 Partition 函数,分成N份

紧接着步骤三,此处,我们介绍步骤三里提到的 worker 处理 map 任务需要用到的 Partition 函数:

// Partition : divide intermedia keyvalue pairs into nReduce buckets
func Partition(kva []KeyValue, nReduce int) [][]KeyValue {kvas := make([][]KeyValue,nReduce)for _,kv := range kva {v := ihash(kv.Key) % nReducekvas[v] = append(kvas[v], kv)}return kvas
}

参数是中间值和 reduce 任务的数量。将中间值的 key 通过 hash 函数来分割,分成N份。对应到 n 个 reduce 任务。

将中间值写入到本地磁盘。

官网上提到,建议使用json格式存放中间值,文件名称也有规定。在用户处理 map 任务的函数中,通过 WriteToJSONFile 函数将分割后的中间值写入到本地磁盘:

// WriteToJSONFile : write intermediate KeyValue pairs to a Json file
func WriteToJSONFile(intermediate []KeyValue, mapTaskNum, reduceTaskNUm int) string {filename := "mr-"+strconv.Itoa(mapTaskNum)+"-"+strconv.Itoa(reduceTaskNUm)jfile, _ := os.Create(filename)enc := json.NewEncoder(jfile)for _, kv := range intermediate {err := enc.Encode(&kv)if(err != nil) {log.Fatal("error: ",err)}}return filename
}

参数1是某个分割的中间值的某一项,也就是用于某一个 reduce 任务执行的一项数据。参数2代表本 map 任务的编号。这个编号是由 master 分配的。参数3是代表这一项分割后的数据,适用于的 reduce 任务的编号。这样,在函数内部生产文件,将内容写入文件,返回中间文件名。此处,我们使用的文件名代表文件的 location ,因为本实验的 worker 都是在一台计算机上运行的。

将中间文件location发送给master

通过使用我们之前写好的 SendInterFiles 的调用,调用 master 提供的 RPC 接口,消息类型就为 MsgForInterFileLoc。将中间文件名,该中间文件适用的 reduce 任务的编号发送给 master。

通知master,该任务完成

最后,worker在完成任务后,需要发送消息给 mster 以告知我以完成本任务。这样,master 方便修改任务的状态为 Finished 。

完成map过程

这样,配合我们之前完成的代码,我们即可完成 worker 的 map 部分。

步骤五

目标

当一个 worker 被分配了 reduce 任务后,通过远程程序调用,读取 map worker 存放在其本地的中间文件。当读取了所有的中间值后,reduce worker 对中间值按照键值对的 key 进行排序。 如果中间值太大以至于内存容纳不下,那么,一个可能就需要一个外部的排序。

我们的实现

这里,我们开始完成 worker 的 reduce 部分。
上面的步骤,已经完成了 map 部分的处理。然后,我们在 master 上生产任务时的设计是,只有所有的 map 任务执行完成后,才会开始生产 reduce 任务。我们这个设计也正是我们所需要的。
接下来就是对 reduce 的处理:

// reduceInWroker : workers do the reduce phase
func reduceInWorker(reply *MyReply, reducef func(string, []string) string) {intermediate := []KeyValue{}// read intermediate key/value pairsfor _,v := range reply.ReduceFileList {file, err := os.Open(v)defer file.Close()if err != nil {log.Fatalf("cannot open %v", v)}dec := json.NewDecoder(file)for {var kv KeyValueif err := dec.Decode(&kv); err != nil {break}intermediate = append(intermediate, kv)}}// sort valuesort.Sort(ByKey(intermediate))oname := "mr-out-"+strconv.Itoa(reply.ReduceNumAllocated)ofile, _ := os.Create(oname)i := 0// reducefor i < len(intermediate) {j := i+1for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, intermediate[k].Value)}output := reducef(intermediate[i].Key, values)fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)i = j}_ = CallForTask(MsgForFinishReduce, strconv.Itoa(reply.ReduceNumAllocated))
}

如代码所示, worker 读取所有发送过来的中间文件的内容,然后通过一个 Sort 函数完成排序。(我们需要提供相应的方法,才能完成排序):

// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

Sort结束后,便是开始 reduce 工作。

步骤六

目标

reduce worker 重复迭代排序后的中间值。reduce worker接下来对中间值经过用户自定义的 Reduce 来处理。得到一个 output 文件。

我们的实现

Sort结束后,迭代中间值,对同一 key 的中间值进行 reduce 统计 value , 因为我们已经 Sort 过了, 所以, 相同 key 的中间值对自然是在一块的。
这样完成 worker 的 reduce 部分, 然后写入到最终文件中。 文件名在官网上也有规定。
上面的代码以达到目的, 对 intermediate 的 for 循环, 迭代中间值,并进行 reduce 。然后写入到输出文件:

// WriteToReduceOutput : write to final file
func WriteToReduceOutput(key, values string, nReduce int) {filename := "mr-out-"+strconv.Itoa(nReduce)ofile, err := os.Open(filename)if err != nil {fmt.Println("no such file")ofile, _ = os.Create(filename)}fmt.Fprintf(ofile, "%v %v\n", key, values)
}

写入完成后, worker 向 master 发送 MsgForFinishReduce 类型的 Reduce 任务完成的消息。

步骤七

目标

当所有的 Map 和 Reduce 完成后,程序正常退出。

我们的实现

到现在,我们已经基本完成了大部分工作,接下来,我们完善其他要求的细节

worker 一直运行

我们的 worker 不能在执行了一个任务后就退出,而是要完成任务后,接着请求下一个任务。直到 master 因为所有任务完成而退出,worker 才退出:

// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.for(true) {reply := CallForTask(MsgForTask,"")if(reply.TaskType == "") {break}switch(reply.TaskType) {case "map":mapInWorker(&reply, mapf)	case "reduce":reduceInWorker(&reply, reducef)}}
}

如上述所示,我们一直发送 MsgForTask 类型的请求,如果返回回来的reply没有内容,我们可以判断 master 已经推出,此时, worker 也可以推出。

master 退出的时机

通过观察 mrmaster.go, 我们知道, 通过监视 master 的 Done 方法,判断是否运行结束。所以,我们需要在所有任务运行结束后,让 Done 返回 true:

// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {ret := false// Your code here.ret = m.ReduceFinishedreturn ret
}

步骤八

目标

完成 crash 的处理

我们的实现

要完成实验,我们还需要完成对 crash 情况的处理。

本实验在测试的时候,会让 worker 在 map 或 reduce 的时候随机退出, 我们要做的是,worker 退出后,它的任务就被当作未完成, master 需要将这些任务重新分配,保证任务的完成。

这里,就需要用到我之前提到的 timerForWorker 函数了。 按照我们 MyCallHandler 里的设计,我们在每分配一个任务后,即开始一个计时进程, 按照要求,以10秒为界限,监视任务是否被完成:

// TimerForWorker : monitor the worker
func (m *Master)timerForWorker(taskType, identify string) {ticker := time.NewTicker(10 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:if taskType == "map" {m.RWLock.Lock()m.AllFilesName[identify] = UnAllocatedm.RWLock.Unlock()// 重新让任务加入channelmaptasks <- identify} else if taskType == "reduce" {index, _ := strconv.Atoi(identify)m.RWLock.Lock()m.ReduceTaskStatus[index] = UnAllocatedm.RWLock.Unlock()// 重新将任务加入channelreducetasks <- index}returndefault:if taskType == "map" {m.RWLock.RLock()if m.AllFilesName[identify] == Finished {m.RWLock.RUnlock()return} else {m.RWLock.RUnlock()}} else if taskType == "reduce" {index, _ := strconv.Atoi(identify)m.RWLock.RLock()if m.ReduceTaskStatus[index] == Finished {m.RWLock.RUnlock()return} else {m.RWLock.RUnlock()}}}}
}

通过持续监听任务,如果一旦任务失败,超时,那么则将本任务重新加回到任务队列的 channel 里。供其他的 worker 调用。

到此为止

到现在,我们的代码已经基本完成, 可以通过所有的 Tests 。
works on ubuntu

这篇关于一步一步完成 MIT-6.824-Lab1 : MapReduce 之三的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/171939

相关文章

SpringBoot使用OkHttp完成高效网络请求详解

《SpringBoot使用OkHttp完成高效网络请求详解》OkHttp是一个高效的HTTP客户端,支持同步和异步请求,且具备自动处理cookie、缓存和连接池等高级功能,下面我们来看看SpringB... 目录一、OkHttp 简介二、在 Spring Boot 中集成 OkHttp三、封装 OkHttp

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

python安装完成后可以进行的后续步骤和注意事项小结

《python安装完成后可以进行的后续步骤和注意事项小结》本文详细介绍了安装Python3后的后续步骤,包括验证安装、配置环境、安装包、创建和运行脚本,以及使用虚拟环境,还强调了注意事项,如系统更新、... 目录验证安装配置环境(可选)安装python包创建和运行Python脚本虚拟环境(可选)注意事项安装

Java 后端接口入参 - 联合前端VUE 使用AES完成入参出参加密解密

加密效果: 解密后的数据就是正常数据: 后端:使用的是spring-cloud框架,在gateway模块进行操作 <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.0-jre</version></dependency> 编写一个AES加密

一步一步将PlantUML类图导出为自定义格式的XMI文件

一步一步将PlantUML类图导出为自定义格式的XMI文件 说明: 首次发表日期:2024-09-08PlantUML官网: https://plantuml.com/zh/PlantUML命令行文档: https://plantuml.com/zh/command-line#6a26f548831e6a8cPlantUML XMI文档: https://plantuml.com/zh/xmi

2024年高教社杯数学建模国赛最后一步——结果检验-事关最终奖项

2024年国赛已经来到了最后一天,有必要去给大家讲解一下,我们不需要过多的去关注模型的结果,因为模型的结果的分值设定项最多不到20分。但是如果大家真的非常关注的话,那有必要给大家讲解一下论文结果相关的问题。很多的论文,上至国赛优秀论文下至不获奖的论文并不是所有的论文都可以进行完整的复现求解,大部分数模论文都为存在一个灰色地带。         白色地带即认为所有的代码均可运行、公开

如何完成本科毕业论文设计

完成本科毕业论文设计是一个系统性的工程,需要经过多个阶段的规划、执行和总结。以下是一个详细的步骤指南,帮助你顺利完成本科毕业论文设计。 ### 1. 选题与开题 - **选题**:选择一个有研究价值且你感兴趣的题目。与导师讨论,确保题目具有可行性和创新性。 - **开题报告**:撰写开题报告,包括研究背景、研究目的、研究内容、研究方法、预期成果等。 ### 2. 文献综述 - **文献检索**

LabVIEW环境中等待FPGA模块初始化完成

这个程序使用的是LabVIEW环境中的FPGA模块和I/O模块初始化功能,主要实现等待FAM(Field-Programmable Gate Array Module,FPGA模块)的初始化完成,并处理初始化过程中的错误。让我们逐步分析各部分的功能: 1. Wait for FAM Initialization框架 此程序框架用于等待I/O模块成功初始化。如果在5秒钟内模块没有完成配

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式

MapReduce算法 – 反转排序(Order Inversion)

译者注:在刚开始翻译的时候,我将Order Inversion按照字面意思翻译成“反序”或者“倒序”,但是翻译完整篇文章之后,我感觉到,将Order Inversion翻译成反序模式是不恰当的,根据本文的内容,很显然,Inversion并非是将顺序倒排的意思,而是如同Spring的IOC一样,表明的是一种控制权的反转。Spring将对象的实例化责任从业务代码反转给了框架,而在本文的模式中,在map