一步一步完成 MIT-6.824-Lab1 : MapReduce 之二

2023-10-09 09:38

本文主要是介绍一步一步完成 MIT-6.824-Lab1 : MapReduce 之二,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一步一步完成 MIT-6.824-Lab1 : MapReduce 之二

GitHub代码仓库:Mit-6.824-Lab1-MapReduce

接上文
一步一步完成MIT-6.824-Lab1:MapReduce 之一

步骤三

目标

被分配了 map 任务的 worker, 读取对应的 input 中的内容。通过 Map 函数,完成对输入内容的解析。解析的结果是一系列的 key/value 对。这些 key/value 对被称为中间值,被暂存在内存中。

我们的实现

我们目前未完善的
// master.go
Master 的 Struct 还未完全完善
MyCallHandler() 还未完全完善
完善 worker 与 master 的通信数据格式

接下来我们在 rpc.go 完善 worker 与 master 之间的数据通讯格式。

首先,我们列举出可能的消息类型,比如:请求一个任务,完成了一个任务等等。

// rpc.go// Add your RPC definitions here.
const (MsgForTask = iota        // ask a taskMsgForInterFileLoc       // send intermediate files' location to masterMsgForFinishMap          // finish a map taskMsgForFinishReduce       // finish a reduce task
)

如上,类型的用途见注释。
接下来,定义 RPC 发送消息的 struct 和 接受回复的 struct。 按照我们的上面代码中的消息类型, 我们吧发送消息的 struct 写成两类, 一类单独用来传中间文件的位置给master。

// Add your RPC definitions here.type MyArgs struct {MessageType     intMessageCnt      string
}// send intermediate files' filename to master
type MyIntermediateFile struct {MessageType     intMessageCnt      stringNReduceType     int
}

MyIntermediateFile 专用于发送中间文件的位置到master。即消息类型为MsgForInterFileLoc的使用 MyIntermediateFile。其他消息类型使用 MyArgs。注意:其中NReduceType字段是值经过我们自定义的分割函数后,得到了分割后的intermediate 文件交由哪类 reduce 任务的编号。关于该partation函数后文介绍。
然后定义 reply 的struct:

type MyReply struct {Filename           string         // get a filenameMapNumAllocated    intNReduce            intReduceNumAllocated int TaskType           string         // refer a task type : "map" or "reduce"ReduceFileList     []string       // File list about 
}

所有 RPC 请求的 reply 均使用该类型的 reply。
struct 中包括: 被分配的任务的类型,map 任务的话,FIlename字段装载input文件名, reduce 任务的话, ReduceFileList 字段装载文件名的list。 MapNumAllocated 代表 map 任务被分配的任务的编号。 ReduceNumAllocated 代表 reduce 任务被分配的任务的编号。NReduce 字段代表总 reduce 任务数。

完善 master 结构体和 MyCallHandler

定义了 worker 与 master 之间的消息格式, 接下来,我们需要再完善一下 Master 结构体和 MyCallHandler。
我们注意到,我们的消息中包括中间值的存放位置,这也是 worker 发送给 master 的。所以,我们需要在 master 中对这些位置做记录:

// master.go   ---- Master struct
// Master struct
type Master struct {// Your definitions here.AllFilesName        map[string]intMapTaskNumCount     intNReduce             int               // n reduce task// InterFIlename       [][]string        // store location of intermediate filesMapFinished         boolReduceTaskStatus    map[int]int      // about reduce tasks' statusReduceFinished      bool              // Finish the reduce taskRWLock              *sync.RWMutex
}

我们在 MakeMaster 中初始化。

// master.go   ----- func MakeMaster()// Your code here
m.InterFIlename = make([][]string, m.NReduce)

然后,完善 MyCallHandler()。我们的消息类型还有 MsgForFinishMap; MsgForFinishReduce; MsgForInterFileLoc这些,我们也需要对这些进行处理:

// master.go    ----- func MyCallHandler()switch(msgType) {
case MsgForTask:select {case filename := <- maptasks:// allocate map taskreply.Filename = filenamereply.MapNumAllocated = m.MapTaskNumCountreply.NReduce = m.NReducereply.TaskType = "map"m.RWLock.Lock()m.AllFilesName[filename] = Allocatedm.MapTaskNumCount++m.RWLock.Unlock()go m.timerForWorker("map",filename)return nilcase reduceNum := <- reducetasks:// allocate reduce taskreply.TaskType = "reduce"reply.ReduceFileList = m.InterFIlename[reduceNum]reply.NReduce = m.NReducereply.ReduceNumAllocated = reduceNumm.RWLock.Lock()m.ReduceTaskStatus[reduceNum] = Allocatedm.RWLock.Unlock()go m.timerForWorker("reduce", strconv.Itoa(reduceNum))return nil}
case MsgForFinishMap:// finish a map taskm.RWLock.Lock()defer m.RWLock.Unlock()m.AllFilesName[args.MessageCnt] = Finished   // set status as finish
case MsgForFinishReduce:// finish a reduce task index, _ := strconv.Atoi(args.MessageCnt)m.RWLock.Lock()defer m.RWLock.Unlock()m.ReduceTaskStatus[index] = Finished        // set status as finish
}

如上, master 里判断 args里的消息类型。 如果是 MsgForTask 的话,就向 worker 传一个 task。该 task 也是由 master 生产的。很明显,在map执行完之前,reduce任务是不会执行的。这个从我们之前的代码 generateTask 中可以看出。
如果消息类型是 MsgForFinishMapMsgForFinishReduce 的话,将对应的 task 的状态设置为 Finished。
如果消息类型是 MsgForInterFileLoc 的话, 我们这里另外写一个函数,供 worker 调用,处理该消息类型:

// master.go   ----- func MyInnerFileHandler// MyInnerFileHandler : intermediate files' handler
func (m *Master) MyInnerFileHandler(args *MyIntermediateFile, reply *MyReply) error {nReduceNUm := args.NReduceType;filename := args.MessageCnt;// store themmm.InterFIlename[nReduceNUm] = append(m.InterFIlename[nReduceNUm], filename)return nil
}

通过读取参数中的 NReduceType 字段,获取该文件应该由哪个编号的 reduce 任务处理,存放在相应的位置。

另外,MyCallHandler 代码中的 go m.timerForWorker()后文再做介绍。

接下来完成 worker 的 map 部分

worker 从 master 那里获取到 map 任务后,即开始自己的 map 任务。
首先,关于 worker 向 master 请求任务的细节:
我们观察 /main 下的 mrworker.go, 按照官网方法运行一个 mrworker.go, 只会产生一个 worker。最开始, Worker 里的代码是这样:

// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.// uncomment to send the Example RPC to the master.//CallExample()// RPC callCallForTask()
}

很明显,这样的worker会在一次call后就退出,这样肯定不对,worker应该一直运行,向master获取任务,然后执行,然后再获取,直到所有任务执行完,或者master断开。Worker的完善我们将在后面的步骤给出
另外,我们需要写好我们自己定义的与 master 交流了的函数,即发送 MsgForTask, MsgForInnerFileLoc 等等。

// CallForTask : my RPC call function
func CallForTask(msgType int,msgCnt string) MyReply {args := MyArgs{}args.MessageType = msgTypeargs.MessageCnt = msgCntreply := MyReply{}// callres := call("Master.MyCallHandler", &args, &reply)if res {fmt.Printf("reply.type %v\n",reply.TaskType)} else {return MyReply{TaskType:""}}return reply
}// SendInterFiles : send intermediate files' location (filenames here) to master
func SendInterFiles(msgType int, msgCnt string, nReduceType int) MyReply {args := MyIntermediateFile{}args.MessageType = msgTypeargs.MessageCnt = msgCntargs.NReduceType = nReduceTyperepley := MyReply{}res := call("Master.MyInnerFileHandler", &args, &repley)if !res {fmt.Println("error sending intermediate files' location")}return repley
}

上述两个函数,即代表 worker 向 master 交流的函数。 CallForTask 是 MsgForTask,MsgForFinishMapMsgForFinishReduce使用。 SendInterFiles 是 MsgForInnerFileLoc使用。两个函数中的call函数原有代码已经给出了。 CallForTask 的对于请求任务的消息的reply中会包含任务相关的信息,对于告知master 任务完成的reply消息中不会有内容,而且我们此时不会用到这个reply。 SendInterFiles 的 reply 也是没有内容的,因为我们 master 不需要此处不需要 reply。
接下来就可以进行对任务的处理了。
我们这里先完成对 map 任务的处理:

// worker.go// mapInWorker : workers do the map phase
func mapInWorker(reply *MyReply,mapf func(string, string) []KeyValue) {file, err := os.Open(reply.Filename)defer file.Close()if err != nil {log.Fatalf("cannot open %v", reply.Filename)}content, err := ioutil.ReadAll(file)if err != nil {log.Fatalf("cannot read %v", reply.Filename)}// map function, get intermediate keyvalue pairskva := mapf(reply.Filename, string(content))// partition function. finish the partition taskkvas := Partition(kva, reply.NReduce)// write to temp local filefor i := 0; i<reply.NReduce; i++ {filename := WriteToJSONFile(kvas[i], reply.MapNumAllocated, i)_ = SendInterFiles(MsgForInterFileLoc, filename, i)}_ = CallForTask(MsgForFinishMap, reply.Filename)
}

以上则是 map 任务的处理函数。读取文件内容, 然后执行事先制定好了的 plugin 中的 map 函数,生成中间值对。 此时,就需要用到我们的分割函数了,将中间值经过分割函数分割,得到用于不同 reduce 任务的中间值。然后写入到本地磁盘中,并将文件location 发送给 master .
关于分割函数,发送location 这些内容,我们接下来在步骤4讨论。

步骤4

一步一步完成MIT-6.824-Lab1:MapReduce 之三

这篇关于一步一步完成 MIT-6.824-Lab1 : MapReduce 之二的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/171937

相关文章

SpringBoot使用OkHttp完成高效网络请求详解

《SpringBoot使用OkHttp完成高效网络请求详解》OkHttp是一个高效的HTTP客户端,支持同步和异步请求,且具备自动处理cookie、缓存和连接池等高级功能,下面我们来看看SpringB... 目录一、OkHttp 简介二、在 Spring Boot 中集成 OkHttp三、封装 OkHttp

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

python安装完成后可以进行的后续步骤和注意事项小结

《python安装完成后可以进行的后续步骤和注意事项小结》本文详细介绍了安装Python3后的后续步骤,包括验证安装、配置环境、安装包、创建和运行脚本,以及使用虚拟环境,还强调了注意事项,如系统更新、... 目录验证安装配置环境(可选)安装python包创建和运行Python脚本虚拟环境(可选)注意事项安装

如何编写Linux PCIe设备驱动器 之二

如何编写Linux PCIe设备驱动器 之二 功能(capability)集功能(capability)APIs通过pci_bus_read_config完成功能存取功能APIs参数pos常量值PCI功能结构 PCI功能IDMSI功能电源功率管理功能 功能(capability)集 功能(capability)APIs int pcie_capability_read_wo

Java 后端接口入参 - 联合前端VUE 使用AES完成入参出参加密解密

加密效果: 解密后的数据就是正常数据: 后端:使用的是spring-cloud框架,在gateway模块进行操作 <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.0-jre</version></dependency> 编写一个AES加密

一步一步将PlantUML类图导出为自定义格式的XMI文件

一步一步将PlantUML类图导出为自定义格式的XMI文件 说明: 首次发表日期:2024-09-08PlantUML官网: https://plantuml.com/zh/PlantUML命令行文档: https://plantuml.com/zh/command-line#6a26f548831e6a8cPlantUML XMI文档: https://plantuml.com/zh/xmi

2024年高教社杯数学建模国赛最后一步——结果检验-事关最终奖项

2024年国赛已经来到了最后一天,有必要去给大家讲解一下,我们不需要过多的去关注模型的结果,因为模型的结果的分值设定项最多不到20分。但是如果大家真的非常关注的话,那有必要给大家讲解一下论文结果相关的问题。很多的论文,上至国赛优秀论文下至不获奖的论文并不是所有的论文都可以进行完整的复现求解,大部分数模论文都为存在一个灰色地带。         白色地带即认为所有的代码均可运行、公开

Pr 入门系列之二:导入与管理素材(下)

◆  ◆  ◆ 管理素材 导入素材后,项目面板中每一个媒体都只是原始素材的“链接”。 所以,视频编辑过程中一般情况下都不会破坏原始素材。 1、在不同视图模式下组织素材 项目面板提供了三大视图 View供选用:列表视图、图标视图以及自由格式视图。 A. 锁定 B. 列表视图 C. 图标视图 D. 自由格式视图 E. 缩放滑块 F. 排序图标 G. 自动匹配序列 H. 查找 I. 新建素材箱 J.

如何完成本科毕业论文设计

完成本科毕业论文设计是一个系统性的工程,需要经过多个阶段的规划、执行和总结。以下是一个详细的步骤指南,帮助你顺利完成本科毕业论文设计。 ### 1. 选题与开题 - **选题**:选择一个有研究价值且你感兴趣的题目。与导师讨论,确保题目具有可行性和创新性。 - **开题报告**:撰写开题报告,包括研究背景、研究目的、研究内容、研究方法、预期成果等。 ### 2. 文献综述 - **文献检索**

LabVIEW环境中等待FPGA模块初始化完成

这个程序使用的是LabVIEW环境中的FPGA模块和I/O模块初始化功能,主要实现等待FAM(Field-Programmable Gate Array Module,FPGA模块)的初始化完成,并处理初始化过程中的错误。让我们逐步分析各部分的功能: 1. Wait for FAM Initialization框架 此程序框架用于等待I/O模块成功初始化。如果在5秒钟内模块没有完成配