Hadoop2.6.0运行mapreduce之Uber模式验证

2023-10-25 03:18

本文主要是介绍Hadoop2.6.0运行mapreduce之Uber模式验证,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

在有些情况下,运行于Hadoop集群上的一些mapreduce作业本身的数据量并不是很大,如果此时的任务分片很多,那么为每个map任务或者reduce任务频繁创建Container,势必会增加Hadoop集群的资源消耗,并且因为创建分配Container本身的开销,还会增加这些任务的运行时延。如果能将这些小任务都放入少量的Container中执行,将会解决这些问题。好在Hadoop本身已经提供了这种功能,只需要我们理解其原理,并应用它。

Uber运行模式就是解决此类问题的现成解决方案。本文旨在通过测试手段验证Uber运行模式的效果,在正式的生成环境下,还需要大家具体情况具体对待。

Uber运行模式

Uber运行模式对小作业进行优化,不会给每个任务分别申请分配Container资源,这些小任务将统一在一个Container中按照先执行map任务后执行reduce任务的顺序串行执行。那么什么样的任务,mapreduce框架会认为它是小任务呢?

  • 地图任务的数量不大于mapreduce.job.ubertask.maxmaps参数(默认值是9)的值;
  • 减少任务的数量不大于mapreduce.job.ubertask.maxreduces参数(默认值是1)的值;
  • 输入文件大小不大于mapreduce.job.ubertask.maxbytes参数(默认为1个Block的字节大小)的值;
  • map任务和reduce任务需要的资源量不能大于MRAppMaster(mapreduce作业的ApplicationMaster)可用的资源总量;
我们可以使用在《 Hadoop2.6.0配置参数查看小工具》一文中制作的小工具,查看 Uber相关参数及其默认值:

上面显示的参数mapreduce.job.ubertask.enable用来控制是否开启 Uber运行模式,默认为false。

优化

为简单起见,我们还是以WordCount例子展开。输入数据及输出结果目录的构造过程可以参照《 Hadoop2.6.0的FileInputFormat的任务切分原理分析》一文,本文不再赘述。

限制任务划分数量

我们知道WordCount例子中的reduce任务的数量通过Job.setNumReduceTasks(int)方法已经设置为1,因此满足mapreduce.job.ubertask.maxreduces参数的限制。所以我们首先控制下map任务的数量,我们通过设置mapreduce.input.fileinputformat.split.maxsize参数来限制。看看在满足小任务前提,但是不开启 Uber运行模式时的执行情况。执行命令如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 30 /wordcount/input /wordcount/output/result1  
观察执行结果,可以看到没有启用Uber模式,作业划分为6个分片,如下图:

还可以看到一共是6个地图任务和1个减少任务,如下图:

我在任务执行过程中,在web界面对于分配的Container进行截图,可以看到一共分配了7个Container:

如果阅读了《 Hadoop2.6.0的FileInputFormat的任务切分原理分析》一文,你会知道输入源/wordcount/input目录下2个文件的大小总和为177字节,为了这么小的数据量和简单的WordCount而分配这么多资源的确很不划算。

开启Uber模式


现在我们开启mapreduce.job.ubertask.enable参数并使用Uber运行模式,命令如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 30 -D mapreduce.job.ubertask.enable = true /wordcount/input /wordcount/output/result2  

然后观察执行结果,可以看到已经启用了Uber模式,如下图:

依然是6个地图任务和1个减少任务,但是之前的数据本地地图任务= 6一行信息已经变为当地的其他地图tasks=6。此外还增加了TOTAL_LAUNCHED_UBERTASKS、NUM_UBER_SUBMAPS、NUM_UBER_SUBREDUCES等信息,如下图所示:

以下列出这几个信息的含义:
输出字段描述
TOTAL_LAUNCHED_UBERTASKS启动的Uber任务数
NUM_UBER_SUBMAPSUber任务中的地图任务数
NUM_UBER_SUBREDUCESUber中减少任务数
因此我们知道这7个任务都在Uber模式下运行,其中包含6个map任务和1个reduce任务。
即便如此,有人依然会担心真正分配了多少Container资源,请看我在web界面的截图:

其它测试

由于我主动控制了分片大小,导致分片数量是6,这小于mapreduce.job.ubertask.maxmaps参数的默认值9。按照之前的介绍,当map任务数量大于9时,那么这个作业就不会被认为小任务。所以我们先将分片大小调整为20字节,使得map任务的数量刚好等于9,然后执行以下命令:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 20 -D mapreduce.job.ubertask.enable = true /wordcount/input /wordcount/output/result3  
任务划分相关的信息截图如下:

。我们看到的确将输入数据划分为9份了其它信息如下:

我们看到一共10个Uber模式运行的任务,其中包括9个地图任务和1个减少任务。
最后,我们再将分片大小调整为19字节,使得map任务数量等于10,然后执行以下命令:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 19 -D mapreduce.job.ubertask.enable = true /wordcount/input /wordcount/output/result4  
任务划分相关的信息截图如下:

。我们看到的确将输入数据划分为10份了其它信息如下:

可以看到又重新显示了数据的本地地图 地图
此外,还可以通过调整reduce任务数量或者输入数据大小等方式,使得Uber失效,有兴趣的同学可以自行测试。

源码分析

本文的最后,我们从源码实现的角度来具体分析下Uber运行机制。有经验的Hadoop工程师,想必知道当mapreduce任务提交给ResourceManager后,由RM负责向NodeManger通信启动一个Container用于执行MRAppMaster。启动MRAppMaster实际也是通过调用其main方法,最终会调用MRAppMaster实例的serviceStart方法,其实现如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. protected void serviceStart() throws Exception {  
  2.   
  3.   // 省略无关代码  
  4.   job = createJob(getConfig(), forcedState, shutDownMessage);  
  5.   
  6.   // 省略无关代码  
  7.   if (!errorHappenedShutDown) {  
  8.     JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);  
  9.   
  10.     jobEventDispatcher.handle(initJobEvent);  
  11.   
  12.     // 省略无关代码  
  13.   
  14.     if (job.isUber()) {  
  15.       speculatorEventDispatcher.disableSpeculation();  
  16.     } else {  
  17.       dispatcher.getEventHandler().handle(  
  18.           new SpeculatorEvent(job.getID(), clock.getTime()));  
  19.     }  
  20.   
  21.   }  

serviceStart方法的执行步骤如下:
  1. 调用createJob方法创建JobImpl实例。
  2. 发送JOB_INIT事件,然后处理此事件。
  3. 使用Uber运行模式的一个附加动作——即一旦满足Uber运行的四个条件,那么将不会进行推断执行优化。
createJob方法的代码实现如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. protected Job createJob(Configuration conf, JobStateInternal forcedState,   
  2.     String diagnostic) {  
  3.   
  4.   // create single job  
  5.   Job newJob =  
  6.       new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),  
  7.           taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,  
  8.           completedTasksFromPreviousRun, metrics,  
  9.           committer, newApiCommitter,  
  10.           currentUser.getUserName(), appSubmitTime, amInfos, context,   
  11.           forcedState, diagnostic);  
  12.   ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);  
  13.   
  14.   dispatcher.register(JobFinishEvent.Type.class,  
  15.       createJobFinishEventHandler());       
  16.   return newJob;  
  17. }  

从以上代码可以看到创建了一个JobImpl对象,此对象自身维护了一个状态机(有关状态机转换的实现原理可以参阅《 Hadoop2.6.0中YARN底层状态机实现分析》一文的内容),用于在接收到事件之后进行状态转移并触发一些动作。JobImpl新建后的状态forcedState是JobStateInternal.NEW。最后将此JobImpl对象放入AppContext接口的实现类RunningAppContext的类型为Map<JobId,工作>的缓存上下文中。
JobEventDispatcher的handle方法用来处理JobEvent。之前说到serviceStart方法主动创建了一个类型是JobEventType.JOB_INIT的JobEvent,并且交由JobEventDispatcher的handle方法处理。handle方法的实现如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. private class JobEventDispatcher implements EventHandler<JobEvent> {  
  2.   @SuppressWarnings("unchecked")  
  3.   @Override  
  4.   public void handle(JobEvent event) {  
  5.     ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);  
  6.   }  
  7. }  

处理方法从AppContext的实现类RunningAppContext中获取JobImpl对象,代码如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. @Override  
  2. public Job getJob(JobId jobID) {  
  3.   return jobs.get(jobID);  
  4. }  
最后调用JobImpl实例的句柄方法,其实现如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. public void handle(JobEvent event) {  
  2.   if (LOG.isDebugEnabled()) {  
  3.     LOG.debug("Processing " + event.getJobId() + " of type "  
  4.         + event.getType());  
  5.   }  
  6.   try {  
  7.     writeLock.lock();  
  8.     JobStateInternal oldState = getInternalState();  
  9.     try {  
  10.        getStateMachine().doTransition(event.getType(), event);  
  11.     } catch (InvalidStateTransitonException e) {  
  12.       LOG.error("Can't handle this event at current state", e);  
  13.       addDiagnostic("Invalid event " + event.getType() +   
  14.           " on Job " + this.jobId);  
  15.       eventHandler.handle(new JobEvent(this.jobId,  
  16.           JobEventType.INTERNAL_ERROR));  
  17.     }  
  18.     //notify the eventhandler of state change  
  19.     if (oldState != getInternalState()) {  
  20.       LOG.info(jobId + "Job Transitioned from " + oldState + " to "  
  21.                + getInternalState());  
  22.       rememberLastNonFinalState(oldState);  
  23.     }  
  24.   }  
  25.     
  26.   finally {  
  27.     writeLock.unlock();  
  28.   }  
  29. }  
处理方法的处理步骤如下:
  1. 获取修改JobImpl实例的锁;
  2. 获取JobImpl实例目前所处的状态
  3. 状态机状态转换;
  4. 释放修改JobImpl实例的锁。
getInternalState方法用于获取JobImpl实例当前的状态,其实现如下:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. @Private  
  2. public JobStateInternal getInternalState() {  
  3.   readLock.lock();  
  4.   try {  
  5.     if(forcedState != null) {  
  6.       return forcedState;  
  7.     }  
  8.    return getStateMachine().getCurrentState();  
  9.   } finally {  
  10.     readLock.unlock();  
  11.   }  
  12. }  
我们之前介绍过,在创建JobImpl实例时,其forcedState字段应当是JobStateInternal.NEW。
JobImpl状态机转移时,处理的JobEvent的类型是JobEventType.JOB_INIT,因此经过状态机转换最终会调用InitTransition的transition方法。有关状态机转换的实现原理可以参阅《 Hadoop2.6.0中YARN底层状态机实现分析》一文的内容。
InitTransition的transition方法处理Uber运行模式的关键代码是
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. @Override  
  2. public JobStateInternal transition(JobImpl job, JobEvent event) {  
  3.     // 省略无关代码  
  4.     job.makeUberDecision(inputLength);  
  5.       
  6.     // 省略无关代码  
  7. }  
最后我们看看JobImpl实例的makeUberDecision方法的实现:
[java]  view plain copy
在CODE上查看代码片 派生到我的代码片
  1. private void makeUberDecision(long dataInputLength) {  
  2.   //FIXME:  need new memory criterion for uber-decision (oops, too late here;  
  3.   // until AM-resizing supported,  
  4.   // must depend on job client to pass fat-slot needs)  
  5.   // these are no longer "system" settings, necessarily; user may override  
  6.   int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);  
  7.   
  8.   int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);  
  9.   
  10.   long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,  
  11.       fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is wrong; get FS from  
  12.                                  // [File?]InputFormat and default block size  
  13.                                  // from that  
  14.   
  15.   long sysMemSizeForUberSlot =  
  16.       conf.getInt(MRJobConfig.MR_AM_VMEM_MB,  
  17.           MRJobConfig.DEFAULT_MR_AM_VMEM_MB);  
  18.   
  19.   long sysCPUSizeForUberSlot =  
  20.       conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,  
  21.           MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);  
  22.   
  23.   boolean uberEnabled =  
  24.       conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);  
  25.   boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);  
  26.   boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);  
  27.   boolean smallInput = (dataInputLength <= sysMaxBytes);  
  28.   // ignoring overhead due to UberAM and statics as negligible here:  
  29.   long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);  
  30.   long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);  
  31.   long requiredMB = Math.max(requiredMapMB, requiredReduceMB);  
  32.   int requiredMapCores = conf.getInt(  
  33.           MRJobConfig.MAP_CPU_VCORES,   
  34.           MRJobConfig.DEFAULT_MAP_CPU_VCORES);  
  35.   int requiredReduceCores = conf.getInt(  
  36.           MRJobConfig.REDUCE_CPU_VCORES,   
  37.           MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);  
  38.   int requiredCores = Math.max(requiredMapCores, requiredReduceCores);      
  39.   if (numReduceTasks == 0) {  
  40.     requiredMB = requiredMapMB;  
  41.     requiredCores = requiredMapCores;  
  42.   }  
  43.   boolean smallMemory =  
  44.       (requiredMB <= sysMemSizeForUberSlot)  
  45.       || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);  
  46.     
  47.   boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;  
  48.   boolean notChainJob = !isChainJob(conf);  
  49.   
  50.   // User has overall veto power over uberization, or user can modify  
  51.   // limits (overriding system settings and potentially shooting  
  52.   // themselves in the head).  Note that ChainMapper/Reducer are  
  53.   // fundamentally incompatible with MR-1220; they employ a blocking  
  54.   // queue between the maps/reduces and thus require parallel execution,  
  55.   // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks  
  56.   // and thus requires sequential execution.  
  57.   isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks  
  58.       && smallInput && smallMemory && smallCpu   
  59.       && notChainJob;  
  60.   
  61.   if (isUber) {  
  62.     LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"  
  63.         + numReduceTasks + "r tasks (" + dataInputLength  
  64.         + " input bytes) will run sequentially on single node.");  
  65.   
  66.     // make sure reduces are scheduled only after all map are completed  
  67.     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,  
  68.                       1.0f);  
  69.     // uber-subtask attempts all get launched on same node; if one fails,  
  70.     // probably should retry elsewhere, i.e., move entire uber-AM:  ergo,  
  71.     // limit attempts to 1 (or at most 2?  probably not...)  
  72.     conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);  
  73.     conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);  
  74.   
  75.     // disable speculation  
  76.     conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);  
  77.     conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);  
  78.   } else {  
  79.     StringBuilder msg = new StringBuilder();  
  80.     msg.append("Not uberizing ").append(jobId).append(" because:");  
  81.     if (!uberEnabled)  
  82.       msg.append(" not enabled;");  
  83.     if (!smallNumMapTasks)  
  84.       msg.append(" too many maps;");  
  85.     if (!smallNumReduceTasks)  
  86.       msg.append(" too many reduces;");  
  87.     if (!smallInput)  
  88.       msg.append(" too much input;");  
  89.     if (!smallCpu)  
  90.       msg.append(" too much CPU;");  
  91.     if (!smallMemory)  
  92.       msg.append(" too much RAM;");  
  93.     if (!notChainJob)  
  94.       msg.append(" chainjob;");  
  95.     LOG.info(msg.toString());  
  96.   }  
  97. }  
如果你认真阅读以上代码的实现,就知道这正是我在本文一开始说的Uber运行模式判断mapreduce作业是否采用Uber模式执行的4个条件,缺一不可。一旦判定为Uber运行模式,那么还告诉我们以下几点:
  1. 设置当map任务全部运行结束后才开始reduce任务(参数mapreduce.job.reduce.slowstart.completedmaps设置为1.0)。
  2. 将当前Job的最大map任务尝试执行次数(参数mapreduce.map.maxattempts)和最大reduce任务尝试次数(参数mapreduce.reduce.maxattempts)都设置为1。
  3. 取消当前Job的map任务的推断执行(参数mapreduce.map.speculative设置为false)和reduce任务的推断执行(参数mapreduce.reduce.speculative设置为false)。

这篇关于Hadoop2.6.0运行mapreduce之Uber模式验证的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/279659

相关文章

eclipse如何运行springboot项目

《eclipse如何运行springboot项目》:本文主要介绍eclipse如何运行springboot项目问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目js录当在eclipse启动spring boot项目时出现问题解决办法1.通过cmd命令行2.在ecl

SQL Server身份验证模式步骤和示例代码

《SQLServer身份验证模式步骤和示例代码》SQLServer是一个广泛使用的关系数据库管理系统,通常使用两种身份验证模式:Windows身份验证和SQLServer身份验证,本文将详细介绍身份... 目录身份验证方式的概念更改身份验证方式的步骤方法一:使用SQL Server Management S

使用nohup和--remove-source-files在后台运行rsync并记录日志方式

《使用nohup和--remove-source-files在后台运行rsync并记录日志方式》:本文主要介绍使用nohup和--remove-source-files在后台运行rsync并记录日... 目录一、什么是 --remove-source-files?二、示例命令三、命令详解1. nohup2.

Spring Boot项目打包和运行的操作方法

《SpringBoot项目打包和运行的操作方法》SpringBoot应用内嵌了Web服务器,所以基于SpringBoot开发的web应用也可以独立运行,无须部署到其他Web服务器中,下面以打包dem... 目录一、打包为JAR包并运行1.打包为可执行的 JAR 包2.运行 JAR 包二、打包为WAR包并运行

Redis高可用-主从复制、哨兵模式与集群模式详解

《Redis高可用-主从复制、哨兵模式与集群模式详解》:本文主要介绍Redis高可用-主从复制、哨兵模式与集群模式的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝... 目录Redis高可用-主从复制、哨兵模式与集群模式概要一、主从复制(Master-Slave Repli

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Python如何精准判断某个进程是否在运行

《Python如何精准判断某个进程是否在运行》这篇文章主要为大家详细介绍了Python如何精准判断某个进程是否在运行,本文为大家整理了3种方法并进行了对比,有需要的小伙伴可以跟随小编一起学习一下... 目录一、为什么需要判断进程是否存在二、方法1:用psutil库(推荐)三、方法2:用os.system调用

Nginx location匹配模式与规则详解

《Nginxlocation匹配模式与规则详解》:本文主要介绍Nginxlocation匹配模式与规则,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、环境二、匹配模式1. 精准模式2. 前缀模式(不继续匹配正则)3. 前缀模式(继续匹配正则)4. 正则模式(大

Linux内核参数配置与验证详细指南

《Linux内核参数配置与验证详细指南》在Linux系统运维和性能优化中,内核参数(sysctl)的配置至关重要,本文主要来聊聊如何配置与验证这些Linux内核参数,希望对大家有一定的帮助... 目录1. 引言2. 内核参数的作用3. 如何设置内核参数3.1 临时设置(重启失效)3.2 永久设置(重启仍生效