Mark : Spark RDD 内部结构(二) RDD分区

2024-04-06 20:08

本文主要是介绍Mark : Spark RDD 内部结构(二) RDD分区,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RDD 分区

分区

先回答第一个问题:RDD 内部,如何表示并行计算的一个计算单元。答案是使用分区(Partition)。

RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合,这样的每一个子集合我们将其称为分区,分区的个数会决定并行计算的粒度,而每一个分区数值的计算都是在一个单独的任务中进行,因此并行任务的个数,也是由 RDD(实际上是一个阶段的末 RDD,调度章节会介绍)分区的个数决定的,我会在 1.2 小节以及第二章中,具体说明分区与并行计算的关系。

在后文中,我会用下图所示图形来表示 RDD 以及 RDD 内部的分区,RDD 上方文字表示该 RDD 的类型或者名字,分区颜色为紫红色表示该 RDD 内数据被缓存到存储介质中,蓝色表示该 RDD 为普通 RDD。 —— 话说这配色真的很丑么……

RDD and Partition

分区实现

分区的源码实现为 Partition 类。

/*** An identifier for a partition in an RDD.*/
trait Partition extends Serializable {/*** Get the partition's index within its parent RDD*/def index: Int// A better default implementation of HashCodeoverride def hashCode(): Int = index
}

RDD 只是数据集的抽象,分区内部并不会存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的编号,通过 RDD 编号 + 分区编号可以唯一确定该分区对应的块编号,利用底层数据存储层提供的接口,就能从存储介质(如:HDFS、Memory)中提取出分区对应的数据。

RDD 抽象类中定义了 _partitions 数组成员和 partitions 方法,partitions 方法提供给外部开发者调用,用于获取 RDD 的所有分区。partitions 方法会调用内部 getPartitions 接口,RDD 的子类需要自行实现 getPartitions 接口。

  @transient private var partitions_ : Array[Partition] = null/*** Implemented by subclasses to return the set of partitions in this RDD. This method will only* be called once, so it is safe to implement a time-consuming computation in it.*/protected def getPartitions: Array[Partition]/*** Get the array of partitions of this RDD, taking into account whether the* RDD is checkpointed or not.*/final def partitions: Array[Partition] = {checkpointRDD.map(_.partitions).getOrElse {if (partitions_ == null) {partitions_ = getPartitions}partitions_}}

以 map 转换操作生成 MapPartitionsRDD 类中的 getPartitions 方法为例。

  override def getPartitions: Array[Partition] = firstParent[T].partitions

可以看到,MapPartitionsRDD 的分区实际上与父 RDD 的分区完全一致,这也符合我们对 map 转换操作的认知。

分区个数

RDD 分区的一个分配原则是:尽可能使得分区的个数,等于集群核心数目。

RDD 可以通过创建操作或者转换操作得到。转换操作中,分区的个数会根据转换操作对应多个 RDD 之间的依赖关系确定,窄依赖子 RDD 由父 RDD 分区个数决定,Shuffle 依赖由子 RDD 分区器决定。

创建操作中,程序开发者可以手动指定分区的个数,例如 sc.parallelize (Array(1, 2, 3, 4, 5), 2) 表示创建得到的 RDD 分区个数为 2,在没有指定分区个数的情况下,Spark 会根据集群部署模式,来确定一个分区个数默认值。

分别讨论 parallelize 和textFile 两种通过外部数据创建生成RDD的方法。

对于 parallelize 方法,默认情况下,分区的个数会受 Apache Spark 配置参数 spark.default.parallelism 的影响,官方对该参数的解释是用于控制 Shuffle 过程中默认使用的任务数量,这也符合我们之间对分区个数与任务个数之间关系的理解。

  /** Distribute a local Scala collection to form an RDD.** @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call* to parallelize and before the first action on the RDD, the resultant RDD will reflect the* modified collection. Pass a copy of the argument to avoid this.* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.*/def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {assertNotStopped()new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())}

无论是以本地模式、Standalone 模式、Yarn 模式或者是 Mesos 模式来运行 Apache Spark,分区的默认个数等于对 spark.default.parallelism 的指定值,若该值未设置,则 Apache Spark 会根据不同集群模式的特征,来确定这个值。

对于本地模式,默认分区个数等于本地机器的 CPU 核心总数(或者是用户通过 local[N] 参数指定分配给 Apache Spark 的核心数目,见 LocalBackend 类),显然这样设置是合理的,因为把每个分区的计算任务交付给单个核心执行,能够保证最大的计算效率。

  override def defaultParallelism() =scheduler.conf.getInt("spark.default.parallelism", totalCores)

若使用 Apache Mesos 作为集群的资源管理系统,默认分区个数等于 8(对 Apache Mesos 不是很了解,根据这个 TODO,个人猜测 Apache Spark 暂时还无法获取 Mesos 集群的核心总数)(见 MesosSchedulerBackend 类)。

  // TODO: query Mesos for number of coresoverride def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)

其他集群模式(Standalone 或者 Yarn),默认分区个数等于集群中所有核心数目的总和,或者 2,取两者中的较大值(见 CoarseGrainedSchedulerBackend 类)。

  override def defaultParallelism(): Int = {conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))}

对于 textFile 方法,默认分区个数等于 min(defaultParallelism, 2)(见 SparkContext 类),而 defaultParallelism 实际上就是 parallelism 方法的默认分区值。

  /*** Read a text file from HDFS, a local file system (available on all nodes), or any* Hadoop-supported file system URI, and return it as an RDD of Strings.*/def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {assertNotStopped()hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],minPartitions).map(pair => pair._2.toString)}

分区内部记录个数

分区分配的另一个分配原则是:尽可能使同一 RDD 不同分区内的记录的数量一致。

对于转换操作得到的 RDD,如果是窄依赖,则分区记录数量依赖于父 RDD 中相同编号分区是如何进行数据分配的,如果是 Shuffle 依赖,则分区记录数量依赖于选择的分区器,哈希分区器无法保证数据被平均分配到各个分区,而范围分区器则能做到这一点。这部分内容我会在 1.6 小节中讨论。

parallelize 方法通过把输入的数组做一次平均分配,尝试着让每个分区的记录个数尽可能大致相同(见 ParallelCollectionRDD 类)。

private object ParallelCollectionRDD {/*** Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes* it efficient to run Spark over RDDs representing large sets of numbers. And if the collection* is an inclusive Range, we use inclusive range for the last slice.*/def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {if (numSlices < 1) {throw new IllegalArgumentException("Positive number of slices required")}// Sequences need to be sliced at the same set of index positions for operations// like RDD.zip() to behave as expecteddef positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {(0 until numSlices).iterator.map(i => {val start = ((i * length) / numSlices).toIntval end = (((i + 1) * length) / numSlices).toInt(start, end)})}seq match {case r: Range => {positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>// If the range is inclusive, use inclusive range for the last sliceif (r.isInclusive && index == numSlices - 1) {new Range.Inclusive(r.start + start * r.step, r.end, r.step)}else {new Range(r.start + start * r.step, r.start + end * r.step, r.step)}}).toSeq.asInstanceOf[Seq[Seq[T]]]}case nr: NumericRange[_] => {// For ranges of Long, Double, BigInteger, etcval slices = new ArrayBuffer[Seq[T]](numSlices)var r = nrfor ((start, end) <- positions(nr.length, numSlices)) {val sliceSize = end - startslices += r.take(sliceSize).asInstanceOf[Seq[T]]r = r.drop(sliceSize)}slices}case _ => {val array = seq.toArray // To prevent O(n^2) operations for List etcpositions(array.length, numSlices).map({case (start, end) =>array.slice(start, end).toSeq}).toSeq}}}
}

textFile 方法分区内数据的大小则是由 Hadoop API 接口 FileInputFormat.getSplits 方法决定(见 HadoopRDD 类),得到的每一个分片即为 RDD 的一个分区,分片内数据的大小会受文件大小、文件是否可分割、HDFS 中块大小等因素的影响,但总体而言会是比较均衡的分配。

  override def getPartitions: Array[Partition] = {val jobConf = getJobConf()// add the credentials here as this can be called before SparkContext initializedSparkHadoopUtil.get.addCredentials(jobConf)val inputFormat = getInputFormat(jobConf)if (inputFormat.isInstanceOf[Configurable]) {inputFormat.asInstanceOf[Configurable].setConf(jobConf)}val inputSplits = inputFormat.getSplits(jobConf, minPartitions)val array = new Array[Partition](inputSplits.size)for (i <- 0 until inputSplits.size) {array(i) = new HadoopPartition(id, i, inputSplits(i))}array}

参考资料

  1. Spark Configuration - Spark 1.2.0 Documentation
  2. FileInputFormat (Apache Hadoop Main 2.6.0 API)
  3. Spark:RDD 理解

这篇关于Mark : Spark RDD 内部结构(二) RDD分区的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880717

相关文章

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

MySQL数据库实现批量表分区完整示例

《MySQL数据库实现批量表分区完整示例》通俗地讲表分区是将一大表,根据条件分割成若干个小表,:本文主要介绍MySQL数据库实现批量表分区的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考... 目录一、表分区条件二、常规表和分区表的区别三、表分区的创建四、将既有表转换分区表脚本五、批量转换表为分区

MySQL 分区与分库分表策略应用小结

《MySQL分区与分库分表策略应用小结》在大数据量、复杂查询和高并发的应用场景下,单一数据库往往难以满足性能和扩展性的要求,本文将详细介绍这两种策略的基本概念、实现方法及优缺点,并通过实际案例展示如... 目录mysql 分区与分库分表策略1. 数据库水平拆分的背景2. MySQL 分区策略2.1 分区概念

Mysql表如何按照日期字段的年月分区

《Mysql表如何按照日期字段的年月分区》:本文主要介绍Mysql表如何按照日期字段的年月分区的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、创键表时直接设置分区二、已有表分区1、分区的前置条件2、分区操作三、验证四、注意总结一、创键表时直接设置分区

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

数据库系统 第41节 数据库分区简介

数据库分区是一种数据库设计技术,用于将大型表或索引的数据分布到不同的物理区域,以提高查询性能、优化数据管理、简化维护任务,并提高数据的可用性。下面我将详细介绍每种分区类型,并结合伪代码或概念性的源代码来说明其实现方式。 1. 范围分区 (Range Partitioning) 范围分区是根据某个列的值范围来划分数据。例如,可以按照日期或数值范围来分区。 示例场景:一个订单表,按年份分区。

Windows环境下SD卡多分区 隐藏分区 解决python裸读写扇区失败

SD卡分区 右键“我的电脑”->“管理”->“磁盘管理”; 如果SD卡有文件系统,点击"删除卷",重新做卡; 删除文件系统后如下图,点击“新建简单卷”; 在导航页“指定卷大小”,设置分区的大小,留出剩余的空间; python读写SD卡 这里需要注意的是,写SD卡需要加锁,否则写不成功 #带文件系统,需要锁定卷,否则无法写扇区@contextlib.contextmanagerd