【pyspark速成专家】5_Spark之RDD编程3

2024-05-26 00:20

本文主要是介绍【pyspark速成专家】5_Spark之RDD编程3,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

​编辑

六,共享变量

七,分区操作


六,共享变量

当spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。

但是,有时候需要在不同节点或者节点和Driver之间共享变量。

Spark提供两种类型的共享变量,广播变量和累加器。

广播变量是不可变变量,实现在不同节点不同任务之间共享数据。

广播变量在每个机器上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。

累加器主要是不同节点和Driver之间共享变量,只能实现计数或者累加功能。

累加器的值只有在Driver上是可读的,在节点上不可见。

#广播变量 broadcast 不可变,在所有节点可读broads = sc.broadcast(100)rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())print(broads.value)[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100#累加器 只能在Driver上可读,在其它节点只能进行累加total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)rdd.foreach(lambda x:total.add(x))
total.value45# 计算数据的平均值
rdd = sc.parallelize([1.1,2.1,3.1,4.1])
total = sc.accumulator(0)
count = sc.accumulator(0)def func(x):total.add(x)count.add(1)rdd.foreach(func)total.value/count.value2.6

七,分区操作

分区操作包括改变分区操作,以及针对分区执行的一些转换操作。

glom:将一个分区内的数据转换为一个列表作为一行。

coalesce:shuffle可选,默认为False情况下窄依赖,不能增加分区。repartition和partitionBy调用它实现。

repartition:按随机数进行shuffle,相同key不一定在同一个分区

partitionBy:按key进行shuffle,相同key放入同一个分区

HashPartitioner:默认分区器,根据key的hash值进行分区,相同的key进入同一分区,效率较高,key不可为Array.

RangePartitioner:只在排序相关函数中使用,除相同的key进入同一分区,相邻的key也会进入同一分区,key必须可排序。

TaskContext: 获取当前分区id方法 TaskContext.get.partitionId

mapPartitions:每次处理分区内的一批数据,适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支

mapPartitionsWithIndex:类似mapPartitions,提供了分区索引,输入参数为(i,Iterator)

foreachPartition:类似foreach,但每次提供一个Partition的一批数据

glom

#glom将一个分区内的数据转换为一个列表作为一行。
a = sc.parallelize(range(10),2)
b = a.glom()
b.collect() [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

coalesce

#coalesce 默认shuffle为False,不能增加分区,只能减少分区
#如果要增加分区,要设置shuffle = true
#parallelize等许多操作可以指定分区数
a = sc.parallelize(range(10),3)  
print(a.getNumPartitions())
print(a.glom().collect())3
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]b = a.coalesce(2) 
print(b.glom().collect())[[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]]

repartition

#repartition按随机数进行shuffle,相同key不一定在一个分区,可以增加分区
#repartition实际上调用coalesce实现,设置了shuffle = True
a = sc.parallelize(range(10),3)  
c = a.repartition(4) 
print(c.glom().collect())[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]#repartition按随机数进行shuffle,相同key不一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.repartition(2)
print(c.glom().collect())[[('a', 1), ('a', 2), ('c', 3)], [('a', 1)]]

partitionBy

#partitionBy按key进行shuffle,相同key一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.partitionBy(2)
print(c.glom().collect())

mapPartitions

#mapPartitions可以对每个分区分别执行操作
#每次处理分区内的一批数据,适合需要按批处理数据的情况
#例如将数据写入数据库时,可以极大的减少连接次数。
#mapPartitions的输入分区内数据组成的Iterator,其输出也需要是一个Iterator
#以下例子查看每个分区内的数据,相当于用mapPartitions实现了glom的功能。
a = sc.parallelize(range(10),2)
a.mapPartitions(lambda it:iter([list(it)])).collect()[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

mapPartitionsWithIndex

#mapPartitionsWithIndex可以获取两个参数
#即分区id和每个分区内的数据组成的Iterator
a = sc.parallelize(range(11),2)def func(pid,it):s = sum(it)return(iter([str(pid) + "|" + str(s)]))[str(pid) + "|" + str]
b = a.mapPartitionsWithIndex(func)
b.collect()#利用TaskContext可以获取当前每个元素的分区
from pyspark.taskcontext import TaskContext
a = sc.parallelize(range(5),3)
c = a.map(lambda x:(TaskContext.get().partitionId(),x))
c.collect()[(0, 0), (1, 1), (1, 2), (2, 3), (2, 4)]

foreachPartitions

#foreachPartition对每个分区分别执行操作
#范例:求每个分区内最大值的和
total = sc.accumulator(0.0)a = sc.parallelize(range(1,101),3)def func(it):total.add(max(it))a.foreachPartition(func)
total.value199.0

aggregate

#aggregate是一个Action操作
#aggregate比较复杂,先对每个分区执行一个函数,再对每个分区结果执行一个合并函数。
#例子:求元素之和以及元素个数
#三个参数,第一个参数为初始值,第二个为分区执行函数,第三个为结果合并执行函数。
rdd = sc.parallelize(range(1,21),3)
def inner_func(t,x):return((t[0]+x,t[1]+1))def outer_func(p,q):return((p[0]+q[0],p[1]+q[1]))rdd.aggregate((0,0),inner_func,outer_func)(210, 20)

aggregateByKey

#aggregateByKey的操作和aggregate类似,但是会对每个key分别进行操作
#第一个参数为初始值,第二个参数为分区内归并函数,第三个参数为分区间归并函数a = sc.parallelize([("a",1),("b",1),("c",2),("a",2),("b",3)],3)
b = a.aggregateByKey(0,lambda x,y:max(x,y),lambda x,y:max(x,y))
b.collect()[('b', 3), ('a', 2), ('c', 2)]

这篇关于【pyspark速成专家】5_Spark之RDD编程3的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1003030

相关文章

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁

Python 异步编程 asyncio简介及基本用法

《Python异步编程asyncio简介及基本用法》asyncio是Python的一个库,用于编写并发代码,使用协程、任务和Futures来处理I/O密集型和高延迟操作,本文给大家介绍Python... 目录1、asyncio是什么IO密集型任务特征2、怎么用1、基本用法2、关键字 async1、async

Java并发编程之如何优雅关闭钩子Shutdown Hook

《Java并发编程之如何优雅关闭钩子ShutdownHook》这篇文章主要为大家详细介绍了Java如何实现优雅关闭钩子ShutdownHook,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起... 目录关闭钩子简介关闭钩子应用场景数据库连接实战演示使用关闭钩子的注意事项开源框架中的关闭钩子机制1.

shell编程之函数与数组的使用详解

《shell编程之函数与数组的使用详解》:本文主要介绍shell编程之函数与数组的使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录shell函数函数的用法俩个数求和系统资源监控并报警函数函数变量的作用范围函数的参数递归函数shell数组获取数组的长度读取某下的

揭秘Python Socket网络编程的7种硬核用法

《揭秘PythonSocket网络编程的7种硬核用法》Socket不仅能做聊天室,还能干一大堆硬核操作,这篇文章就带大家看看Python网络编程的7种超实用玩法,感兴趣的小伙伴可以跟随小编一起... 目录1.端口扫描器:探测开放端口2.简易 HTTP 服务器:10 秒搭个网页3.局域网游戏:多人联机对战4.

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

Python异步编程中asyncio.gather的并发控制详解

《Python异步编程中asyncio.gather的并发控制详解》在Python异步编程生态中,asyncio.gather是并发任务调度的核心工具,本文将通过实际场景和代码示例,展示如何结合信号量... 目录一、asyncio.gather的原始行为解析二、信号量控制法:给并发装上"节流阀"三、进阶控制

C#多线程编程中导致死锁的常见陷阱和避免方法

《C#多线程编程中导致死锁的常见陷阱和避免方法》在C#多线程编程中,死锁(Deadlock)是一种常见的、令人头疼的错误,死锁通常发生在多个线程试图获取多个资源的锁时,导致相互等待对方释放资源,最终形... 目录引言1. 什么是死锁?死锁的典型条件:2. 导致死锁的常见原因2.1 锁的顺序问题错误示例:不同

PyCharm接入DeepSeek实现AI编程的操作流程

《PyCharm接入DeepSeek实现AI编程的操作流程》DeepSeek是一家专注于人工智能技术研发的公司,致力于开发高性能、低成本的AI模型,接下来,我们把DeepSeek接入到PyCharm中... 目录引言效果演示创建API key在PyCharm中下载Continue插件配置Continue引言

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont