Spark - Resilient Distributed Datasets (RDDs)介绍

2023-11-02 09:51

本文主要是介绍Spark - Resilient Distributed Datasets (RDDs)介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

RDD介绍

RDD构建

RDD分区数

RDD算子

Transformation算子 

Action算子

RDD持久化

RDD缓存

检查点

RDD共享变量

广播变量

累加器Accumulator


RDD介绍

Resilient Distributed DataSets,弹性分布式数据集,可以把RDD看作一种分布式集合。其RDD本身不存储数据,数据实际存储在内存或磁盘上。同时RDD中的数据不可更改,只能通过算子生成一个新的RDD实现对数据的修改。一个父RDD可以被多个下游RDD依赖,为避免父RDD重复计算,可对父RDD的计算结果缓存。RDD可定义分区规则,实现多线程并发处理RDD中的数据

RDD构建

spark-shell启动spark客户端 sh spark-shell --master local

本地集合,通过sc.parallelize转变为RDD

本地集合,通过sc.makeRDD转变为RDD

本地文件,通过sc.textFile转变为RDD

外部文件,通过sc.textFile转变为RDD

RDD分区数

RDD的分区数建议与CPU核数保持一致,为充分利用CPU性能,可设置为CPU核数的2~3倍,启动时,通过CPU核数确定线程并行度参数   

spark.default.parallelism = 指定的CPU核数(集群模式最小2)

等同于 --master local[指定的CPU核数]

  • 对于本地集合方式构建RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 8)

       若指定分区数,采用指定的分区数;若未指定,采用spark.default.parallelism参数值

  • 对于本地/外部文件方式构建RDD
    rdd = sc.textFile(name="file:///export/data/workspace/parent/spark_core/data/", minPartitions=3)
    

    若指定了分区数 minPartitions,就采用指定的分区数;若没有指定分区数,

       defaultMinPartition = (spark.default.parallelism,2)

       RDD分区数 = max(本地file的分片数,defaultMinPartition)

       RDD分区数 =  max(hdfs文件的block数量,defaultMinPartition)

  • wholeTextFiles

        默认,一个目录有多少文件,就对应多少分区,为避免分区数过多,增加线程和IO负                     担,wholeTextFiles专门用于处理小文件,其尽可能减少分区数

rdd = sc.wholeTextFiles(path="file:///export/data/workspace/parent/spark_core/data")
  • 调整分区数       

repartition、coalesce、partitionBy 三个算子可以调整RDD分区数
# 增大分区
rdd = rdd.repartition(5).glom().collect()# 减小分区
rdd = rdd.repartition(5).glom().collect()
# repartition本质上是coalesce的一种当参数2为True的简写方案,# coalesce
# 参数1: 分区数
# 参数2:表示是否存在shuffle, 默认为false
# 参数2为False 只能减少分区,为True可以增大分区
rdd = rdd.coalesce(5).glom().collect()# partitionBy: 专门针对kv类型重分区的函数
# 默认: 根据key进行Hash取模划分操作 ,也可自定义分区规则rm = sc.parallelize([(1, 'c01'), (2, 'c02'), (3, 'c03'),(4, 'c04'), (5, 'c05'), (6, 'c06'),(7, 'c07'), (8, 'c08'), (9, 'c09'),(10, 'c10')], 3)rdd = rm.partitionBy(5).glom().collect()# 自定义分区规则rm1 = rm.partitionBy(2, lambda key: 0 if key <= 5 else 1).glom().collect()

RDD算子

RDD对象中提供的一系列具有特定动能的函数

Transformation算子 

定义RDD的计算规则,不会触发JOB执行,须通过Action算子触发执行

Action算子

产生一个Job任务,触发执行,运行这个Action算子所依赖的所有Transformation算子

RDD持久化

为避免RDD重复计算或RDD数据损坏,其RDD计算结果可以持久化,即RDD数据可以保存到内存或磁盘

RDD缓存

即RDD计算结果的临时缓存, 缓存数据可以保存到内存(executor内存空间),也可以保存到磁盘, 甚至支持将缓存数据保存到堆外内存(executor以外的内存空间)

由于临时存储, 可能存在数据丢失, 当缓存失效后, 可以基于原有依赖关系重新计算。因为缓存操作, 并不会将RDD之间的依赖关系给截断

缓存的API都是LAZY的, 如果需要触发缓存操作, 必须后续跟上一个action算子, 一般建议使用count。 如果不添加action算子, 只有当后续遇到第一个action算子后, 才会触发缓存

  • 设置缓存

    rdd.cache(): 仅能将数据缓存到内存中
    rdd.persist(缓存的级别(位置)): 默认将数据缓存到内存中, 也支持自定义缓存位置

例如:

rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()


   

  • 清理缓存:

    rdd.unpersist()

     默认情况下, Spark程序执行完成后, 缓存会自动失效, 自动删除

  • 使用场景

当某个RDD被使用多次的时候,建议缓存此RDD数据

当某个RDD计算结果复杂,并且使用不止一次,建议缓存此RDD数据

检查点

checkpoint将数据保存到HDFS上,借助HDFS的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

构建checkpoint操作后, 会将RDD之间的依赖关系截断, 后续计算如果出现了问题, 直接从检查点的位置恢复数据

集群模式中, checkpoint的保存路径地址必须是HDFS, 如果是local模式, 可以支持本地路径, checkpoint数据不会自动删除, 必须通过手动方式将其删除掉

Checkpoint相关的API:
 设置检查点的保存数据位置 
        sc.setCheckpointDir('路径地址') 默认路径为HDFS
        
  对应RDD上开启检查点
        rdd.checkpoint()
        rdd.count()

Spark容错机制:首先会查看RDD是否被Cache,如果被Cache到内存或磁盘,直接获取,否则查看Checkpoint所指定的HDFS中是否缓存数据,如果都没有则直接从父RDD开始重新计算还原。

一般建议将两种持久化的方案一同作用于项目环境中, 先设置缓存, 然后在设置检查点, 最后统一触发执行(先将数据缓存到内存中, 然后将内存数据写入到磁盘结束, 在使用的时候, 程序会自动优先读取内存, 内存没有读取磁盘)

 # 先设置缓存rdd4.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)# 接着设置checkpointrdd4.checkpoint()# 最后统一触发rdd4.count()

RDD共享变量

默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task) 和 任务控制节点(Driver Program) 之间共享变量

广播变量

在Driver端定义一个共享的变量, 如果不使用广播变量。 各个线程在运行Task的时候, 都需要将这个变量拷贝到自己的线程中, 对网络传输, 内存的使用都是一种浪费, 而且影响效率
    
如果使用广播变量。 会将变量在每个executor上放置一份, 各个线程直接读取executor上的变量,不需要拉取到Task中。 减少副本的数量, 对内存和网络都降低了, 从而提升效率


    
 广播变量是只读的, 各个Task只能读取数据, 不能修改

 

累加器Accumulator

一般,使用RDD的 map() 函数或者用 filter() 传条件时,可以使用驱动器driver程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。若保持driver程序中变量的原子性,可以使用Accumulator

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,即确提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取Accumulator的值,只有Driver程序可以读取Accumulator的值。创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名。 

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。

累加器只能在Driver端定义,在Executor端更新,不能在Executor端定义,不能在Executor端.value获取值。

累加器在遇到多次action操作的时候会出现重复累加求和的问题,解决办法是在调用累加器后的RDD上, 对其设置缓存操作

 # 定义一个公共变量 --> spark的累加器实现acc = sc.accumulator(0)# 2. 执行相关的操作# 2.1 初始化一份数据rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])# 2.2 对每一个元素累加一个指定变量的值的操作:# 需求: 对每一个元素进行 +10返回, 在执行过程中, 请统计共计对多少个数据进行了 +10 操作def fn1(e):acc.add(1)return e + 10rdd_map = rdd_init.map(fn1)# 设置缓存rdd_map.cache().count()# 触发一个Actionrdd_map.reduceByKey()# 触发另一个Actionrdd_filter = rdd_map.filter(lambda e : e > 5)# 2.3 打印结果print(rdd_filter.collect())

RDD依赖

窄依赖

父RDD与子RDD之间的分区一对一,即父RDD中,一个分区内的数据不能分割,只能由子RDD中的一个分区整个利用

宽依赖

父RDD中一个分区的数据被子RDD中的多个分区接收,中间必然存在Shuffle操作;

Shuffle是判断宽窄依赖的重要依据,同时也是划分stage的依据

这篇关于Spark - Resilient Distributed Datasets (RDDs)介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/329796

相关文章

MybatisPlus service接口功能介绍

《MybatisPlusservice接口功能介绍》:本文主要介绍MybatisPlusservice接口功能介绍,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友... 目录Service接口基本用法进阶用法总结:Lambda方法Service接口基本用法MyBATisP

MySQL复杂SQL之多表联查/子查询详细介绍(最新整理)

《MySQL复杂SQL之多表联查/子查询详细介绍(最新整理)》掌握多表联查(INNERJOIN,LEFTJOIN,RIGHTJOIN,FULLJOIN)和子查询(标量、列、行、表子查询、相关/非相关、... 目录第一部分:多表联查 (JOIN Operations)1. 连接的类型 (JOIN Types)

java中BigDecimal里面的subtract函数介绍及实现方法

《java中BigDecimal里面的subtract函数介绍及实现方法》在Java中实现减法操作需要根据数据类型选择不同方法,主要分为数值型减法和字符串减法两种场景,本文给大家介绍java中BigD... 目录Java中BigDecimal里面的subtract函数的意思?一、数值型减法(高精度计算)1.

Pytorch介绍与安装过程

《Pytorch介绍与安装过程》PyTorch因其直观的设计、卓越的灵活性以及强大的动态计算图功能,迅速在学术界和工业界获得了广泛认可,成为当前深度学习研究和开发的主流工具之一,本文给大家介绍Pyto... 目录1、Pytorch介绍1.1、核心理念1.2、核心组件与功能1.3、适用场景与优势总结1.4、优

Java实现本地缓存的常用方案介绍

《Java实现本地缓存的常用方案介绍》本地缓存的代表技术主要有HashMap,GuavaCache,Caffeine和Encahche,这篇文章主要来和大家聊聊java利用这些技术分别实现本地缓存的方... 目录本地缓存实现方式HashMapConcurrentHashMapGuava CacheCaffe

Spring Security介绍及配置实现代码

《SpringSecurity介绍及配置实现代码》SpringSecurity是一个功能强大的Java安全框架,它提供了全面的安全认证(Authentication)和授权(Authorizatio... 目录简介Spring Security配置配置实现代码简介Spring Security是一个功能强

JSR-107缓存规范介绍

《JSR-107缓存规范介绍》JSR是JavaSpecificationRequests的缩写,意思是Java规范提案,下面给大家介绍JSR-107缓存规范的相关知识,感兴趣的朋友一起看看吧... 目录1.什么是jsR-1072.应用调用缓存图示3.JSR-107规范使用4.Spring 缓存机制缓存是每一

Java中 instanceof 的用法详细介绍

《Java中instanceof的用法详细介绍》在Java中,instanceof是一个二元运算符(类型比较操作符),用于检查一个对象是否是某个特定类、接口的实例,或者是否是其子类的实例,这篇文章... 目录引言基本语法基本作用1. 检查对象是否是指定类的实例2. 检查对象是否是子类的实例3. 检查对象是否

什么是ReFS 文件系统? ntfs和refs的优缺点区别介绍

《什么是ReFS文件系统?ntfs和refs的优缺点区别介绍》最近有用户在Win11Insider的安装界面中发现,可以使用ReFS来格式化硬盘,这是不是意味着,ReFS有望在未来成为W... 数十年以来,Windows 系统一直将 NTFS 作为「内置硬盘」的默认文件系统。不过近些年来,微软还在研发一款名

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式