Spark RDD惰性计算的自主优化

2023-12-08 06:20

本文主要是介绍Spark RDD惰性计算的自主优化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原创/朱季谦

RDD(弹性分布式数据集)中的数据就如final定义一般,只可读而无法修改,若要对RDD进行转换或操作,那就需要创建一个新的RDD来保存结果。故而就需要用到转换和行动的算子。

Spark运行是惰性的,在RDD转换阶段,只会记录该转换逻辑而不会执行,只有在遇到行动算子时,才会触发真正的运算,若整个生命周期都没有行动算子,那么RDD的转换代码便不会运行。

这样的惰性计算,其实是有好处的,它在遇到行动算子需要对整个DAG(有向无环图)做优化,以下是一些优化说明——

本文的样本部分内容如下,可以基于这些数据做验证——

Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
Allison Carroll,28,男,18292,环保可持续,美妆护肤,唯品会,信用卡,8,免费赠品,日常使用
Robert Rice,47,男,5347,时尚潮流,图书音像,拼多多,微信支付,8,有优惠券,兴趣爱好
Jason Bradley,25,男,9480,性价比,汽车配件,拼多多,信用卡,5,折扣优惠,促销打折
Joel Small,18,女,15586,社交影响,食品饮料,亚马逊,支付宝,5,无优惠券,日常使用
Stephanie Austin,33,男,7653,舒适度,汽车配件,亚马逊,银联支付,3,无优惠券,跟风购买
Kathy Myers,33,男,18159,舒适度,美妆护肤,亚马逊,货到付款,4,无优惠券,商品推荐
Gabrielle Mccarty,57,男,19561,环保可持续,母婴用品,网易考拉,支付宝,5,免费赠品,日常使用
Joan Smith,43,女,11896,品牌追求,图书音像,亚马逊,支付宝,4,免费赠品,商品推荐
Monica Garcia,19,男,16665,时尚潮流,电子产品,京东,货到付款,7,免费赠品,商品推荐
Christopher Faulkner,55,男,3621,社交影响,美妆护肤,苏宁易购,支付宝,7,无优惠券,日常使用

一、减少不必要的计算

RDD的惰性计算可以通过优化执行计划去避免不必要的计算,同时可以将过滤操作下推到数据源或者其他转换操作之前,减少需要处理的数据量,进而达到计算的优化。

例如,执行以下这段spark代码时,

  def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("count")val ss = SparkSession.builder().config(conf).getOrCreate()val filePath: String = "transaction_data.csv"val lineRDD = ss.sparkContext.textFile(filePath)val value = lineRDD.map { x => {println(s"打印 $x")x.split(",")} }value.take(10).foreach(println)ss.stop()}

若Spark不是惰性计算的情况下,代码顺序运行到这行 val lineRDD = ss.sparkContext.textFile(filePath)代码时,就会将transaction_data.csv文件里的几万条数据全部加载出来,然后再做计算。

而在惰性计算的情况下,直至运行这行代码 value.take(10).foreach(println)而遇到foreach这个行动算子时,才会去执行前面的转换,这时它会基于RDD的转化自行做一个优化——在这个例子里,它会基于lineRDD.take(5)这行代码只会从transaction_data.csv取出前5行,避免了将文件里的几万条数据全部取出。

打印结果如下,发现lineRDD.map确实只处理了前5条数据——

打印 Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
打印 Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
打印 Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
打印 Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
打印 Allison Carroll,28,男,18292,环保可持续,美妆护肤,唯品会,信用卡,8,免费赠品,日常使用
[Ljava.lang.String;@3c87e6b7
[Ljava.lang.String;@77bbadc
[Ljava.lang.String;@3c3a0032
[Ljava.lang.String;@7ceb4478
[Ljava.lang.String;@7fdab70c

二、操作合并和优化

Spark在执行行动算子时,会自动将存在连续转换的RDD操作合并到更为高效的执行计划,这样可以减少中间不是必要的RDD数据的生成和传输,可以整体提高计算的效率。这很像是,摆在你面前是一条弯弯曲曲的道路,但是因为你手里有地图,知道这条路是怎么走的,因此,可以基于这样的地图,去尝试发掘下是否有更好的直径。

还是以一个代码案例说明,假如需要统计薪资在10000以上的人数。

运行的代码,是从transaction_data.csv读取了几万条数据,然后将每行数据按","分割成数组,再基于每个数组去过滤出满足薪资大于10000的数据,最后再做count统计出满足条件的人数。

以下是最冗余的代码,每个步骤都转换生成一个新的RDD,彼此之间是连续的,这些RDD是会占内存空间,同时增加了很多不必要的计算。

def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("count")val ss = SparkSession.builder().config(conf).getOrCreate()val filePath: String = "transaction_data.csv"val lineRDD = ss.sparkContext.textFile(filePath)val array = lineRDD.map(_.split(","))//过滤出薪资10000的数据val valueRdd = array.filter(x => x.apply(3).toInt > 10000)//统计薪资10000以上的人数val count = valueRdd.count()ss.stop()
}

Spark就可能会将这些存在连续的RDD进行优化,将其合并成一个单独的转换操作,直接就对原始RDD进行映射和过滤——

val value = ss.sparkContext.textFile(filePath).map(_.split(",")).filter(x =>{x.apply(3).toInt > 10000})
value.count()

这样优化同时避免了多次循环遍历,每个映射的数组只需要遍历一次即可。

可以通过coalesce(1)只设置一个分区,使代码串行运行,然后增加打印验证一下效果——

val value = ss.sparkContext.textFile(filePath).coalesce(1).map(x =>{println(s"分割打印 $x")x.split(",")
}).filter(x =>{println(s"过滤打印 ${x.apply(0)}")x.apply(3).toInt > 10000})
value.count()

打印部分结果,发现没每遍历一次,就把映射数组和过滤都完成了,没有像先前多个RDD那样需要每次都得遍历,这样就能达到一定优化效果——

分割打印 Amy Harris,39,男,18561,性价比,家居用品,天猫,微信支付,10,折扣优惠,品牌忠诚
过滤打印 Amy Harris
分割打印 Lori Willis,33,女,14071,功能性,家居用品,苏宁易购,货到付款,1,折扣优惠,日常使用
过滤打印 Lori Willis
分割打印 Jim Williams,61,男,14145,时尚潮流,汽车配件,淘宝,微信支付,3,免费赠品,礼物赠送
过滤打印 Jim Williams
分割打印 Anthony Perez,19,女,11587,时尚潮流,珠宝首饰,拼多多,支付宝,5,免费赠品,商品推荐
过滤打印 Anthony Perez
分割打印 Allison Carroll,28,男,18292,环保可持续,美妆护肤,唯品会,信用卡,8,免费赠品,日常使用
过滤打印 Allison Carroll
分割打印 Robert Rice,47,男,5347,时尚潮流,图书音像,拼多多,微信支付,8,有优惠券,兴趣爱好
过滤打印 Robert Rice

这样也提醒了我们,在遇到连续转换的RDD时,其实可以自行做代码优化,避免产生中间可优化的RDD和遍历操作。

三、窄依赖优化

RDD在执行惰性计算时,会尽可能进行窄依赖优化。

有窄依赖,便会有宽依赖,两者有什么区别呢?

窄依赖指的是父RDD的每个分区只需要通过简单的转换操作就可以计算出对应的子RDD分区,不涉及跨多个分区的数据交换,即父子之间每个分区都是一对一的。

前文提到的map、filter等转换都属于窄依赖的操作。

例如,array.filter(x => x.apply(3).toInt > 10000),父RDD有三个分区,那么三个分区就会分别执行array.filter(x => x.apply(3).toInt > 10000)将过滤的数据传给子RDD对应的分区——

image

宽依赖指父RDD的每个分区会通过跨区计算将原本同一个分区数据分发到不同子分区上,这中间涉及到shuffle重新洗牌操作,会存在较大的计算,父子之间分区是一对多的。可以看到,父RDD同一个分区的数据,在宽依赖情况下,会将相同的key传输到同一个分区里,这就意味着,同一个父RDD,如果存在多个不同的key,可能会分发到多个不同的子分区上,进而出现shuffle重新洗牌操作。

image

因此,RDD会尽可能的进行窄依赖优化,在无需跨区计算的情况下,就避免进行shuffle重新洗牌操作,将父分区一对一地传输给子分区。同时,窄依赖还有一个好处是,在子分区出现丢失数据异常时,只需要重新计算对应的父分区数据即可,无需将父分区全部数据进行计算。

这篇关于Spark RDD惰性计算的自主优化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/468835

相关文章

Python并行处理实战之如何使用ProcessPoolExecutor加速计算

《Python并行处理实战之如何使用ProcessPoolExecutor加速计算》Python提供了多种并行处理的方式,其中concurrent.futures模块的ProcessPoolExecu... 目录简介完整代码示例代码解释1. 导入必要的模块2. 定义处理函数3. 主函数4. 生成数字列表5.

Java计算经纬度距离的示例代码

《Java计算经纬度距离的示例代码》在Java中计算两个经纬度之间的距离,可以使用多种方法(代码示例均返回米为单位),文中整理了常用的5种方法,感兴趣的小伙伴可以了解一下... 目录1. Haversine公式(中等精度,推荐通用场景)2. 球面余弦定理(简单但精度较低)3. Vincenty公式(高精度,

SpringBoot中HTTP连接池的配置与优化

《SpringBoot中HTTP连接池的配置与优化》这篇文章主要为大家详细介绍了SpringBoot中HTTP连接池的配置与优化的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、HTTP连接池的核心价值二、Spring Boot集成方案方案1:Apache HttpCl

PyTorch高级特性与性能优化方式

《PyTorch高级特性与性能优化方式》:本文主要介绍PyTorch高级特性与性能优化方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、自动化机制1.自动微分机制2.动态计算图二、性能优化1.内存管理2.GPU加速3.多GPU训练三、分布式训练1.分布式数据

MySQL中like模糊查询的优化方案

《MySQL中like模糊查询的优化方案》在MySQL中,like模糊查询是一种常用的查询方式,但在某些情况下可能会导致性能问题,本文将介绍八种优化MySQL中like模糊查询的方法,需要的朋友可以参... 目录1. 避免以通配符开头的查询2. 使用全文索引(Full-text Index)3. 使用前缀索

C#实现高性能Excel百万数据导出优化实战指南

《C#实现高性能Excel百万数据导出优化实战指南》在日常工作中,Excel数据导出是一个常见的需求,然而,当数据量较大时,性能和内存问题往往会成为限制导出效率的瓶颈,下面我们看看C#如何结合EPPl... 目录一、技术方案核心对比二、各方案选型建议三、性能对比数据四、核心代码实现1. MiniExcel

windows和Linux使用命令行计算文件的MD5值

《windows和Linux使用命令行计算文件的MD5值》在Windows和Linux系统中,您可以使用命令行(终端或命令提示符)来计算文件的MD5值,文章介绍了在Windows和Linux/macO... 目录在Windows上:在linux或MACOS上:总结在Windows上:可以使用certuti

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

Python通过模块化开发优化代码的技巧分享

《Python通过模块化开发优化代码的技巧分享》模块化开发就是把代码拆成一个个“零件”,该封装封装,该拆分拆分,下面小编就来和大家简单聊聊python如何用模块化开发进行代码优化吧... 目录什么是模块化开发如何拆分代码改进版:拆分成模块让模块更强大:使用 __init__.py你一定会遇到的问题模www.

SpringBoot首笔交易慢问题排查与优化方案

《SpringBoot首笔交易慢问题排查与优化方案》在我们的微服务项目中,遇到这样的问题:应用启动后,第一笔交易响应耗时高达4、5秒,而后续请求均能在毫秒级完成,这不仅触发监控告警,也极大影响了用户体... 目录问题背景排查步骤1. 日志分析2. 性能工具定位优化方案:提前预热各种资源1. Flowable