Spark Checkpoint读操作代码分析

2024-05-27 12:58

本文主要是介绍Spark Checkpoint读操作代码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Spark RDD缓存代码分析》
   《Spark Task序列化代码分析》
   《Spark分区器HashPartitioner和RangePartitioner代码详解》
   《Spark Checkpoint读操作代码分析》
   《Spark Checkpoint写操作代码分析》

  上次介绍了RDD的Checkpint写过程(《Spark Checkpoint写操作代码分析》),本文将介绍RDD如何读取已经Checkpint的数据。在RDD Checkpint完之后,Checkpint的信息(比如数据存放的目录)都由RDDCheckpointData去管理,所以当下次计算依赖了这个RDD的时候,首先是根据依赖关系判断出当前这个RDD是否被Checkpint了,主要是通过RDD的dependencies决定:

final def dependencies : Seq[Dependency[ _ ]] = {
   checkpointRDD.map(r = > List( new OneToOneDependency(r))).getOrElse {
     if (dependencies _ == null ) {
       dependencies _ = getDependencies
     }
     dependencies _
   }
}

  如果RDD被Checkpint了,那么checkpointRDD为Some(CheckpointRDD[T])了,所以依赖的RDD变成了CheckpointRDD。在计算数据的过程中会调用RDD的iterator方法:

final def iterator(split : Partition, context : TaskContext) : Iterator[T] = {
   if (storageLevel ! = StorageLevel.NONE) {
     < span class = "wp_keywordlink_affiliate" >< a href = "http://www.iteblog.com/archives/tag/spark" title = "" target = "_blank" data-original-title = "View all posts in Spark" > Spark < /a >< /span > Env.get.cacheManager.getOrCompute( this , split, context, storageLevel)
   } else {
     computeOrReadCheckpoint(split, context)
   }
}
private [spark] def computeOrReadCheckpoint(split : Partition, context : TaskContext) : Iterator[T] =
{
    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}

  计算的过程中首先会判断RDD是否被Checkpint了,而RDD Checkpint写之后这个条件肯定是true的。而firstParent已经变成了CheckpointRDD,所以会调用CheckpointRDD的iterator方法, 该方法最终会调用ReliableCheckpointRDD的compute方法:

override def compute(split : Partition, context : TaskContext) : Iterator[T] = {
   val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
   ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}

  在compute方法中会通过ReliableCheckpointRDD的readCheckpointFile方法来从file路径里面读出已经Checkpint的数据,readCheckpointFile的实现如下:

def readCheckpointFile[T](
     path : Path,
     broadcastedConf : Broadcast[SerializableConfiguration],
     context : TaskContext) : Iterator[T] = {
   val env = < span class = "wp_keywordlink_affiliate" >< a href = "http://www.iteblog.com/archives/tag/spark" title = "" target = "_blank" data-original-title = "View all posts in Spark" > Spark < /a >< /span > Env.get
   val fs = path.getFileSystem(broadcastedConf.value.value)
   val bufferSize = env.conf.getInt( "spark.buffer.size" , 65536 )
   val fileInputStream = fs.open(path, bufferSize)
   val serializer = env.serializer.newInstance()
   val deserializeStream = serializer.deserializeStream(fileInputStream)
   // Register an on-task-completion callback to close the input stream.
   context.addTaskCompletionListener(context = > deserializeStream.close())
   deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}

最后数据就回被全部读取出来,整个Checkpint读过程完成了。


 转载自过往记忆(http://www.iteblog.com/)

这篇关于Spark Checkpoint读操作代码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007540

相关文章

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

Python中pywin32 常用窗口操作的实现

《Python中pywin32常用窗口操作的实现》本文主要介绍了Python中pywin32常用窗口操作的实现,pywin32主要的作用是供Python开发者快速调用WindowsAPI的一个... 目录获取窗口句柄获取最前端窗口句柄获取指定坐标处的窗口根据窗口的完整标题匹配获取句柄根据窗口的类别匹配获取句

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

Python位移操作和位运算的实现示例

《Python位移操作和位运算的实现示例》本文主要介绍了Python位移操作和位运算的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 位移操作1.1 左移操作 (<<)1.2 右移操作 (>>)注意事项:2. 位运算2.1

利用Python调试串口的示例代码

《利用Python调试串口的示例代码》在嵌入式开发、物联网设备调试过程中,串口通信是最基础的调试手段本文将带你用Python+ttkbootstrap打造一款高颜值、多功能的串口调试助手,需要的可以了... 目录概述:为什么需要专业的串口调试工具项目架构设计1.1 技术栈选型1.2 关键类说明1.3 线程模

Python ZIP文件操作技巧详解

《PythonZIP文件操作技巧详解》在数据处理和系统开发中,ZIP文件操作是开发者必须掌握的核心技能,Python标准库提供的zipfile模块以简洁的API和跨平台特性,成为处理ZIP文件的首选... 目录一、ZIP文件操作基础三板斧1.1 创建压缩包1.2 解压操作1.3 文件遍历与信息获取二、进阶技

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

Java中字符串转时间与时间转字符串的操作详解

《Java中字符串转时间与时间转字符串的操作详解》Java的java.time包提供了强大的日期和时间处理功能,通过DateTimeFormatter可以轻松地在日期时间对象和字符串之间进行转换,下面... 目录一、字符串转时间(一)使用预定义格式(二)自定义格式二、时间转字符串(一)使用预定义格式(二)自

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

Java程序进程起来了但是不打印日志的原因分析

《Java程序进程起来了但是不打印日志的原因分析》:本文主要介绍Java程序进程起来了但是不打印日志的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java程序进程起来了但是不打印日志的原因1、日志配置问题2、日志文件权限问题3、日志文件路径问题4、程序