Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别

2024-05-03 05:48

本文主要是介绍Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.Spark性能调优:checkPoint的使用

https://blog.csdn.net/leen0304/article/details/78718346

 

概述

    checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面,计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)。

 

使用Checkpoint

    使用checkpoint 需要 先设置 checkpoint 的目录,例如如下代码:

val sparkConf = new SparkConfsparkConf.setAppName("JOINSkewedData").set("spark.sql.autoBroadcastJoinThreshold", "1048576") //1M broadcastJOIN//.set("spark.sql.autoBroadcastJoinThreshold", "104857600") //100M broadcastJOIN.set("spark.sql.shuffle.partitions", "3")if (args.length > 0 && args(0).equals("ide")) {sparkConf.setMaster("local[3]")}val spark = SparkSession.builder().config(sparkConf).getOrCreate()val sparkContext = spark.sparkContextsparkContext.setLogLevel("WARN")sparkContext.setCheckpointDir("file:///D:/checkpoint/")

不同环境的设置代码

有的时候需要本地调试,需要设置为windows 或者 linux 的本地目录

windows 

 sparkContext.setCheckpointDir("file:///D:/checkpoint/")

linux 

sparkContext.setCheckpointDir("file:///tmp/checkpoint")

hdfs

sparkContext.setCheckpointDir("hdfs://leen:8020/checkPointDir")

 

调用checkpoint 

使用 checkpoint 的时候,需要在建立 checkpoint 的 rdd 上进行函数调用即可

rdd.checkpoint

 

注意 :

  使用 checkpoint 的时候,建议先将 rdd.cache 一次,因为 checkpoint 是 transform 算子,

  执行的时候相当于走了两次流程,前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中,如下:

rdd.cache()

rdd.checkpoint()

rdd.collect

 

 

Sparkstreaming 中的 checkpoint

   在streaming中使用checkpoint主要包含以下两点:设置checkpoint目录,初始化StreamingContext时调用getOrCreate方法,即当checkpoint目录没有数据时,则新建streamingContext实例,并且设置checkpoint目录,否则从checkpoint目录中读取相关配置和数据创建streamingcontext。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {val ssc = new StreamingContext(...)   // new contextval lines = ssc.socketTextStream(...) // create DStreams...ssc.checkpoint(checkpointDirectory)   // set checkpoint directoryssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

 

 

Checkpoint 与 cache 区别

  checkpoint 与 cache 是不一样的,checkpoint 会切除前面算子的rdd 依赖, 而 cache 是将数据暂存在一个具体的位置。

 

rdd 的 checkpoint 实现

/*** Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint* directory set with `SparkContext#setCheckpointDir` and all references to its parent* RDDs will be removed. This function must be called before any job has been* executed on this RDD. It is strongly recommended that this RDD is persisted in* memory, otherwise saving it on a file will require recomputation.*/def checkpoint(): Unit = RDDCheckpointData.synchronized {// NOTE: we use a global lock here due to complexities downstream with ensuring// children RDD partitions point to the correct parent partitions. In the future// we should revisit this consideration.if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}}

 

dataframe 的 checkpoint 实现

 /*** Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate* the logical plan of this Dataset, which is especially useful in iterative algorithms where the* plan may grow exponentially. It will be saved to files inside the checkpoint* directory set with `SparkContext#setCheckpointDir`.** @group basic* @since 2.1.0*/@Experimental@InterfaceStability.Evolvingdef checkpoint(): Dataset[T] = checkpoint(eager = true)

 

 

这篇关于Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/955958

相关文章

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

Linux链表操作方式

《Linux链表操作方式》:本文主要介绍Linux链表操作方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、链表基础概念与内核链表优势二、内核链表结构与宏解析三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势六、典型应用场景七、调试技巧与

Python使用smtplib库开发一个邮件自动发送工具

《Python使用smtplib库开发一个邮件自动发送工具》在现代软件开发中,自动化邮件发送是一个非常实用的功能,无论是系统通知、营销邮件、还是日常工作报告,Python的smtplib库都能帮助我们... 目录代码实现与知识点解析1. 导入必要的库2. 配置邮件服务器参数3. 创建邮件发送类4. 实现邮件

Go语言中Recover机制的使用

《Go语言中Recover机制的使用》Go语言的recover机制通过defer函数捕获panic,实现异常恢复与程序稳定性,具有一定的参考价值,感兴趣的可以了解一下... 目录引言Recover 的基本概念基本代码示例简单的 Recover 示例嵌套函数中的 Recover项目场景中的应用Web 服务器中

Linux实现线程同步的多种方式汇总

《Linux实现线程同步的多种方式汇总》本文详细介绍了Linux下线程同步的多种方法,包括互斥锁、自旋锁、信号量以及它们的使用示例,通过这些同步机制,可以解决线程安全问题,防止资源竞争导致的错误,示例... 目录什么是线程同步?一、互斥锁(单人洗手间规则)适用场景:特点:二、条件变量(咖啡厅取餐系统)工作流

CnPlugin是PL/SQL Developer工具插件使用教程

《CnPlugin是PL/SQLDeveloper工具插件使用教程》:本文主要介绍CnPlugin是PL/SQLDeveloper工具插件使用教程,具有很好的参考价值,希望对大家有所帮助,如有错... 目录PL/SQL Developer工具插件使用安装拷贝文件配置总结PL/SQL Developer工具插

SpringBoot3中使用虚拟线程的完整步骤

《SpringBoot3中使用虚拟线程的完整步骤》在SpringBoot3中使用Java21+的虚拟线程(VirtualThreads)可以显著提升I/O密集型应用的并发能力,这篇文章为大家介绍了详细... 目录1. 环境准备2. 配置虚拟线程方式一:全局启用虚拟线程(Tomcat/Jetty)方式二:异步

使用Python实现base64字符串与图片互转的详细步骤

《使用Python实现base64字符串与图片互转的详细步骤》要将一个Base64编码的字符串转换为图片文件并保存下来,可以使用Python的base64模块来实现,这一过程包括解码Base64字符串... 目录1. 图片编码为 Base64 字符串2. Base64 字符串解码为图片文件3. 示例使用注意

使用Python实现获取屏幕像素颜色值

《使用Python实现获取屏幕像素颜色值》这篇文章主要为大家详细介绍了如何使用Python实现获取屏幕像素颜色值,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 一、一个小工具,按住F10键,颜色值会跟着显示。完整代码import tkinter as tkimport pyau

Linux使用scp进行远程目录文件复制的详细步骤和示例

《Linux使用scp进行远程目录文件复制的详细步骤和示例》在Linux系统中,scp(安全复制协议)是一个使用SSH(安全外壳协议)进行文件和目录安全传输的命令,它允许在远程主机之间复制文件和目录,... 目录1. 什么是scp?2. 语法3. 示例示例 1: 复制本地目录到远程主机示例 2: 复制远程主