A Look at MapTask's Single-Threaded and Multithreaded Map Runners

2023-12-01 22:32


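Compared with the old MapReduce API (org.apache.hadoop.mapred), the new API (org.apache.hadoop.mapreduce) substantially redesigns the interface through which a MapTask runs the map function.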

In the old API, the run method of MapRunner looks like this:

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
        throws IOException
    {
        try
        {
            // The key and value objects are allocated once and refilled for every record.
            K1 key = input.createKey();
            V1 value = input.createValue();
            while (input.next(key, value))
            {
                // Each pair goes straight to the user's Mapper on the current thread.
                mapper.map(key, value, output, reporter);
                if (incrProcCount)
                    reporter.incrCounter("SkippingTaskCounters", "MapProcessedRecords", 1L);
            }
        }
        finally
        {
            // Close the mapper whether the loop finished normally or map() threw.
            mapper.close();
        }
    }

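As the code shows, the MapTask's input is simply a RecordReader. run calls next(key, value) to fetch each key/value pair and hands it directly to mapper.map. The key and value objects are created once, before the loop, and are refilled for every record. mapper.close() runs whether or not map throws.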
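
To make the single-threaded loop concrete, here is a minimal, Hadoop-free sketch of the same shape. Reader, SimpleMapper, the holder classes and the demo helper are hypothetical stand-ins, not Hadoop API. They only mimic the RecordReader createKey/createValue/next contract and the try/finally around mapper.close(). The demo counts distinct key objects to show that a single key instance is reused for every record.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

class SingleThreadedRunnerSketch {

    /** Mutable key holder, standing in for a Writable key (hypothetical). */
    static final class LongHolder { long v; }

    /** Mutable value holder, standing in for a Writable value such as Text (hypothetical). */
    static final class TextHolder { final StringBuilder sb = new StringBuilder(); }

    /** Hypothetical stand-in for the old-API RecordReader contract. */
    interface Reader<K, V> {
        K createKey();
        V createValue();
        boolean next(K key, V value) throws IOException;
    }

    /** Hypothetical stand-in for the old-API Mapper; output goes to a plain list. */
    interface SimpleMapper<K, V> {
        void map(K key, V value, List<String> out) throws IOException;
        void close() throws IOException;
    }

    /** Reads in-memory lines, refilling the caller's holders in place on each next(). */
    static Reader<LongHolder, TextHolder> linesReader(String... lines) {
        return new Reader<LongHolder, TextHolder>() {
            private int pos = 0;

            public LongHolder createKey() { return new LongHolder(); }
            public TextHolder createValue() { return new TextHolder(); }

            public boolean next(LongHolder key, TextHolder value) {
                if (pos >= lines.length) {
                    return false;
                }
                key.v = pos;
                value.sb.setLength(0);
                value.sb.append(lines[pos++]);
                return true;
            }
        };
    }

    /** Same shape as MapRunner.run: allocate once, reuse for every record, close in finally. */
    static <K, V> void run(Reader<K, V> input, SimpleMapper<K, V> mapper, List<String> out)
            throws IOException {
        try {
            K key = input.createKey();
            V value = input.createValue();
            while (input.next(key, value)) {
                mapper.map(key, value, out); // next() is not called again until map() returns
            }
        } finally {
            mapper.close();
        }
    }

    /** Upper-cases each line, then reports how many distinct key objects map() received. */
    static List<String> demo(String... lines) {
        List<String> out = new ArrayList<>();
        Set<Object> keyObjects = Collections.newSetFromMap(new IdentityHashMap<>());
        try {
            run(linesReader(lines), new SimpleMapper<LongHolder, TextHolder>() {
                public void map(LongHolder key, TextHolder value, List<String> o) {
                    keyObjects.add(key);
                    o.add(key.v + ":" + value.sb.toString().toUpperCase());
                }

                public void close() {
                    out.add("closed; distinct key objects = " + keyObjects.size());
                }
            }, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo("a", "b", "c")); // [0:A, 1:B, 2:C, closed; distinct key objects = 1]
    }
}
```

All three records arrive in the same key object. Reuse like this is only safe because the loop is serialized: the reader cannot overwrite a holder while map is still using it.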

In the new API, MapTask's runNewMapper method first instantiates the configured Mapper class through reflection:

mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

and then simply calls the mapper object's run method:

mapper.run(mapperContext);

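Inside run, every key/value pair is passed to map for processing. The record loop that the old API kept in MapRunner now lives in the Mapper class itself. A job can therefore change how records are dispatched by overriding run, which is how the new API's org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper works.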
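
This inversion of control can be sketched the same way. In this hypothetical, Hadoop-free model, Context is a simplified stand-in for Mapper.Context that exposes only nextKeyValue, getCurrentKey, getCurrentValue and write. Mapper.run follows the setup / nextKeyValue loop / cleanup shape of the new-API Mapper.run. LengthMapper and HeadMapper are invented examples. The first overrides only map, as typical user code does. The second takes over the loop itself by overriding run.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class NewApiMapperSketch {

    /** Hypothetical, simplified stand-in for Mapper.Context. */
    static final class Context<K, V> {
        private final Iterator<Map.Entry<K, V>> input;
        private Map.Entry<K, V> current;
        final List<String> output = new ArrayList<>();

        Context(List<Map.Entry<K, V>> records) {
            this.input = records.iterator();
        }

        boolean nextKeyValue() {
            if (!input.hasNext()) {
                return false;
            }
            current = input.next();
            return true;
        }

        K getCurrentKey() { return current.getKey(); }
        V getCurrentValue() { return current.getValue(); }
        void write(String record) { output.add(record); }
    }

    /** Base class: run() owns the record loop; subclasses normally override map() only. */
    static class Mapper<K, V> {
        protected void setup(Context<K, V> context) { }
        protected void map(K key, V value, Context<K, V> context) { }
        protected void cleanup(Context<K, V> context) { }

        public void run(Context<K, V> context) {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    /** Typical user code: only map() is overridden. */
    static class LengthMapper extends Mapper<Integer, String> {
        @Override
        protected void map(Integer key, String value, Context<Integer, String> context) {
            context.write(key + "=" + value.length());
        }
    }

    /** Hypothetical subclass that changes the dispatch loop itself by overriding run(). */
    static class HeadMapper extends LengthMapper {
        private final int limit;

        HeadMapper(int limit) { this.limit = limit; }

        @Override
        public void run(Context<Integer, String> context) {
            setup(context);
            try {
                for (int n = 0; n < limit && context.nextKeyValue(); n++) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    /** Plays the role of MapTask.runNewMapper: build a context and call mapper.run(). */
    static List<String> demo(Mapper<Integer, String> mapper, String... values) {
        List<Map.Entry<Integer, String>> records = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            records.add(new AbstractMap.SimpleEntry<Integer, String>(i, values[i]));
        }
        Context<Integer, String> context = new Context<>(records);
        mapper.run(context);
        return context.output;
    }

    public static void main(String[] args) {
        System.out.println(demo(new LengthMapper(), "ab", "c", "xyz")); // [0=2, 1=1, 2=3]
        System.out.println(demo(new HeadMapper(2), "ab", "c", "xyz"));  // [0=2, 1=1]
    }
}
```

The "task" side (demo) stays the same in both cases. Only the mapper decides how its records are consumed, which is the extension point a multithreaded new-API mapper can build on.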


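In most cases the map work is done by a single-threaded runner, and MapRunner is exactly that. Hadoop also ships a multithreaded runner for MapTask, MultithreadedMapRunner. It is not aimed at CPU-heavy maps on multi-core machines. Its own Javadoc recommends it for map operations that are not CPU bound, where running several map calls concurrently raises throughput, and it requires the Mapper implementation to be thread-safe. The code of MultithreadedMapRunner is as follows: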


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * Multithreaded implementation for {@link org.apache.hadoop.mapred.MapRunnable}.
 * <p>
 * It can be used instead of the default implementation,
 * {@link org.apache.hadoop.mapred.MapRunner}, when the Map operation is not CPU
 * bound in order to improve throughput.
 * <p>
 * Map implementations using this MapRunnable must be thread-safe.
 * <p>
 * The Map-Reduce job has to be configured to use this MapRunnable class (using
 * the JobConf.setMapRunnerClass method) and
 * the number of thread the thread-pool can use with the
 * <code>mapred.map.multithreadedrunner.threads</code> property, its default
 * value is 10 threads.
 * <p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultithreadedMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  private static final Log LOG =
    LogFactory.getLog(MultithreadedMapRunner.class.getName());

  private JobConf job;
  private Mapper<K1, V1, K2, V2> mapper;
  private ExecutorService executorService;
  private volatile IOException ioException;
  private volatile RuntimeException runtimeException;
  private boolean incrProcCount;

  @SuppressWarnings("unchecked")
  public void configure(JobConf jobConf) {
    int numberOfThreads =
      jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                " to use " + numberOfThreads + " threads");
    }

    this.job = jobConf;
    //increment processed counter only if skipping feature is enabled
    this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 &&
      SkipBadRecords.getAutoIncrMapperProcCount(job);
    this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
        jobConf);

    // Creating a threadpool of the configured size to execute the Mapper
    // map method in parallel.
    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
                                             0L, TimeUnit.MILLISECONDS,
                                             new BlockingArrayQueue
                                               (numberOfThreads));
  }

  /**
   * A blocking array queue that replaces offer and add, which throws on a full
   * queue, to a put, which waits on a full queue.
   */
  private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {

    private static final long serialVersionUID = 1L;
    public BlockingArrayQueue(int capacity) {
      super(capacity);
    }
    public boolean offer(Runnable r) {
      return add(r);
    }
    public boolean add(Runnable r) {
      try {
        put(r);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
      return true;
    }
  }

  private void checkForExceptionsFromProcessingThreads()
      throws IOException, RuntimeException {
    // Checking if a Mapper.map within a Runnable has generated an
    // IOException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (ioException != null) {
      throw ioException;
    }

    // Checking if a Mapper.map within a Runnable has generated a
    // RuntimeException. If so we rethrow it to force an abort of the Map
    // operation thus keeping the semantics of the default
    // implementation.
    if (runtimeException != null) {
      throw runtimeException;
    }
  }

  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
    throws IOException {
    try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      K1 key = input.createKey();
      V1 value = input.createValue();

      while (input.next(key, value)) {

        executorService.execute(new MapperInvokeRunable(key, value, output,
                                reporter));

        checkForExceptionsFromProcessingThreads();

        // Allocate new key & value instances as mapper is running in parallel
        key = input.createKey();
        value = input.createValue();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Finished dispatching all Mappper.map calls, job "
                  + job.getJobName());
      }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

        // Now waiting for all Runnables to end.
        while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Awaiting all running Mappper.map calls to finish, job "
                      + job.getJobName());
          }

          // NOTE: while Mapper.map dispatching has concluded there are still
          // map calls in progress and exceptions would be thrown.
          checkForExceptionsFromProcessingThreads();

        }

        // NOTE: it could be that a map call has had an exception after the
        // call for awaitTermination() returing true. And edge case but it
        // could happen.
        checkForExceptionsFromProcessingThreads();

      } catch (IOException ioEx) {
        // Forcing a shutdown of all thread of the threadpool and rethrowing
        // the IOException
        executorService.shutdownNow();
        throw ioEx;
      } catch (InterruptedException iEx) {
        throw new RuntimeException(iEx);
      }

    } finally {
      mapper.close();
    }
  }


  /**
   * Runnable to execute a single Mapper.map call from a forked thread.
   */
  private class MapperInvokeRunable implements Runnable {
    private K1 key;
    private V1 value;
    private OutputCollector<K2, V2> output;
    private Reporter reporter;

    /**
     * Collecting all required parameters to execute a Mapper.map call.
     * <p>
     *
     * @param key
     * @param value
     * @param output
     * @param reporter
     */
    public MapperInvokeRunable(K1 key, V1 value,
                               OutputCollector<K2, V2> output,
                               Reporter reporter) {
      this.key = key;
      this.value = value;
      this.output = output;
      this.reporter = reporter;
    }

    /**
     * Executes a Mapper.map call with the given Mapper and parameters.
     * <p>
     * This method is called from the thread-pool thread.
     *
     */
    public void run() {
      try {
        // map pair to output
        MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      } catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.ioException == null) {
            MultithreadedMapRunner.this.ioException = ex;
          }
        }
      } catch (RuntimeException ex) {
        // If there is a RuntimeException during the call it is set in an
        // instance variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this) {
          if (MultithreadedMapRunner.this.runtimeException == null) {
            MultithreadedMapRunner.this.runtimeException = ex;
          }
        }
      }
    }
  }

}
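
A key trick in this listing is BlockingArrayQueue. ThreadPoolExecutor.execute hands a task to its work queue through offer(). A plain ArrayBlockingQueue returns false when it is full, so a pool whose core and maximum sizes are equal (as configured here) rejects the task with a RejectedExecutionException. Redirecting offer() to put() makes execute() wait instead. The thread reading records therefore never gets more than (threads + queue capacity) records ahead of the running map calls. The sketch below reproduces that queue with plain JDK classes and contrasts it with an ordinary ArrayBlockingQueue. The helper names and the 5 ms sleep standing in for a slow, non-CPU-bound map() call are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingQueueBackpressureSketch {

    /** Same idea as the listing's BlockingArrayQueue: offer() and add() wait instead of failing. */
    static final class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;

        BlockingArrayQueue(int capacity) { super(capacity); }

        @Override
        public boolean offer(Runnable r) { return add(r); }

        @Override
        public boolean add(Runnable r) {
            try {
                put(r); // blocks while the queue is full
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
            return true;
        }
    }

    /** Shuts the pool down and waits for it, without leaking a checked exception. */
    private static void finish(ThreadPoolExecutor pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {tasks completed, most unfinished tasks observed right after an execute() call}. */
    static int[] runWithBackpressure(int threads, int capacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L,
                TimeUnit.MILLISECONDS, new BlockingArrayQueue(capacity));
        AtomicInteger completed = new AtomicInteger();
        int maxInFlight = 0;
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // stands in for a map() call waiting on I/O
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
            // Every unfinished task is either held by a worker thread or sitting in the queue.
            maxInFlight = Math.max(maxInFlight, (i + 1) - completed.get());
        }
        finish(pool);
        return new int[] {completed.get(), maxInFlight};
    }

    /** With an ordinary ArrayBlockingQueue, one task too many is rejected instead of waiting. */
    static boolean plainQueueRejects(int threads, int capacity) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(capacity));
        boolean rejected = false;
        try {
            for (int i = 0; i < threads + capacity + 1; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy so the queue fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            release.countDown();
            finish(pool);
        }
        return rejected;
    }

    public static void main(String[] args) {
        int[] r = runWithBackpressure(2, 2, 20);
        System.out.println("completed=" + r[0] + ", maxInFlight=" + r[1]);
        System.out.println("plain queue rejects: " + plainQueueRejects(2, 2));
    }
}
```

One consequence of this design, as far as the code goes: if the dispatching thread is interrupted while blocked in put(), add() restores the interrupt flag but still returns true. The executor therefore treats a task that was never enqueued as accepted.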

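The other half of the design is error handling. An exception thrown inside a pool thread never reaches the thread that called execute(). Map failures are therefore parked in volatile fields, and checkForExceptionsFromProcessingThreads() rethrows them on the dispatching thread at three points: after every dispatch, on every awaitTermination poll, and once more after termination. Each task also gets its own key and value objects, because map calls now overlap. The sketch below reproduces that first-error-wins pattern with JDK classes only. MapFn, demo and the "boom" record are hypothetical, and it uses an unbounded Executors.newFixedThreadPool rather than a bounded blocking queue like the listing's BlockingArrayQueue. It also deviates from the listing in one respect. It calls shutdownNow() in a finally block, whereas the listing calls shutdownNow() only when an IOException surfaces while waiting for termination.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FirstErrorPropagationSketch {

    /** Hypothetical map function that may fail with a checked exception. */
    interface MapFn {
        void map(int key, String value) throws IOException;
    }

    // First failure recorded by any worker; volatile so the dispatching thread sees it.
    private volatile IOException ioException;
    private volatile RuntimeException runtimeException;

    /** Mirrors checkForExceptionsFromProcessingThreads(): rethrow on the caller's thread. */
    private void check() throws IOException {
        if (ioException != null) {
            throw ioException;
        }
        if (runtimeException != null) {
            throw runtimeException;
        }
    }

    void run(List<String> records, MapFn fn, int threads)
            throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < records.size(); i++) {
                final int key = i;                 // each task gets its own key and value
                final String value = records.get(i);
                pool.execute(() -> {
                    try {
                        fn.map(key, value);
                    } catch (IOException ex) {
                        synchronized (this) {      // keep only the first failure
                            if (ioException == null) {
                                ioException = ex;
                            }
                        }
                    } catch (RuntimeException ex) {
                        synchronized (this) {
                            if (runtimeException == null) {
                                runtimeException = ex;
                            }
                        }
                    }
                });
                check();                           // fail fast while still dispatching
            }
            pool.shutdown();
            while (!pool.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                check();                           // fail while waiting for stragglers
            }
            check();                               // catch a failure recorded during the last wait
        } finally {
            pool.shutdownNow();                    // harmless if the pool already terminated
        }
    }

    /** Returns the surfaced exception's message, or "ok" when every record maps cleanly. */
    static String demo(String... records) {
        try {
            new FirstErrorPropagationSketch().run(Arrays.asList(records), (k, v) -> {
                if (v.equals("boom")) {
                    throw new IOException("bad record " + k);
                }
            }, 2);
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("a", "b", "c"));         // ok
        System.out.println(demo("a", "b", "boom", "c")); // bad record 2
    }
}
```

The final check() matters. If awaitTermination returns true on its first call, the loop body never runs, and a failure from the last map calls would otherwise go unnoticed.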

http://www.chinasem.cn/article/442938
