####好#####DStreams上的输出操作

2024-05-07 15:08

本文主要是介绍####好#####DStreams上的输出操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

DStreams上的输出操作

输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,定义了下面几种输出操作:

Output OperationMeaning
print()在DStream的每个批数据中打印前10条元素,这个操作在开发和调试中都非常有用。在Python API中调用pprint()
saveAsObjectFiles(prefix, [suffix])保存DStream的内容为一个序列化的文件SequenceFile。每一个批间隔的文件的文件名基于prefixsuffix生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。
saveAsTextFiles(prefix, [suffix])保存DStream的内容为一个文本文件。每一个批间隔的文件的文件名基于prefixsuffix生成。"prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix, [suffix])保存DStream的内容为一个hadoop文件。每一个批间隔的文件的文件名基于prefixsuffix生成。"prefix-TIME_IN_MS[.suffix]",在Python API中不可用。
foreachRDD(func)在从流中生成的每个RDD上应用函数func的最通用的输出操作。这个函数应该推送每个RDD的数据到外部系统,例如保存RDD到文件或者通过网络写到数据库中。需要注意的是,func函数在驱动程序中执行,并且通常都有RDD action在里面推动RDD流的计算。

利用foreachRDD的设计模式

dstream.foreachRDD是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。

  • 经常写数据到外部系统需要建一个连接对象(例如到远程服务器的TCP连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Spark worker中尝试调用这个连接对象保存记录到RDD中,如下:
  dstream.foreachRDD(rdd => {val connection = createNewConnection()  // executed at the driverrdd.foreach(record => {connection.send(record) // executed at the worker})})

这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。

  • 然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如:
  dstream.foreachRDD(rdd => {rdd.foreach(record => {val connection = createNewConnection()connection.send(record)connection.close()})})

通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用rdd.foreachPartition方法。为RDD的partition创建一个连接对象,用这个两件对象发送partition中的所有记录。

 dstream.foreachRDD(rdd => {rdd.foreachPartition(partitionOfRecords => {val connection = createNewConnection()partitionOfRecords.foreach(record => connection.send(record))connection.close()})})

这就将连接对象的创建开销分摊到了partition的所有记录上了。

  • 最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。
  dstream.foreachRDD(rdd => {rdd.foreachPartition(partitionOfRecords => {// ConnectionPool is a static, lazily initialized pool of connectionsval connection = ConnectionPool.getConnection()partitionOfRecords.foreach(record => connection.send(record))ConnectionPool.returnConnection(connection)  // return to the pool for future reuse})})

需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。

其它需要注意的地方:

  • 输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者用于输出操作dstream.foreachRDD(),但是没有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。
  • 默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。

这篇关于####好#####DStreams上的输出操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967696

相关文章

Java操作Word文档的全面指南

《Java操作Word文档的全面指南》在Java开发中,操作Word文档是常见的业务需求,广泛应用于合同生成、报表输出、通知发布、法律文书生成、病历模板填写等场景,本文将全面介绍Java操作Word文... 目录简介段落页头与页脚页码表格图片批注文本框目录图表简介Word编程最重要的类是org.apach

在Linux中改变echo输出颜色的实现方法

《在Linux中改变echo输出颜色的实现方法》在Linux系统的命令行环境下,为了使输出信息更加清晰、突出,便于用户快速识别和区分不同类型的信息,常常需要改变echo命令的输出颜色,所以本文给大家介... 目python录在linux中改变echo输出颜色的方法技术背景实现步骤使用ANSI转义码使用tpu

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

MySQL追踪数据库表更新操作来源的全面指南

《MySQL追踪数据库表更新操作来源的全面指南》本文将以一个具体问题为例,如何监测哪个IP来源对数据库表statistics_test进行了UPDATE操作,文内探讨了多种方法,并提供了详细的代码... 目录引言1. 为什么需要监控数据库更新操作2. 方法1:启用数据库审计日志(1)mysql/mariad

springboot如何通过http动态操作xxl-job任务

《springboot如何通过http动态操作xxl-job任务》:本文主要介绍springboot如何通过http动态操作xxl-job任务的问题,具有很好的参考价值,希望对大家有所帮助,如有错... 目录springboot通过http动态操作xxl-job任务一、maven依赖二、配置文件三、xxl-

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

Linux链表操作方式

《Linux链表操作方式》:本文主要介绍Linux链表操作方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、链表基础概念与内核链表优势二、内核链表结构与宏解析三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势六、典型应用场景七、调试技巧与