RDD的join和Dstream的join有什么区别?

2023-10-09 03:32
文章标签 区别 join rdd dstream

本文主要是介绍RDD的join和Dstream的join有什么区别?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

有人在知识星球里问:

浪院长,RDD的join和Dstream的join有什么区别?

浪尖的回答:

DStream的join底层就是rdd的join。

下面,我们就带着疑问去验证以下,我们的想法。

2. DStream -> PairDStreamFunctions

Dstream这个类实际上支持的只是Spark Streaming的基础操作算子,比如: mapfilter 和window.PairDStreamFunctions 这个支持key-valued类型的流数据

,支持的操作算子,如,groupByKeyAndWindow,join。这些操作,在有key-value类型的流上是自动识别的。

对于dstream -> PairDStreamFunctions自动转换的过程大家肯定想到的是scala的隐式转换。具体代码在Dstream的object内部。

implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
   PairDStreamFunctions[K, V] = {
   new PairDStreamFunctions[K, V](stream)
 }

假如,你对scala的隐式转换比较懵逼,请阅读下面文章。

Scala语法基础之隐式转换

3. PairDStreamFunctions的join

PairDStreamFunctions的join API总共有三种

/**
  * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
  * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   *
   *  通过join this和other Dstream的rdd构建出一个新的DStream.
   *  Hash分区器,用来使用默认的分区数来产生RDDs。
  */
 def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
   join[W](other, defaultPartitioner())
 }

 /**
  * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
  * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   *
   *  通过join this和other Dstream的rdd构建出一个新的DStream.
   *  Hash分区器,用来使用numPartitions分区数来产生RDDs。
  */
 def join[W: ClassTag](
     other: DStream[(K, W)],
     numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope {
   join[W](other, defaultPartitioner(numPartitions))
 }

 /**
  * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
  * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * 通过join this和other Dstream的rdd构建出一个新的DStream.
   * 使用org.apache.spark.Partitioner来控制每个RDD的分区。
  */
 def join[W: ClassTag](
     other: DStream[(K, W)],
     partitioner: Partitioner
   ): DStream[(K, (V, W))] = ssc.withScope {
   self.transformWith(
     other,
     (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
   )
 }

上面所示代码中,第三个PairDStreamFunctions的join api 体现了join的操作,也即是函数:

(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)

上面是两个RDD的join过程,并且指定了分区器。后面我们主要是关注该函数封装及调用。

其实,看过浪尖的Spark Streaming的视频的朋友或者度过浪尖关于Spark Streaming相关源码讲解的朋友应该有所了解的是。 这个生成RDD的函数应该是在 DStream的compute方法中在生成RDD的时候调用。假设你不了解也不要紧。 我们跟着代码轨迹前进,验证我们的想法。

DStream.transformWith

/**
  * Return a new DStream in which each RDD is generated by applying a function
  * on each RDD of 'this' DStream and 'other' DStream.
  */

 def transformWith[U: ClassTag, V: ClassTag](
     other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
   ): DStream[V] = ssc.withScope {
   // because the DStream is reachable from the outer object here, and because
   // DStreams can't be serialized with closures, we can't proactively check
   // it for serializability and so we pass the optional false to SparkContext.clean
   val cleanedF = ssc.sparkContext.clean(transformFunc, false)
   transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
 }
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream and 'other' DStream.
    */

   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
     ): DStream[V] = ssc.withScope {
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 2)
       val rdd1 = rdds(0).asInstanceOf[RDD[T]]
       val rdd2 = rdds(1).asInstanceOf[RDD[U]]
       cleanedF(rdd1, rdd2, time)
     }
     new TransformedDStream[V](Seq(this, other), realTransformFunc)
   }

经过上面两个 TransformWith操作,最终生成了一个TransformedDStream。需要关注的是new TransformedDStream[V](Seq(this, other), realTransformFunc) 第一个参数是一个包含要进行join操作的两个流的Seq。

那么,TransformedDStream 的parents 就包含了两个流。我们可以看到其 compute 方法的第一行。

override def compute(validTime: Time): Option[RDD[U]] = {
//    针对每一个流,获取其当前时间的RDD。
   val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(
     // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE
     throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
   }

compute的第一行就是获取parent中每个流,当前有效时间的RDD。然后调用,前面步骤封装的函数进行join。

val transformedRDD = transformFunc(parentRDDs, validTime)

以上就是join的全部过程。也是,验证浪尖所说的,DStream的join底层就是RDD的join。

640?wx_fmt=jpeg

这篇关于RDD的join和Dstream的join有什么区别?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/170049

相关文章

java String.join()方法实例详解

《javaString.join()方法实例详解》String.join()是Java提供的一个实用方法,用于将多个字符串按照指定的分隔符连接成一个字符串,这一方法是Java8中引入的,极大地简化了... 目录bVARxMJava String.join() 方法详解1. 方法定义2. 基本用法2.1 拼接

Android学习总结之Java和kotlin区别超详细分析

《Android学习总结之Java和kotlin区别超详细分析》Java和Kotlin都是用于Android开发的编程语言,它们各自具有独特的特点和优势,:本文主要介绍Android学习总结之Ja... 目录一、空安全机制真题 1:Kotlin 如何解决 Java 的 NullPointerExceptio

Linux中的more 和 less区别对比分析

《Linux中的more和less区别对比分析》在Linux/Unix系统中,more和less都是用于分页查看文本文件的命令,但less是more的增强版,功能更强大,:本文主要介绍Linu... 目录1. 基础功能对比2. 常用操作对比less 的操作3. 实际使用示例4. 为什么推荐 less?5.

Java 关键字transient与注解@Transient的区别用途解析

《Java关键字transient与注解@Transient的区别用途解析》在Java中,transient是一个关键字,用于声明一个字段不会被序列化,这篇文章给大家介绍了Java关键字transi... 在Java中,transient 是一个关键字,用于声明一个字段不会被序列化。当一个对象被序列化时,被

解读@ConfigurationProperties和@value的区别

《解读@ConfigurationProperties和@value的区别》:本文主要介绍@ConfigurationProperties和@value的区别及说明,具有很好的参考价值,希望对大家... 目录1. 功能对比2. 使用场景对比@ConfigurationProperties@Value3. 核

Spring Boot拦截器Interceptor与过滤器Filter深度解析(区别、实现与实战指南)

《SpringBoot拦截器Interceptor与过滤器Filter深度解析(区别、实现与实战指南)》:本文主要介绍SpringBoot拦截器Interceptor与过滤器Filter深度解析... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现与实

关于Mybatis和JDBC的使用及区别

《关于Mybatis和JDBC的使用及区别》:本文主要介绍关于Mybatis和JDBC的使用及区别,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、JDBC1.1、流程1.2、优缺点2、MyBATis2.1、执行流程2.2、使用2.3、实现方式1、XML配置文件

exfat和ntfs哪个好? U盘格式化选择NTFS与exFAT的详细区别对比

《exfat和ntfs哪个好?U盘格式化选择NTFS与exFAT的详细区别对比》exFAT和NTFS是两种常见的文件系统,它们各自具有独特的优势和适用场景,以下是关于exFAT和NTFS的详细对比... 无论你是刚入手了内置 SSD 还是便携式移动硬盘或 U 盘,都需要先将它格式化成电脑或设备能够识别的「文

什么是ReFS 文件系统? ntfs和refs的优缺点区别介绍

《什么是ReFS文件系统?ntfs和refs的优缺点区别介绍》最近有用户在Win11Insider的安装界面中发现,可以使用ReFS来格式化硬盘,这是不是意味着,ReFS有望在未来成为W... 数十年以来,Windows 系统一直将 NTFS 作为「内置硬盘」的默认文件系统。不过近些年来,微软还在研发一款名

MySQL 多表连接操作方法(INNER JOIN、LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN)

《MySQL多表连接操作方法(INNERJOIN、LEFTJOIN、RIGHTJOIN、FULLOUTERJOIN)》多表连接是一种将两个或多个表中的数据组合在一起的SQL操作,通过连接,... 目录一、 什么是多表连接?二、 mysql 支持的连接类型三、 多表连接的语法四、实战示例 数据准备五、连接的性