Spark算子:RDD基本转换操作(6)–zip、zipPartitions

2024-06-23 13:18

本文主要是介绍Spark算子:RDD基本转换操作(6)–zip、zipPartitions,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

zip

      def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

       zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))           scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//如果两个RDD分区数不同,则抛出异常

zipPartitions

      zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

      该函数有好几种实现,可分为三类:

      参数是一个RDD
            def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1:       ClassTag[V]): RDD[V]

            def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])      (implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

      这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

      映射方法f参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21//rdd1两个分区中元素分布:
scala> rdd1.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)//rdd2两个分区中元素分布
scala> rdd2.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){|       (rdd1Iter,rdd2Iter) => {|         var result = List[String]()|         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {|           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())|         }|         result.iterator|       }|     }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)


      参数是两个RDD
            def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit       arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

            def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T],

      Iterator[B],Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]


      用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21//rdd3中个分区元素分布
scala> rdd3.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)//三个RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){|       (rdd1Iter,rdd2Iter,rdd3Iter) => {|         var result = List[String]()|         while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {|           result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())|         }|         result.iterator|       }|     }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

       参数是三个RDD
      def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C],      Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

      def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T],      Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3:      ClassTag[V]): RDD[V]

      用法同上面,只不过这里又多了个一个RDD而已。

      转载请注明:Spark算子:RDD基本转换操作(6)–zip、zipPartitions

这篇关于Spark算子:RDD基本转换操作(6)–zip、zipPartitions的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1087263

相关文章

Python正则表达式匹配和替换的操作指南

《Python正则表达式匹配和替换的操作指南》正则表达式是处理文本的强大工具,Python通过re模块提供了完整的正则表达式功能,本文将通过代码示例详细介绍Python中的正则匹配和替换操作,需要的朋... 目录基础语法导入re模块基本元字符常用匹配方法1. re.match() - 从字符串开头匹配2.

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Java实现将HTML文件与字符串转换为图片

《Java实现将HTML文件与字符串转换为图片》在Java开发中,我们经常会遇到将HTML内容转换为图片的需求,本文小编就来和大家详细讲讲如何使用FreeSpire.DocforJava库来实现这一功... 目录前言核心实现:html 转图片完整代码场景 1:转换本地 HTML 文件为图片场景 2:转换 H

Java实现在Word文档中添加文本水印和图片水印的操作指南

《Java实现在Word文档中添加文本水印和图片水印的操作指南》在当今数字时代,文档的自动化处理与安全防护变得尤为重要,无论是为了保护版权、推广品牌,还是为了在文档中加入特定的标识,为Word文档添加... 目录引言Spire.Doc for Java:高效Word文档处理的利器代码实战:使用Java为Wo

Python中Json和其他类型相互转换的实现示例

《Python中Json和其他类型相互转换的实现示例》本文介绍了在Python中使用json模块实现json数据与dict、object之间的高效转换,包括loads(),load(),dumps()... 项目中经常会用到json格式转为object对象、dict字典格式等。在此做个记录,方便后续用到该方

sysmain服务可以禁用吗? 电脑sysmain服务关闭后的影响与操作指南

《sysmain服务可以禁用吗?电脑sysmain服务关闭后的影响与操作指南》在Windows系统中,SysMain服务(原名Superfetch)作为一个旨在提升系统性能的关键组件,一直备受用户关... 在使用 Windows 系统时,有时候真有点像在「开盲盒」。全新安装系统后的「默认设置」,往往并不尽编

Python ORM神器之SQLAlchemy基本使用完全指南

《PythonORM神器之SQLAlchemy基本使用完全指南》SQLAlchemy是Python主流ORM框架,通过对象化方式简化数据库操作,支持多数据库,提供引擎、会话、模型等核心组件,实现事务... 目录一、什么是SQLAlchemy?二、安装SQLAlchemy三、核心概念1. Engine(引擎)

Python自动化处理PDF文档的操作完整指南

《Python自动化处理PDF文档的操作完整指南》在办公自动化中,PDF文档处理是一项常见需求,本文将介绍如何使用Python实现PDF文档的自动化处理,感兴趣的小伙伴可以跟随小编一起学习一下... 目录使用pymupdf读写PDF文件基本概念安装pymupdf提取文本内容提取图像添加水印使用pdfplum

Python从Word文档中提取图片并生成PPT的操作代码

《Python从Word文档中提取图片并生成PPT的操作代码》在日常办公场景中,我们经常需要从Word文档中提取图片,并将这些图片整理到PowerPoint幻灯片中,手动完成这一任务既耗时又容易出错,... 目录引言背景与需求解决方案概述代码解析代码核心逻辑说明总结引言在日常办公场景中,我们经常需要从 W

使用Python的requests库来发送HTTP请求的操作指南

《使用Python的requests库来发送HTTP请求的操作指南》使用Python的requests库发送HTTP请求是非常简单和直观的,requests库提供了丰富的API,可以发送各种类型的HT... 目录前言1. 安装 requests 库2. 发送 GET 请求3. 发送 POST 请求4. 发送