####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析

本文主要是介绍####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、DStream和RDD的关系
    DSream 代表了一系列连续的RDD,DStream中每个RDD包含特定时间间隔的数据,如下图所示:
 
    
从上图可以看出, 一个DStream 对应了时间维度上的多个RDD。
DStream 作为Spark Stream的一个基本抽象,提供了高层的API来进行Spark Streaming 程序开发,先看一个简单的Spark Streaming的WordCount程序实例:
  1. object WordCount{
  2. def main(args:Array[String]):Unit={
  3. val sparkConf =newSparkConf().setMaster("local[4]").setAppName("WordCount")
  4. val ssc =newStreamingContext(sparkConf,Seconds(1))
  5. val lines = ssc.socketTextStream("localhost",9999)
  6. val words = lines.flatMap(_.split(" "))
  7. val wordCounts = words.map(=>(x,1)).reduceByKey(_+_)
  8. wordCounts.print()
  9. ssc.start()
  10. ssc.awaitTermination()
  11. }
  12. }
我们会发现对DStream的操作和RDD的操作惊人的相似, 通过对DStream的不断转换,形成依赖关系。所以的DStream操作最终会转换成底层的RDD的操作,上面的例子中
lines DStream转换成wods DSteam。 lines DStream的 flatMap操作会作用于其中每一个RDD去生成words DStream 中的RDD, 过程如下图所示:
 
下面从源码角度看一下 DStream和RDD的关系:
    DStream 中 有一个HashMap[Time,RDD[T]]类型的对象 generatedRDDs,其中Key为作业开始时间,RDD为该DStream对应的RDD,源码如下:
 
    
二、Dstream 的分类
    Dstream 主要分为三大类:
         1. Input DStream
         2.  Transformed DStream
         3. Output DStream
 
2.1 InputDStream 是DStream 最初诞生的地方,也是RDD最初诞生的地方,它是依据数据源创建的最初的DStream,如上面例子中的代码:
 
val lines = ssc . socketTextStream ( "localhost" , 9999 )
 
基于Socket数据源创建了 SocketInputDStream对象lines,下面从源码角度分析一下他是怎么生成RDD的,  SocketInputDStream生成RDD的方法在 它的父类ReceiverInputDSteam中:
 

 
ReceiverInputDSteam  的compute方法中调用了createBloackRDD方法基于Block信息创建了RDD :
 

可以看到  ReceiverInputDSteam 的 createBloackRDD 方法new了BlockRDD对象,BlockRDD 是继承自RDD。至此,最初的RDD创建完成。
 
2.2、  Transformed DStream 是由其他DStream 通过非Output算子装换而来的DStream
   例如例子中的lines通过flatMap算子转换生成了FlatMappedDStream:
     val words = lines.flatMap(_.split(" "))
   下面看一下flatMap的源码:
    
    
 
可以看到flatMap是DStream的方法,它创建了FlatMappeedDStream并返回,上面例子中words 就是 FlatMappeedDStream 对象,创建 FlatMappeedDStream对象时传入了 参数flatMapFunc,这里的flatMapFunc就是用户编写的业务逻辑,我们再进入FlatMappedDStream,查看其compute方法:
 

可以惊喜的看到 FlatMappedDStream的compute方法调用了parent的getOrCompute方法获取父DStream的RDD.通过对 父DStream的RDD的flatMap算子生成新的RDD,转换的业务逻辑通过flatMapFunc参数传递给flatMap算子。这样对DStream的操作都转换成了对RDD的操作,同时DSream的依赖关系也与RDD之间依赖关系同时建立了起来。
说明:这些RDD的创建是在Job动态生成时候发生的,Job生成最终会调用ForeachDStream的generateJob方法,源码如下
 

其中的parent.getOrCompute方法会依据DStream之间的依赖关系,导致一系列的链式调用,从而创建所有的RDD,并形成RDD之间的依赖关系。
 
3.3  Output DStream 是有其他DStream通过Output算子生成,它只存在于Output算子内部,并不会像Transformed Stream一样由算子返回, 他是触发Job执行的关键。
          那么什么是Output 算子呢? Output 算子是让DStream中的数据被推送的外部系统,像数据库,文件系统(HDFS,GFS等)的算子。因为Output 算子是将转换后的数据推送到外部系统被使用的操作,所以他触发了前面转换操作的真正执行(类似于RDD的action操作)。
          下面,我们看看有哪些Output算子:
 
Output Operation Meaning
print() Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. 
Python API This is called pprint() in the Python API.
saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based onprefix and suffix"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix"prefix-TIME_IN_MS[.suffix]"
Python API This is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix]) Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix"prefix-TIME_IN_MS[.suffix]"
Python API This is not available in the Python API.
foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
下面,回到我们开头的例子:
wordCounts . print ()
其中pirnt算子就是Output算子,我们进入print的源码:
 

print()方法调用了print(10),其实是调用了另一个print方法:
print 方法中首先定义了一个函数foreachFunc,foreachFunc从rdd中出去num个元素打印出来。接下来print函数调用了foreachRDD,并将foreachFunc的处理逻辑作为参数传入。这里的foreachRDD也是一个Output算子(上面已经有说明),接下来看看 foreachRDD的源码。
 

 
可以看到foreachRDD中创建了一个ForeachDStream对象,这就是我们期待已久的Output DStream。这里需要注意一个关键点:
创建完ForeachRDD对象后,调用了该对象的register方法。register方法将当前对象注册给DStreamGraph。源码如下:
 

注册的过程就是将当前对象加入graph的输出流outputStream中:

这个过程很重要,在Job触发时候会用到outputStream。我们先在这里记住这个过程,下面的分析会用到这个内容。
至此,DStream到RDD过程已经解析完毕。
三 、由Dstream触发RDD的执行
    Spark Stream的Job执行过程我在另一篇博客有详细介绍,具体细节请参考 http://www.cnblogs.com/zhouyf/p/5503682.html
在生成Job的过程中会调用DStreamGraph的generate方法:
其中,就调用了outputStream的generateJob方法,这里的outputStream就上面有output算子注册给DStreamGraph的输出流。就是我们实例中ForeachDStream 。
 
ForeachDStream 的generateJob方法源码:
 
可以看到它将我们的业务逻辑封装成jobFunc传递给了最终生成的Job对象。
由上篇博客《 Spark streaming技术内幕 : Job动态生成原理与源码解析 我们知道在StreamContext启动会动态创建job,并且最终调用Job的run方法
Job的run方法由JobScheduler的submitJobSet触发 : 
其中jobExecutor对象是一个线程池,JobHandler实现了 Runnable接口,在JobHandler 的run方法中会调用传入的job对象的run方法。在这里Job的run方法开始在线程中执行,JobHandler的run方法源码如下:
 

其中的job就是封装了我们业务逻辑的Job对象,它的run方法会触发我们在foreachRDD方法中对RDD的操作(一般是action操作),到这里RDD的Action操作被触发,spark作业开始执行。
总结:
    1、在一个固定时间维度上,DStream和RDD是一一对应关系,可以将DStream看成是RDD在时间维度上封装。
    2、Dstream 主要分为三大类: Input DStream,Transformed DStream,Output DStream,其中Output Dstream 对开发者是透明的,存在于Output 算子内部。
    3、Spark Streaming应用程序最终会转化成对RDD操作的spark 程序,spark 程序由于执行了foreachRDD算子中的RDD操作被触发。

这篇关于####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967701

相关文章

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

一文解析C#中的StringSplitOptions枚举

《一文解析C#中的StringSplitOptions枚举》StringSplitOptions是C#中的一个枚举类型,用于控制string.Split()方法分割字符串时的行为,核心作用是处理分割后... 目录C#的StringSplitOptions枚举1.StringSplitOptions枚举的常用

Python函数作用域与闭包举例深度解析

《Python函数作用域与闭包举例深度解析》Python函数的作用域规则和闭包是编程中的关键概念,它们决定了变量的访问和生命周期,:本文主要介绍Python函数作用域与闭包的相关资料,文中通过代码... 目录1. 基础作用域访问示例1:访问全局变量示例2:访问外层函数变量2. 闭包基础示例3:简单闭包示例4

MyBatis延迟加载与多级缓存全解析

《MyBatis延迟加载与多级缓存全解析》文章介绍MyBatis的延迟加载与多级缓存机制,延迟加载按需加载关联数据提升性能,一级缓存会话级默认开启,二级缓存工厂级支持跨会话共享,增删改操作会清空对应缓... 目录MyBATis延迟加载策略一对多示例一对多示例MyBatis框架的缓存一级缓存二级缓存MyBat

前端缓存策略的自解方案全解析

《前端缓存策略的自解方案全解析》缓存从来都是前端的一个痛点,很多前端搞不清楚缓存到底是何物,:本文主要介绍前端缓存的自解方案,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、为什么“清缓存”成了技术圈的梗二、先给缓存“把个脉”:浏览器到底缓存了谁?三、设计思路:把“发版”做成“自愈”四、代码

idea+spring boot创建项目的搭建全过程

《idea+springboot创建项目的搭建全过程》SpringBoot是Spring社区发布的一个开源项目,旨在帮助开发者快速并且更简单的构建项目,:本文主要介绍idea+springb... 目录一.idea四种搭建方式1.Javaidea命名规范2JavaWebTomcat的安装一.明确tomcat

Java集合之Iterator迭代器实现代码解析

《Java集合之Iterator迭代器实现代码解析》迭代器Iterator是Java集合框架中的一个核心接口,位于java.util包下,它定义了一种标准的元素访问机制,为各种集合类型提供了一种统一的... 目录一、什么是Iterator二、Iterator的核心方法三、基本使用示例四、Iterator的工

java程序远程debug原理与配置全过程

《java程序远程debug原理与配置全过程》文章介绍了Java远程调试的JPDA体系,包含JVMTI监控JVM、JDWP传输调试命令、JDI提供调试接口,通过-Xdebug、-Xrunjdwp参数配... 目录背景组成模块间联系IBM对三个模块的详细介绍编程使用总结背景日常工作中,每个程序员都会遇到bu