Spark算子:RDD基本转换操作(6)–zip、zipPartitions

2024-06-23 13:18

本文主要是介绍Spark算子:RDD基本转换操作(6)–zip、zipPartitions,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

zip

      def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

       zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))           scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//如果两个RDD分区数不同,则抛出异常

zipPartitions

      zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

      该函数有好几种实现,可分为三类:

      参数是一个RDD
            def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1:       ClassTag[V]): RDD[V]

            def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])      (implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

      这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

      映射方法f参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21//rdd1两个分区中元素分布:
scala> rdd1.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)//rdd2两个分区中元素分布
scala> rdd2.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){|       (rdd1Iter,rdd2Iter) => {|         var result = List[String]()|         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {|           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())|         }|         result.iterator|       }|     }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)


      参数是两个RDD
            def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit       arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

            def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T],

      Iterator[B],Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]


      用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21//rdd3中个分区元素分布
scala> rdd3.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)//三个RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){|       (rdd1Iter,rdd2Iter,rdd3Iter) => {|         var result = List[String]()|         while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {|           result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())|         }|         result.iterator|       }|     }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

       参数是三个RDD
      def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C],      Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

      def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T],      Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3:      ClassTag[V]): RDD[V]

      用法同上面,只不过这里又多了个一个RDD而已。

      转载请注明:Spark算子:RDD基本转换操作(6)–zip、zipPartitions

这篇关于Spark算子:RDD基本转换操作(6)–zip、zipPartitions的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1087263

相关文章

mapstruct中的@Mapper注解的基本用法

《mapstruct中的@Mapper注解的基本用法》在MapStruct中,@Mapper注解是核心注解之一,用于标记一个接口或抽象类为MapStruct的映射器(Mapper),本文给大家介绍ma... 目录1. 基本用法2. 常用属性3. 高级用法4. 注意事项5. 总结6. 编译异常处理在MapSt

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

MyBatis ResultMap 的基本用法示例详解

《MyBatisResultMap的基本用法示例详解》在MyBatis中,resultMap用于定义数据库查询结果到Java对象属性的映射关系,本文给大家介绍MyBatisResultMap的基本... 目录MyBATis 中的 resultMap1. resultMap 的基本语法2. 简单的 resul

Linux链表操作方式

《Linux链表操作方式》:本文主要介绍Linux链表操作方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、链表基础概念与内核链表优势二、内核链表结构与宏解析三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势六、典型应用场景七、调试技巧与

Java Multimap实现类与操作的具体示例

《JavaMultimap实现类与操作的具体示例》Multimap出现在Google的Guava库中,它为Java提供了更加灵活的集合操作,:本文主要介绍JavaMultimap实现类与操作的... 目录一、Multimap 概述Multimap 主要特点:二、Multimap 实现类1. ListMult

在Java中将XLS转换为XLSX的实现方案

《在Java中将XLS转换为XLSX的实现方案》在本文中,我们将探讨传统ExcelXLS格式与现代XLSX格式的结构差异,并为Java开发者提供转换方案,通过了解底层原理、性能优势及实用工具,您将掌握... 目录为什么升级XLS到XLSX值得投入?实际转换过程解析推荐技术方案对比Apache POI实现编程

Java 枚举的基本使用方法及实际使用场景

《Java枚举的基本使用方法及实际使用场景》枚举是Java中一种特殊的类,用于定义一组固定的常量,枚举类型提供了更好的类型安全性和可读性,适用于需要定义一组有限且固定的值的场景,本文给大家介绍Jav... 目录一、什么是枚举?二、枚举的基本使用方法定义枚举三、实际使用场景代替常量状态机四、更多用法1.实现接

git stash命令基本用法详解

《gitstash命令基本用法详解》gitstash是Git中一个非常有用的命令,它可以临时保存当前工作区的修改,让你可以切换到其他分支或者处理其他任务,而不需要提交这些还未完成的修改,这篇文章主要... 目录一、基本用法1. 保存当前修改(包括暂存区和工作区的内容)2. 查看保存了哪些 stash3. 恢

Python中文件读取操作漏洞深度解析与防护指南

《Python中文件读取操作漏洞深度解析与防护指南》在Web应用开发中,文件操作是最基础也最危险的功能之一,这篇文章将全面剖析Python环境中常见的文件读取漏洞类型,成因及防护方案,感兴趣的小伙伴可... 目录引言一、静态资源处理中的路径穿越漏洞1.1 典型漏洞场景1.2 os.path.join()的陷