spark shuffle的演进过程

2024-06-13 12:08
文章标签 过程 spark 演进 shuffle

本文主要是介绍spark shuffle的演进过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

spark各版本shuffle的变化

  • Spark 0.8及以前 Hash Based Shuffle
  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
  • Spark 1.4 引入Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle退出历史舞台

接下来详细研究版本演化的驱动因素

Hash Based Shuffle

最开始的时候使用的是 Hash Based Shuffle, 这时候每一个Mapper会根据Reducer的数量创建出相应的bucket,bucket的数量是MR ,其中M是MapTask的个数,R是ReduceTask的个数。这样会产生大量的小文件,对文件系统压力很大,而且也不利于IO吞吐量。如下图
在这里插入图片描述
后面做了优化,把在同一个core上运行的多个Mapper task 输出合并到同一个文件,这样文件数目就变成了 cores
R 个了
在这里插入图片描述

Sort Based Shuffle

经过FileConsolidation之后,同一个core上会产出

SortShuffleManager代码解析
override def registerShuffle[K, V, C](shuffleId: Int,numMaps: Int,dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't// need map-side aggregation, then write numPartitions files directly and just concatenate// them at the end. This avoids doing serialization and deserialization twice to merge// together the spilled files, which would happen with the normal code path. The downside is// having multiple files open at a time and thus more memory allocated to buffers.new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:new SerializedShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])} else {// Otherwise, buffer map outputs in a deserialized form:new BaseShuffleHandle(shuffleId, numMaps, dependency)}}

在这里插入图片描述

SortShuffleWriter代码解析

SortShuffleWriter使用ExternalSorter,write的方法接收的参数为Iterator[Product2[K, V]],一个KEY、VALUE的集合,经过ExternalSorter排序之后,向

/** Write a bunch of records to this task's output */override def write(records: Iterator[Product2[K, V]]): Unit = {sorter = if (dep.mapSideCombine) {require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)} else {// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't// care whether the keys get sorted in each partition; that will be done on the reduce side// if the operation being run is sortByKey.new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)}sorter.insertAll(records)// Don't bother including the time to open the merged output file in the shuffle write time,// because it just opens a single file, so is typically too fast to measure accurately// (see SPARK-3570).val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)val tmp = Utils.tempFileWith(output)try {val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)val partitionLengths = sorter.writePartitionedFile(blockId, tmp)shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)} finally {if (tmp.exists() && !tmp.delete()) {logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")}}}

这篇关于spark shuffle的演进过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1057262

相关文章

Pytorch介绍与安装过程

《Pytorch介绍与安装过程》PyTorch因其直观的设计、卓越的灵活性以及强大的动态计算图功能,迅速在学术界和工业界获得了广泛认可,成为当前深度学习研究和开发的主流工具之一,本文给大家介绍Pyto... 目录1、Pytorch介绍1.1、核心理念1.2、核心组件与功能1.3、适用场景与优势总结1.4、优

Redis指南及6.2.x版本安装过程

《Redis指南及6.2.x版本安装过程》Redis是完全开源免费的,遵守BSD协议,是一个高性能(NOSQL)的key-value数据库,Redis是一个开源的使用ANSIC语言编写、支持网络、... 目录概述Redis特点Redis应用场景缓存缓存分布式会话分布式锁社交网络最新列表Redis各版本介绍旧

SpringBoot整合Sa-Token实现RBAC权限模型的过程解析

《SpringBoot整合Sa-Token实现RBAC权限模型的过程解析》:本文主要介绍SpringBoot整合Sa-Token实现RBAC权限模型的过程解析,本文给大家介绍的非常详细,对大家的学... 目录前言一、基础概念1.1 RBAC模型核心概念1.2 Sa-Token核心功能1.3 环境准备二、表结

Jvm sandbox mock机制的实践过程

《Jvmsandboxmock机制的实践过程》:本文主要介绍Jvmsandboxmock机制的实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景二、定义一个损坏的钟1、 Springboot工程中创建一个Clock类2、 添加一个Controller

python多线程并发测试过程

《python多线程并发测试过程》:本文主要介绍python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、并发与并行?二、同步与异步的概念?三、线程与进程的区别?需求1:多线程执行不同任务需求2:多线程执行相同任务总结一、并发与并行?1、

MybatisPlus3.3.1整合clickhouse的过程

《MybatisPlus3.3.1整合clickhouse的过程》:本文主要介绍MybatisPlus3.3.1整合clickhouse的过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定... 前言ClickHouse是俄罗斯Yandex发布的一款数据分析型数据库支持sql语法,详情可以访问官网,

Spring AI 实现 STDIO和SSE MCP Server的过程详解

《SpringAI实现STDIO和SSEMCPServer的过程详解》STDIO方式是基于进程间通信,MCPClient和MCPServer运行在同一主机,主要用于本地集成、命令行工具等场景... 目录Spring AI 实现 STDIO和SSE MCP Server1.新建Spring Boot项目2.a

使用Java将实体类转换为JSON并输出到控制台的完整过程

《使用Java将实体类转换为JSON并输出到控制台的完整过程》在软件开发的过程中,Java是一种广泛使用的编程语言,而在众多应用中,数据的传输和存储经常需要使用JSON格式,用Java将实体类转换为J... 在软件开发的过程中,Java是一种广泛使用的编程语言,而在众多应用中,数据的传输和存储经常需要使用j

将图片导入Python的turtle库的详细过程

《将图片导入Python的turtle库的详细过程》在Python编程的世界里,turtle库以其简单易用、图形化交互的特点,深受初学者喜爱,随着项目的复杂度增加,仅仅依靠线条和颜色来绘制图形可能已经... 目录开篇引言正文剖析1. 理解基础:Turtle库的工作原理2. 图片格式与支持3. 实现步骤详解第

Linux系统调试之ltrace工具使用与调试过程

《Linux系统调试之ltrace工具使用与调试过程》:本文主要介绍Linux系统调试之ltrace工具使用与调试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、ltrace 定义与作用二、ltrace 工作原理1. 劫持进程的 PLT/GOT 表2. 重定