Hadoop2源码分析-YARN 的服务库和事件库

2024-05-27 12:32

本文主要是介绍Hadoop2源码分析-YARN 的服务库和事件库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.概述

  在《Hadoop2源码分析-YARN RPC 示例介绍》一文当中,给大家介绍了YARN 的 RPC 机制,以及相关代码的演示,今天我们继续去学习 YARN 的服务库和事件库,分享目录如下所示:

  • 服务库和事件库介绍
  • 使用示例
  • 截图预览

  下面开始今天的内容分享。

2.服务库和事件库介绍

2.1服务库

  YARN对于生命周期较长的对象使用服务的对象模型进行管理,主要特点如下:

  • 用于被服务化的对象包含4个状态,他们分别是:被创建、已初始化、已启动和已停止。源代码地址在 org.apache.hadoop.service 的 Service 接口中,内容如下所示:
复制代码
public enum STATE {/** Constructed but not initialized */NOTINITED(0, "NOTINITED"),/** Initialized but not started or stopped */INITED(1, "INITED"),/** started and not stopped */STARTED(2, "STARTED"),/** stopped. No further state transitions are permitted */STOPPED(3, "STOPPED");/*** An integer value for use in array lookup and JMX interfaces.* Although {@link Enum#ordinal()} could do this, explicitly* identify the numbers gives more stability guarantees over time.*/private final int value;/*** A name of the state that can be used in messages*/private final String statename;private STATE(int value, String name) {this.value = value;this.statename = name;}/*** Get the integer value of a state* @return the numeric value of the state*/public int getValue() {return value;}/*** Get the name of a state* @return the state's name*/@Overridepublic String toString() {return statename;}}
复制代码
public abstract class AbstractService implements Service {// ......

}

  通过阅读代码,我们可以看出,服务的对象它实现了接口Service,并定义了最基本的服务状态:创建、初始化、启动以及停止。对于 AbstractService 类来说,它实现了 Service 接口。

  • 任何服务状态的变化都可以触发其他的动作,例如:
复制代码
public void start() {if (isInState(STATE.STARTED)) {return;}//enter the started statesynchronized (stateChangeLock) {if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {try {startTime = System.currentTimeMillis();serviceStart();if (isInState(STATE.STARTED)) {//if the service started (and isn't now in a later state), notifyif (LOG.isDebugEnabled()) {LOG.debug("Service " + getName() + " is started");}notifyListeners();}} catch (Exception e) {noteFailure(e);ServiceOperations.stopQuietly(LOG, this);throw ServiceStateException.convert(e);}}}}
复制代码

  这里,我们会去触发一个监听动作,全局监听状态的改变,异常的捕捉监听等。

  • 可以通过组合的方式进行服务组合,这样做的好处是便于统一去管理:在 YARN 中,如果是非组合服务,可以直接继承 AbstractService 类,否则需继承 CompositeService。

2.2事件库

  在 YARN 中,核心服务其本质就是一个中央异步调度器,包含有ResourceManager、 NodeManager、MRAppMaster等内容,YARN 事件与事件处理器的关系在 

org.apache.hadoop.yarn.event  中。在使用 YARN 事件库的时候,需要先定义一个中央异步调度器 AsyncDispatcher,它负责事件的处理与转发,然后我们根据实际业务需求定义一系列事件 Event 与事件处理器 EventHandler,并将事件注册到中央异步调度器中用于完成事件统一管理和应用调度。流程如下图所示:

3.使用示例

  接下来,我们编写示例代码,去代码中理解这部分流程。

  • 首先是 JMRAppMaster 类:
复制代码
package cn.hadoop.task.exec;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;import cn.hadoop.task.CompositeService;
import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.TaskEvent;
import cn.hadoop.task.TaskEventType;/*** @Date Jul 22, 2015** @Author dengjie** @Note TODO*/
public class JMRAppMaster extends CompositeService {private Dispatcher dispatcher; // AsyncDispatcherprivate String jobID;private int taskNumber; // include numbersprivate String[] taskIDs; // include all taskpublic JMRAppMaster(String name, String jobID, int taskNumber) {super(name);this.jobID = jobID;this.taskNumber = taskNumber;taskIDs = new String[taskNumber];for (int i = 0; i < taskNumber; i++) {taskIDs[i] = new String(this.jobID + "_task_" + i);}}public void serviceInit(Configuration conf) throws Exception {dispatcher = new AsyncDispatcher();// default a AsyncDispatcherdispatcher.register(JobEventType.class, new JobEventDispatcher());// register a jobdispatcher.register(TaskEventType.class, new TaskEventDispatcher());// register a task
        addService((Service) dispatcher);super.serviceInit(conf);}public Dispatcher getDispatcher() {return dispatcher;}private class JobEventDispatcher implements EventHandler<JobEvent> {@SuppressWarnings("unchecked")public void handle(JobEvent event) {if (event.getType() == JobEventType.JOB_KILL) {System.out.println("Receive JOB_KILL event, killing all the tasks");for (int i = 0; i < taskNumber; i++) {dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));}} else if (event.getType() == JobEventType.JOB_INIT) {System.out.println("Receive JOB_INIT event, scheduling tasks");for (int i = 0; i < taskNumber; i++) {dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));}}}}private class TaskEventDispatcher implements EventHandler<TaskEvent> {public void handle(TaskEvent event) {if (event.getType() == TaskEventType.T_KILL) {System.out.println("Receive T_KILL event of task id " + event.getTaskID());} else if (event.getType() == TaskEventType.T_SCHEDULE) {System.out.println("Receive T_SCHEDULE event of task id " + event.getTaskID());}}}
}
复制代码

  另外,还需要添加一些其他类,这些类以来可以在 Hadoop 源码工程中找到,这里就不贴代码了,大家可以到 Hadoop 工程的源码中找到对应的类,相关类名如下图所示:

  接下来是一个测试类,去测试一下我们所编写的示例流程。

  • JMRAppMasterTest类:

复制代码
package cn.hadoop.rpc.test.yarn.task;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.exec.JMRAppMaster;/*** @Date Jul 22, 2015** @Author dengjie** @Note TODO*/
public class JMRAppMasterTest {@SuppressWarnings({ "unchecked", "resource" })public static void main(String[] args) {String jobID = "job_20150723_11";JMRAppMaster appMaster = new JMRAppMaster("Simple MRAppMaster Test", jobID, 10);YarnConfiguration conf = new YarnConfiguration(new Configuration());try {appMaster.serviceInit(conf);appMaster.serviceStart();} catch (Exception e) {e.printStackTrace();}appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_INIT));}
}
复制代码

4.截图预览

  在编写完成相关流程代码后,我们运行代码来观察整个流程,截图如下所示:

5.总结

  在编写这部分流程代码时,可以参考 Hadoop YARN 部分的工程源码,通过运行调试代码,掌握对事件库和服务库的流程,以及它们的工作机制。另外,在编写的过程当中,最好将源码的文件引入到自己的工程,不要单独使用 JAR 包的方式导入,由于我们是独立运行某个模块,需要改动源代码的函数访问权限,若是直接引入 JAR 包地址,会导致函数修饰权限问题而不能运行,这里大家在运行调试的时候注意即可。

这篇关于Hadoop2源码分析-YARN 的服务库和事件库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007485

相关文章

sysmain服务可以禁用吗? 电脑sysmain服务关闭后的影响与操作指南

《sysmain服务可以禁用吗?电脑sysmain服务关闭后的影响与操作指南》在Windows系统中,SysMain服务(原名Superfetch)作为一个旨在提升系统性能的关键组件,一直备受用户关... 在使用 Windows 系统时,有时候真有点像在「开盲盒」。全新安装系统后的「默认设置」,往往并不尽编

Python 基于http.server模块实现简单http服务的代码举例

《Python基于http.server模块实现简单http服务的代码举例》Pythonhttp.server模块通过继承BaseHTTPRequestHandler处理HTTP请求,使用Threa... 目录测试环境代码实现相关介绍模块简介类及相关函数简介参考链接测试环境win11专业版python

Nginx中配置使用非默认80端口进行服务的完整指南

《Nginx中配置使用非默认80端口进行服务的完整指南》在实际生产环境中,我们经常需要将Nginx配置在其他端口上运行,本文将详细介绍如何在Nginx中配置使用非默认端口进行服务,希望对大家有所帮助... 目录一、为什么需要使用非默认端口二、配置Nginx使用非默认端口的基本方法2.1 修改listen指令

SysMain服务可以关吗? 解决SysMain服务导致的高CPU使用率问题

《SysMain服务可以关吗?解决SysMain服务导致的高CPU使用率问题》SysMain服务是超级预读取,该服务会记录您打开应用程序的模式,并预先将它们加载到内存中以节省时间,但它可能占用大量... 在使用电脑的过程中,CPU使用率居高不下是许多用户都遇到过的问题,其中名为SysMain的服务往往是罪魁

Android 缓存日志Logcat导出与分析最佳实践

《Android缓存日志Logcat导出与分析最佳实践》本文全面介绍AndroidLogcat缓存日志的导出与分析方法,涵盖按进程、缓冲区类型及日志级别过滤,自动化工具使用,常见问题解决方案和最佳实... 目录android 缓存日志(Logcat)导出与分析全攻略为什么要导出缓存日志?按需过滤导出1. 按

解决若依微服务框架启动报错的问题

《解决若依微服务框架启动报错的问题》Invalidboundstatement错误通常由MyBatis映射文件未正确加载或Nacos配置未读取导致,需检查XML的namespace与方法ID是否匹配,... 目录ruoyi-system模块报错报错详情nacos文件目录总结ruoyi-systnGLNYpe

Linux中的HTTPS协议原理分析

《Linux中的HTTPS协议原理分析》文章解释了HTTPS的必要性:HTTP明文传输易被篡改和劫持,HTTPS通过非对称加密协商对称密钥、CA证书认证和混合加密机制,有效防范中间人攻击,保障通信安全... 目录一、什么是加密和解密?二、为什么需要加密?三、常见的加密方式3.1 对称加密3.2非对称加密四、

MySQL中读写分离方案对比分析与选型建议

《MySQL中读写分离方案对比分析与选型建议》MySQL读写分离是提升数据库可用性和性能的常见手段,本文将围绕现实生产环境中常见的几种读写分离模式进行系统对比,希望对大家有所帮助... 目录一、问题背景介绍二、多种解决方案对比2.1 原生mysql主从复制2.2 Proxy层中间件:ProxySQL2.3

Nginx进行平滑升级的实战指南(不中断服务版本更新)

《Nginx进行平滑升级的实战指南(不中断服务版本更新)》Nginx的平滑升级(也称为热升级)是一种在不停止服务的情况下更新Nginx版本或添加模块的方法,这种升级方式确保了服务的高可用性,避免了因升... 目录一.下载并编译新版Nginx1.下载解压2.编译二.替换可执行文件,并平滑升级1.替换可执行文件

python使用Akshare与Streamlit实现股票估值分析教程(图文代码)

《python使用Akshare与Streamlit实现股票估值分析教程(图文代码)》入职测试中的一道题,要求:从Akshare下载某一个股票近十年的财务报表包括,资产负债表,利润表,现金流量表,保存... 目录一、前言二、核心知识点梳理1、Akshare数据获取2、Pandas数据处理3、Matplotl