Spark 源码解析 : DAGScheduler中的DAG划分与提交

2023-12-13 07:48

本文主要是介绍Spark 源码解析 : DAGScheduler中的DAG划分与提交,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

一、Spark 运行架构

 

Spark 运行架构如下图:

各个RDD之间存在着依赖关系,这些依赖关系形成有向无环图DAG,DAGScheduler对这些依赖关系形成的DAG,进行Stage划分,划分的规则很简单,从后往前回溯,遇到窄依赖加入本stage,遇见宽依赖进行Stage切分。完成了Stage的划分,DAGScheduler基于每个Stage生成TaskSet,并将TaskSet提交给TaskScheduler。TaskScheduler 负责具体的task调度,在Worker节点上启动task。

 

 

 

二、源码解析:DAGScheduler中的DAG划分

    当RDD触发一个Action操作(如:colllect)后,导致SparkContext.runJob的执行。而在SparkContext的run方法中会调用DAGScheduler的run方法最终调用了DAGScheduler的submit方法:

 
  1. def submitJob[T, U](
  2. rdd: RDD[T],
  3. func: (TaskContext, Iterator[T]) => U,
  4. partitions: Seq[Int],
  5. callSite: CallSite,
  6. resultHandler: (Int, U) => Unit,
  7. properties: Properties): JobWaiter[U] = {
  8. // Check to make sure we are not launching a task on a partition that does not exist.
  9. val maxPartitions = rdd.partitions.length
  10. partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
  11. throw new IllegalArgumentException(
  12. "Attempting to access a non-existent partition: " + p + ". " +
  13. "Total number of partitions: " + maxPartitions)
  14. }
  15. val jobId = nextJobId.getAndIncrement()
  16. if (partitions.size == 0) {
  17. // Return immediately if the job is running 0 tasks
  18. return new JobWaiter[U](this, jobId, 0, resultHandler)
  19. }
  20. assert(partitions.size > 0)
  21. val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  22. val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  23. //给eventProcessLoop发送JobSubmitted消息
  24. eventProcessLoop.post(JobSubmitted(
  25. jobId, rdd, func2, partitions.toArray, callSite, waiter,
  26. SerializationUtils.clone(properties)))
  27. waiter
  28. }

 

DAGScheduler的submit方法中,像eventProcessLoop对象发送了JobSubmitted消息。eventProcessLoop是DAGSchedulerEventProcessLoop类的对象

 

 
  1. private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

 

DAGSchedulerEventProcessLoop,接收各种消息并进行处理,处理的逻辑在其doOnReceive方法中:

 

 
  1. private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  2.    //Job提交
  1. case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
  2. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  3. case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
  4. dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  5. case StageCancelled(stageId) =>
  6. dagScheduler.handleStageCancellation(stageId)
  7. case JobCancelled(jobId) =>
  8. dagScheduler.handleJobCancellation(jobId)
  9. case JobGroupCancelled(groupId) =>
  10. dagScheduler.handleJobGroupCancelled(groupId)
  11. case AllJobsCancelled =>
  12. dagScheduler.doCancelAllJobs()
  13. case ExecutorAdded(execId, host) =>
  14. dagScheduler.handleExecutorAdded(execId, host)
  15. case ExecutorLost(execId) =>
  16. dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  17. case BeginEvent(task, taskInfo) =>
  18. dagScheduler.handleBeginEvent(task, taskInfo)
  19. case GettingResultEvent(taskInfo) =>
  20. dagScheduler.handleGetTaskResult(taskInfo)
  21. case completion: CompletionEvent =>
  22. dagScheduler.handleTaskCompletion(completion)
  23. case TaskSetFailed(taskSet, reason, exception) =>
  24. dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  25. case ResubmitFailedStages =>
  26. dagScheduler.resubmitFailedStages()
  27. }

 

可以把DAGSchedulerEventProcessLoop理解成DAGScheduler的对外的功能接口。它对外隐藏了自己内部实现的细节。无论是内部还是外部消息,DAGScheduler可以共用同一消息处理代码,逻辑清晰,处理方式统一。

 

接下来分析DAGScheduler的Stage划分,handleJobSubmitted方法首先创建ResultStage

 

 
  1. try {
  2. //创建新stage可能出现异常,比如job运行依赖hdfs文文件被删除
  3. finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  4. } catch {
  5. case e: Exception =>
  6. logWarning("Creating new stage failed due to exception - job: " + jobId, e)
  7. listener.jobFailed(e)
  8. return
  9. }

 

然后调用submitStage方法,进行stage的划分。

 

 

 

首先由finalRDD获取它的父RDD依赖,判断依赖类型,如果是窄依赖,则将父RDD压入栈中,如果是宽依赖,则作为父Stage。

 

看一下源码的具体过程:

 

 
  1. private def getMissingParentStages(stage: Stage): List[Stage] = {
  2. val missing = new HashSet[Stage] //存储需要返回的父Stage
  3. val visited = new HashSet[RDD[_]] //存储访问过的RDD
  4. //自己建立栈,以免函数的递归调用导致
  5. val waitingForVisit = new Stack[RDD[_]]
  6.  
  7. def visit(rdd: RDD[_]) {
  8. if (!visited(rdd)) {
  9. visited += rdd
  10. val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
  11. if (rddHasUncachedPartitions) {
  12. for (dep <- rdd.dependencies) {
  13. dep match {
  14. case shufDep: ShuffleDependency[_, _, _] =>
  15. val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
  16. if (!mapStage.isAvailable) {
  17. missing += mapStage //遇到宽依赖,加入父stage
  18. }
  19. case narrowDep: NarrowDependency[_] =>
  20. waitingForVisit.push(narrowDep.rdd) //窄依赖入栈,
  21. }
  22. }
  23. }
  24. }
  25. }
  26.  
  27.    //回溯的起始RDD入栈
  28. waitingForVisit.push(stage.rdd)
  29. while (waitingForVisit.nonEmpty) {
  30. visit(waitingForVisit.pop())
  31. }
  32. missing.toList
  33. }

 

getMissingParentStages方法是由当前stage,返回他的父stage,父stage的创建由getShuffleMapStage返回,最终会调用newOrUsedShuffleStage方法返回ShuffleMapStage

 

 
  1. private def newOrUsedShuffleStage(
  2. shuffleDep: ShuffleDependency[_, _, _],
  3. firstJobId: Int): ShuffleMapStage = {
  4. val rdd = shuffleDep.rdd
  5. val numTasks = rdd.partitions.length
  6. val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  7. if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
  8. //Stage已经被计算过,从MapOutputTracker中获取计算结果
  9. val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
  10. val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
  11. (0 until locs.length).foreach { i =>
  12. if (locs(i) ne null) {
  13. // locs(i) will be null if missing
  14. stage.addOutputLoc(i, locs(i))
  15. }
  16. }
  17. } else {
  18. // Kind of ugly: need to register RDDs with the cache and map output tracker here
  19. // since we can't do it in the RDD constructor because # of partitions is unknown
  20. logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
  21. mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  22. }
  23. stage
  24. }

 

现在父Stage已经划分好,下面看看你Stage的提交逻辑

 

 
  1. /** Submits stage, but first recursively submits any missing parents. */
  2. private def submitStage(stage: Stage) {
  3. val jobId = activeJobForStage(stage)
  4. if (jobId.isDefined) {
  5. logDebug("submitStage(" + stage + ")")
  6. if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
  7. val missing = getMissingParentStages(stage).sortBy(_.id)
  8. logDebug("missing: " + missing)
  9. if (missing.isEmpty) {
  10. logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
  11. //如果没有父stage,则提交当前stage
  12. submitMissingTasks(stage, jobId.get)
  13. } else {
  14. for (parent <- missing) {
  15. //如果有父stage,则递归提交父stage
  16. submitStage(parent)
  17. }
  18. waitingStages += stage
  19. }
  20. }
  21. } else {
  22. abortStage(stage, "No active job for stage " + stage.id, None)
  23. }
  24. }

 

提交的过程很简单,首先当前stage获取父stage,如果父stage为空,则当前Stage为起始stage,交给submitMissingTasks处理,如果当前stage不为空,则递归调用submitStage进行提交。

 

到这里,DAGScheduler中的DAG划分与提交就讲完了,下次解析这些stage是如果封装成TaskSet交给TaskScheduler以及TaskSchedule的调度过程。

 

这篇关于Spark 源码解析 : DAGScheduler中的DAG划分与提交的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/487648

相关文章

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

深度解析Java DTO(最新推荐)

《深度解析JavaDTO(最新推荐)》DTO(DataTransferObject)是一种用于在不同层(如Controller层、Service层)之间传输数据的对象设计模式,其核心目的是封装数据,... 目录一、什么是DTO?DTO的核心特点:二、为什么需要DTO?(对比Entity)三、实际应用场景解析

深度解析Java项目中包和包之间的联系

《深度解析Java项目中包和包之间的联系》文章浏览阅读850次,点赞13次,收藏8次。本文详细介绍了Java分层架构中的几个关键包:DTO、Controller、Service和Mapper。_jav... 目录前言一、各大包1.DTO1.1、DTO的核心用途1.2. DTO与实体类(Entity)的区别1

Java中的雪花算法Snowflake解析与实践技巧

《Java中的雪花算法Snowflake解析与实践技巧》本文解析了雪花算法的原理、Java实现及生产实践,涵盖ID结构、位运算技巧、时钟回拨处理、WorkerId分配等关键点,并探讨了百度UidGen... 目录一、雪花算法核心原理1.1 算法起源1.2 ID结构详解1.3 核心特性二、Java实现解析2.

使用Python绘制3D堆叠条形图全解析

《使用Python绘制3D堆叠条形图全解析》在数据可视化的工具箱里,3D图表总能带来眼前一亮的效果,本文就来和大家聊聊如何使用Python实现绘制3D堆叠条形图,感兴趣的小伙伴可以了解下... 目录为什么选择 3D 堆叠条形图代码实现:从数据到 3D 世界的搭建核心代码逐行解析细节优化应用场景:3D 堆叠图

深度解析Python装饰器常见用法与进阶技巧

《深度解析Python装饰器常见用法与进阶技巧》Python装饰器(Decorator)是提升代码可读性与复用性的强大工具,本文将深入解析Python装饰器的原理,常见用法,进阶技巧与最佳实践,希望可... 目录装饰器的基本原理函数装饰器的常见用法带参数的装饰器类装饰器与方法装饰器装饰器的嵌套与组合进阶技巧

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

全面解析MySQL索引长度限制问题与解决方案

《全面解析MySQL索引长度限制问题与解决方案》MySQL对索引长度设限是为了保持高效的数据检索性能,这个限制不是MySQL的缺陷,而是数据库设计中的权衡结果,下面我们就来看看如何解决这一问题吧... 目录引言:为什么会有索引键长度问题?一、问题根源深度解析mysql索引长度限制原理实际场景示例二、五大解决

深度解析Spring Boot拦截器Interceptor与过滤器Filter的区别与实战指南

《深度解析SpringBoot拦截器Interceptor与过滤器Filter的区别与实战指南》本文深度解析SpringBoot中拦截器与过滤器的区别,涵盖执行顺序、依赖关系、异常处理等核心差异,并... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实