Hadoop2源码分析-YARN 的服务库和事件库

2024-05-27 12:32

本文主要是介绍Hadoop2源码分析-YARN 的服务库和事件库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.概述

  在《Hadoop2源码分析-YARN RPC 示例介绍》一文当中,给大家介绍了YARN 的 RPC 机制,以及相关代码的演示,今天我们继续去学习 YARN 的服务库和事件库,分享目录如下所示:

  • 服务库和事件库介绍
  • 使用示例
  • 截图预览

  下面开始今天的内容分享。

2.服务库和事件库介绍

2.1服务库

  YARN对于生命周期较长的对象使用服务的对象模型进行管理,主要特点如下:

  • 用于被服务化的对象包含4个状态,他们分别是:被创建、已初始化、已启动和已停止。源代码地址在 org.apache.hadoop.service 的 Service 接口中,内容如下所示:
复制代码
public enum STATE {/** Constructed but not initialized */NOTINITED(0, "NOTINITED"),/** Initialized but not started or stopped */INITED(1, "INITED"),/** started and not stopped */STARTED(2, "STARTED"),/** stopped. No further state transitions are permitted */STOPPED(3, "STOPPED");/*** An integer value for use in array lookup and JMX interfaces.* Although {@link Enum#ordinal()} could do this, explicitly* identify the numbers gives more stability guarantees over time.*/private final int value;/*** A name of the state that can be used in messages*/private final String statename;private STATE(int value, String name) {this.value = value;this.statename = name;}/*** Get the integer value of a state* @return the numeric value of the state*/public int getValue() {return value;}/*** Get the name of a state* @return the state's name*/@Overridepublic String toString() {return statename;}}
复制代码
public abstract class AbstractService implements Service {// ......

}

  通过阅读代码,我们可以看出,服务的对象它实现了接口Service,并定义了最基本的服务状态:创建、初始化、启动以及停止。对于 AbstractService 类来说,它实现了 Service 接口。

  • 任何服务状态的变化都可以触发其他的动作,例如:
复制代码
public void start() {if (isInState(STATE.STARTED)) {return;}//enter the started statesynchronized (stateChangeLock) {if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {try {startTime = System.currentTimeMillis();serviceStart();if (isInState(STATE.STARTED)) {//if the service started (and isn't now in a later state), notifyif (LOG.isDebugEnabled()) {LOG.debug("Service " + getName() + " is started");}notifyListeners();}} catch (Exception e) {noteFailure(e);ServiceOperations.stopQuietly(LOG, this);throw ServiceStateException.convert(e);}}}}
复制代码

  这里,我们会去触发一个监听动作,全局监听状态的改变,异常的捕捉监听等。

  • 可以通过组合的方式进行服务组合,这样做的好处是便于统一去管理:在 YARN 中,如果是非组合服务,可以直接继承 AbstractService 类,否则需继承 CompositeService。

2.2事件库

  在 YARN 中,核心服务其本质就是一个中央异步调度器,包含有ResourceManager、 NodeManager、MRAppMaster等内容,YARN 事件与事件处理器的关系在 

org.apache.hadoop.yarn.event  中。在使用 YARN 事件库的时候,需要先定义一个中央异步调度器 AsyncDispatcher,它负责事件的处理与转发,然后我们根据实际业务需求定义一系列事件 Event 与事件处理器 EventHandler,并将事件注册到中央异步调度器中用于完成事件统一管理和应用调度。流程如下图所示:

3.使用示例

  接下来,我们编写示例代码,去代码中理解这部分流程。

  • 首先是 JMRAppMaster 类:
复制代码
package cn.hadoop.task.exec;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;import cn.hadoop.task.CompositeService;
import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.TaskEvent;
import cn.hadoop.task.TaskEventType;/*** @Date Jul 22, 2015** @Author dengjie** @Note TODO*/
public class JMRAppMaster extends CompositeService {private Dispatcher dispatcher; // AsyncDispatcherprivate String jobID;private int taskNumber; // include numbersprivate String[] taskIDs; // include all taskpublic JMRAppMaster(String name, String jobID, int taskNumber) {super(name);this.jobID = jobID;this.taskNumber = taskNumber;taskIDs = new String[taskNumber];for (int i = 0; i < taskNumber; i++) {taskIDs[i] = new String(this.jobID + "_task_" + i);}}public void serviceInit(Configuration conf) throws Exception {dispatcher = new AsyncDispatcher();// default a AsyncDispatcherdispatcher.register(JobEventType.class, new JobEventDispatcher());// register a jobdispatcher.register(TaskEventType.class, new TaskEventDispatcher());// register a task
        addService((Service) dispatcher);super.serviceInit(conf);}public Dispatcher getDispatcher() {return dispatcher;}private class JobEventDispatcher implements EventHandler<JobEvent> {@SuppressWarnings("unchecked")public void handle(JobEvent event) {if (event.getType() == JobEventType.JOB_KILL) {System.out.println("Receive JOB_KILL event, killing all the tasks");for (int i = 0; i < taskNumber; i++) {dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));}} else if (event.getType() == JobEventType.JOB_INIT) {System.out.println("Receive JOB_INIT event, scheduling tasks");for (int i = 0; i < taskNumber; i++) {dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));}}}}private class TaskEventDispatcher implements EventHandler<TaskEvent> {public void handle(TaskEvent event) {if (event.getType() == TaskEventType.T_KILL) {System.out.println("Receive T_KILL event of task id " + event.getTaskID());} else if (event.getType() == TaskEventType.T_SCHEDULE) {System.out.println("Receive T_SCHEDULE event of task id " + event.getTaskID());}}}
}
复制代码

  另外,还需要添加一些其他类,这些类以来可以在 Hadoop 源码工程中找到,这里就不贴代码了,大家可以到 Hadoop 工程的源码中找到对应的类,相关类名如下图所示:

  接下来是一个测试类,去测试一下我们所编写的示例流程。

  • JMRAppMasterTest类:

复制代码
package cn.hadoop.rpc.test.yarn.task;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.exec.JMRAppMaster;/*** @Date Jul 22, 2015** @Author dengjie** @Note TODO*/
public class JMRAppMasterTest {@SuppressWarnings({ "unchecked", "resource" })public static void main(String[] args) {String jobID = "job_20150723_11";JMRAppMaster appMaster = new JMRAppMaster("Simple MRAppMaster Test", jobID, 10);YarnConfiguration conf = new YarnConfiguration(new Configuration());try {appMaster.serviceInit(conf);appMaster.serviceStart();} catch (Exception e) {e.printStackTrace();}appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_INIT));}
}
复制代码

4.截图预览

  在编写完成相关流程代码后,我们运行代码来观察整个流程,截图如下所示:

5.总结

  在编写这部分流程代码时,可以参考 Hadoop YARN 部分的工程源码,通过运行调试代码,掌握对事件库和服务库的流程,以及它们的工作机制。另外,在编写的过程当中,最好将源码的文件引入到自己的工程,不要单独使用 JAR 包的方式导入,由于我们是独立运行某个模块,需要改动源代码的函数访问权限,若是直接引入 JAR 包地址,会导致函数修饰权限问题而不能运行,这里大家在运行调试的时候注意即可。

这篇关于Hadoop2源码分析-YARN 的服务库和事件库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007485

相关文章

基于Go语言实现Base62编码的三种方式以及对比分析

《基于Go语言实现Base62编码的三种方式以及对比分析》Base62编码是一种在字符编码中使用62个字符的编码方式,在计算机科学中,,Go语言是一种静态类型、编译型语言,它由Google开发并开源,... 目录一、标准库现状与解决方案1. 标准库对比表2. 解决方案完整实现代码(含边界处理)二、关键实现细

PostgreSQL 序列(Sequence) 与 Oracle 序列对比差异分析

《PostgreSQL序列(Sequence)与Oracle序列对比差异分析》PostgreSQL和Oracle都提供了序列(Sequence)功能,但在实现细节和使用方式上存在一些重要差异,... 目录PostgreSQL 序列(Sequence) 与 oracle 序列对比一 基本语法对比1.1 创建序

Android实现一键录屏功能(附源码)

《Android实现一键录屏功能(附源码)》在Android5.0及以上版本,系统提供了MediaProjectionAPI,允许应用在用户授权下录制屏幕内容并输出到视频文件,所以本文将基于此实现一个... 目录一、项目介绍二、相关技术与原理三、系统权限与用户授权四、项目架构与流程五、环境配置与依赖六、完整

Android实现定时任务的几种方式汇总(附源码)

《Android实现定时任务的几种方式汇总(附源码)》在Android应用中,定时任务(ScheduledTask)的需求几乎无处不在:从定时刷新数据、定时备份、定时推送通知,到夜间静默下载、循环执行... 目录一、项目介绍1. 背景与意义二、相关基础知识与系统约束三、方案一:Handler.postDel

慢sql提前分析预警和动态sql替换-Mybatis-SQL

《慢sql提前分析预警和动态sql替换-Mybatis-SQL》为防止慢SQL问题而开发的MyBatis组件,该组件能够在开发、测试阶段自动分析SQL语句,并在出现慢SQL问题时通过Ducc配置实现动... 目录背景解决思路开源方案调研设计方案详细设计使用方法1、引入依赖jar包2、配置组件XML3、核心配

Python开发文字版随机事件游戏的项目实例

《Python开发文字版随机事件游戏的项目实例》随机事件游戏是一种通过生成不可预测的事件来增强游戏体验的类型,在这篇博文中,我们将使用Python开发一款文字版随机事件游戏,通过这个项目,读者不仅能够... 目录项目概述2.1 游戏概念2.2 游戏特色2.3 目标玩家群体技术选择与环境准备3.1 开发环境3

Java NoClassDefFoundError运行时错误分析解决

《JavaNoClassDefFoundError运行时错误分析解决》在Java开发中,NoClassDefFoundError是一种常见的运行时错误,它通常表明Java虚拟机在尝试加载一个类时未能... 目录前言一、问题分析二、报错原因三、解决思路检查类路径配置检查依赖库检查类文件调试类加载器问题四、常见

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

SpringBoot基于配置实现短信服务策略的动态切换

《SpringBoot基于配置实现短信服务策略的动态切换》这篇文章主要为大家详细介绍了SpringBoot在接入多个短信服务商(如阿里云、腾讯云、华为云)后,如何根据配置或环境切换使用不同的服务商,需... 目录目标功能示例配置(application.yml)配置类绑定短信发送策略接口示例:阿里云 & 腾

如何为Yarn配置国内源的详细教程

《如何为Yarn配置国内源的详细教程》在使用Yarn进行项目开发时,由于网络原因,直接使用官方源可能会导致下载速度慢或连接失败,配置国内源可以显著提高包的下载速度和稳定性,本文将详细介绍如何为Yarn... 目录一、查询当前使用的镜像源二、设置国内源1. 设置为淘宝镜像源2. 设置为其他国内源三、还原为官方