一步一步完成 MIT-6.824-Lab1 : MapReduce 之一

2023-10-09 09:38

本文主要是介绍一步一步完成 MIT-6.824-Lab1 : MapReduce 之一,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一步一步完成 MIT-6.824-Lab1 : MapReduce 之一

GitHub代码仓库:Mit-6.824-Lab1-MapReduce

回顾上一篇博文中提到了 MapReduce 论文, 本次的 MIT 的 Lab1-MapReduce, 可以根据论文中提到的完成一个 MapReduce 系统的步骤来完成,此处大致列下步骤,当然,当然,没有列出细节部分。本次实验就根据这个步骤来一步一步的完成:

  1. MapReduce 库将用户成据的输入分割成 M 快,每块通常 16-64 MB。
  2. 存在一个 Master, 和多个 Worker。存在 M 个 map 任务和 N 个 reduce 任务等待分配。 Master 从worker中选择空闲的worker来完成任务。
  3. 被分配了 map 任务的 worker, 读取对应的 input 中的内容。通过 Map 函数,完成对输入内容的解析。解析的结果是一系列的 key/value 对。这些 key/value 对被称为中间值,被暂存在内存中。
  4. 定期的,这些内存中的中间值会经过一个用户自定义的 Pratition 分割函数,分成 N 份,(即 reduce task 的数量)。然后写到本地的磁盘中。这些文件的存放位置需要发送给 Master, 以保证能够被正确找到,进行 reduce 任务。
  5. 当一个 worker 被分配了 reduce 任务后,通过远程程序调用,读取 map worker 存放在其本地的中间文件。当读取了所有的中间值后,reduce worker 对中间值按照键值对的 key 进行排序。 如果中间值太大以至于内存容纳不下,那么,一个可能就需要一个外部的排序。
  6. reduce worker 重复迭代排序后的中间值。reduce worker接下来对中间值经过用户自定义的 Reduce 来处理。得到一个 output 文件。
  7. 当所有的 Map 和 Reduce 完成后,程序正常退出。

接下来,我们按照上述的7个步骤一步一步地完成 MIT-6.824 Lab1:MapReduce。
works on ubuntu

步骤一

目标

MapReduce 库将用户成据的输入分割成 M 快,每块通常 16-64 MB。

我们的实现

通过查看实验文件,我们可以的得知,本次 MapReduce 实验的待处理输入文件是 /main 下的一系列以 pg-*.txt。阅读官网上的实验要求:

The pg-*.txt arguments to mrmaster.go are the input files; each file corresponds to one "split", and is the input to one Map task.

每一个 pg-*.txt 都已经是一个 "split" 后的,传给单个 Map task 的输入。所以说,此时我们的目标已经达成。文件中一共有8个txt待处理,所以本实验,正常情况存在 8 个 Map 任务。

步骤2

目标

存在一个 Master, 和多个 Worker。存在 M 个 map 任务和 N 个 reduce 任务等待分配。 Master 从worker中选择空闲的worker来完成任务。

我们的实现

查看项目文件,在 /main 下存在 mrmaster.go ,该 go 文件即会创建一个 Master。阅读官网说明,运行 Master的方法是:

go run mrmaster.go pg-*.txt

可以看出,mrmaster.go 接受一系列 pg-*.txt 文件作为输入。查看 mrmaster.go 内部:

m := mr.MakeMaster(os.Args[1:], 10)

在查看 package mr 下的 MakeMaster :

// create a Master.
// main/mrmaster.go calls this function.
//
func MakeMaster(files []string, nReduce int) *Master {m := Master{}// Your code here.m.server()return &m
}
创建 Master

可以看出,我们运行一个 master 的时候,传递的两个参数,第一个:os.Arg[1:]表示split 了的input文件。 第二个 10 代表 nReduce ,也就是值 Reduce 任务的数量。所以,我们此时创建了一个 master,并要求 reduce 的任务数量为 10。
接下来,我们完成 Masrer 需要的数据。

//在 master.go 文件中,完善现在这个阶段 Master struct// Master struct
type Master struct {// Your definitions here.AllFilesName        map[string]int   // splited filesMapTaskNumCount     int              // curr map task numberNReduce             int              // n reduce taskMapFinished         boolReduceTaskStatus    map[int]int      // about reduce tasks' statusReduceFinished      boolRWLock              *sync.RWMutex             
}

以上是现在这个阶段 Master 需要的一些数据。 用一个 map[string]int 类型的数据来存放 input 文件。其中,map的 key 就代表 inuput 文件名,int类型的 value 代表目前这个 input 文件的状态。同样,我们使用一个 map[int]int 的数据来存放 reduce 任务。 int 类型的 key 代表 reduce 任务的编号, int 类型的 value 代表目前这个编号的 reduce 任务的状态。 另外用 int 类型的数据存放 reduce 任务数 和当前 Map 任务编号。用 bool 类型的数据存放是否完成整个 Map 和 Reduce。
目前这个Master的结构还不完善,我们在后续的步骤中继续补充。

接下来,关于Map 和 Reduce 任务的状态, 我们用 int 类型来表示
// master.go// tasks' status
// UnAllocated    ---->     UnAllocated to any worker
// Allocated      ---->     be allocated to a worker
// Finished       ---->     worker finish the map task
const (UnAllocated = iotaAllocatedFinished
)

接下来,在函数 MakeMaster 中完成初始化:

// master.go  -----func MakeMaster()// your code here
m.AllFilesName = make(map[string]int)
m.MapTaskNumCount = 0
m.NReduce = nReduce
m.MapFinished = false
m.ReduceFinished = false
m.ReduceTaskStatus = make(map[int]int)
m.RWLock = new(sync.RWMutex)
for _,v := range files {m.AllFilesName[v] = UnAllocated
}for i := 0; i<nReduce; i++ {m.ReduceTaskStatus[i] = UnAllocated
}

到此,我们完善了一个基本的 Master 的结构。

master 生成 Map 任务

接下来, 我们让 master 来成生 map 与 reduce 任务。
为了提高任务生产 worker 领取任务的效率,我们此时使用两个 channel 适用与 map与reduce 任务。并让一个函数与主进程并行执行,该进程用来生成任务。

// master.go// global
var maptasks chan string          // chan for map task
var reducetasks chan int          // chan for reduce task// generateTask : create tasks
func (m *Master) generateTask() {for k,v := range m.AllFilesName {if v == UnAllocated {maptasks <- k          // add task to channel}}ok := falsefor !ok {ok = checkAllMapTask(m)    // check if all map tasks have finished}m.MapFinished = truefor k,v := range m.ReduceTaskStatus {if v == UnAllocated {reducetasks <- k}}ok = falsefor !ok {ok = checkAllReduceTask(m)}m.ReduceFinished = true
}// checkAllMapTask : check if all map tasks are finished
func checkAllMapTask(m *Master) bool {m.RWLock.RLock()defer m.RWLock.RUnlock()for _,v := range m.AllFilesName {if v != Finished {return false}}return true
}func checkAllReduceTask(m *Master) bool {m.RWLock.RLock()defer m.RWLock.RUnlock()for _, v := range m.ReduceTaskStatus {if v != Finished {return false}}return true
}

以上则是 master 用来生成 map 和 reduce 任务的相关代码。将生成的任务,写入到相应的channel中去。worker 在向 master 请求任务的时候, master 从channel中获取到任务,发送给 worker , worker执行。代码中还会一直监视任务的状态,来判断是否任务都完成了。

当然,我们还要初始化这两个 channel, 以及注册 RPC 服务, 并行运行 generateTask():

// master.go   ---  func server()// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {// init channels maptasks = make(chan string, 5)reducetasks = make(chan int, 5)rpc.Register(m)rpc.HandleHTTP()// parallel run generateTask()go m.generateTask()//l, e := net.Listen("tcp", ":1234")sockname := masterSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}

到现在,我们已经成功将 master 完成了 RPC 的注册,并完成了任务生成进程的运行。

接下来,初步完善任务分配部分。

完成本步骤最后的要求,对 worker 寻求任务的要求作出响应。我们通过创建一个RPC handler 来实现。 worker call这个handler, 获取任务。

// master.go// MyCallHandler func
// Your code here -- RPC handlers for the worker to call.
func (m *Master) MyCallHandler(args *MyArgs, reply *MyReply) error {msgType := args.MessageType          // worker 发送的消息的类型switch(msgType) {case MsgForTask:select {case filename := <- maptasks:// allocate map taskreply.Filename = filenamereply.MapNumAllocated = m.MapTaskNumCountreply.NReduce = m.NReducereply.TaskType = "map"m.RWLock.Lock()m.AllFilesName[filename] = Allocatedm.MapTaskNumCount++m.RWLock.Unlock()go m.timerForWorker("map",filename)  // 定时函数,后面步骤解释return nilcase reduceNum := <- reducetasks:// allocate reduce taskreply.TaskType = "reduce"reply.ReduceFileList = m.InterFIlename[reduceNum]reply.NReduce = m.NReducereply.ReduceNumAllocated = reduceNumm.RWLock.Lock()m.ReduceTaskStatus[reduceNum] = Allocatedm.RWLock.Unlock()go m.timerForWorker("reduce", strconv.Itoa(reduceNum))return nil}return nil
}

本函数目前也不完善,后续步骤中一步一步完善函数内容。本 handler 通过处理我们自己定义的 worker 与 master 之间的消息类型, 作出不同的处理。 此处, master 检查 worker 发送的消息是否是 MsgForTask(这是一个int类型数据,后文会提到),然后向 worker 发送任务。

接下来是步骤三

一步一步完成MIT-6.824-Lab1:MapReduce 之二

这篇关于一步一步完成 MIT-6.824-Lab1 : MapReduce 之一的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/171935

相关文章

SpringBoot使用OkHttp完成高效网络请求详解

《SpringBoot使用OkHttp完成高效网络请求详解》OkHttp是一个高效的HTTP客户端,支持同步和异步请求,且具备自动处理cookie、缓存和连接池等高级功能,下面我们来看看SpringB... 目录一、OkHttp 简介二、在 Spring Boot 中集成 OkHttp三、封装 OkHttp

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

python安装完成后可以进行的后续步骤和注意事项小结

《python安装完成后可以进行的后续步骤和注意事项小结》本文详细介绍了安装Python3后的后续步骤,包括验证安装、配置环境、安装包、创建和运行脚本,以及使用虚拟环境,还强调了注意事项,如系统更新、... 目录验证安装配置环境(可选)安装python包创建和运行Python脚本虚拟环境(可选)注意事项安装

Java 后端接口入参 - 联合前端VUE 使用AES完成入参出参加密解密

加密效果: 解密后的数据就是正常数据: 后端:使用的是spring-cloud框架,在gateway模块进行操作 <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.0-jre</version></dependency> 编写一个AES加密

一步一步将PlantUML类图导出为自定义格式的XMI文件

一步一步将PlantUML类图导出为自定义格式的XMI文件 说明: 首次发表日期:2024-09-08PlantUML官网: https://plantuml.com/zh/PlantUML命令行文档: https://plantuml.com/zh/command-line#6a26f548831e6a8cPlantUML XMI文档: https://plantuml.com/zh/xmi

2024年高教社杯数学建模国赛最后一步——结果检验-事关最终奖项

2024年国赛已经来到了最后一天,有必要去给大家讲解一下,我们不需要过多的去关注模型的结果,因为模型的结果的分值设定项最多不到20分。但是如果大家真的非常关注的话,那有必要给大家讲解一下论文结果相关的问题。很多的论文,上至国赛优秀论文下至不获奖的论文并不是所有的论文都可以进行完整的复现求解,大部分数模论文都为存在一个灰色地带。         白色地带即认为所有的代码均可运行、公开

如何完成本科毕业论文设计

完成本科毕业论文设计是一个系统性的工程,需要经过多个阶段的规划、执行和总结。以下是一个详细的步骤指南,帮助你顺利完成本科毕业论文设计。 ### 1. 选题与开题 - **选题**:选择一个有研究价值且你感兴趣的题目。与导师讨论,确保题目具有可行性和创新性。 - **开题报告**:撰写开题报告,包括研究背景、研究目的、研究内容、研究方法、预期成果等。 ### 2. 文献综述 - **文献检索**

LabVIEW环境中等待FPGA模块初始化完成

这个程序使用的是LabVIEW环境中的FPGA模块和I/O模块初始化功能,主要实现等待FAM(Field-Programmable Gate Array Module,FPGA模块)的初始化完成,并处理初始化过程中的错误。让我们逐步分析各部分的功能: 1. Wait for FAM Initialization框架 此程序框架用于等待I/O模块成功初始化。如果在5秒钟内模块没有完成配

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式

MapReduce算法 – 反转排序(Order Inversion)

译者注:在刚开始翻译的时候,我将Order Inversion按照字面意思翻译成“反序”或者“倒序”,但是翻译完整篇文章之后,我感觉到,将Order Inversion翻译成反序模式是不恰当的,根据本文的内容,很显然,Inversion并非是将顺序倒排的意思,而是如同Spring的IOC一样,表明的是一种控制权的反转。Spring将对象的实例化责任从业务代码反转给了框架,而在本文的模式中,在map