####好#####DStream中的转换(transformation)

2024-05-07 15:08

本文主要是介绍####好#####DStream中的转换(transformation),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

DStream中的转换(transformation)

和RDD类似,transformation允许从输入DStream来的数据被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:

TransformationMeaning
map(func)利用函数func处理原DStream的每个元素,返回一个新的DStream
flatMap(func)与map相似,但是每个输入项可用被映射为0个或者多个输出项
filter(func)返回一个新的DStream,它仅仅包含源DStream中满足函数func的项
repartition(numPartitions)通过创建更多或者更少的partition改变这个DStream的并行级别(level of parallelism)
union(otherStream)返回一个新的DStream,它包含源DStream和otherStream的联合元素
count()通过计算源DStream中每个RDD的元素数量,返回一个包含单元素(single-element)RDDs的新DStream
reduce(func)利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素(single-element)RDDs的新DStream。函数应该是相关联的,以使计算可以并行化
countByValue()这个算子应用于元素类型为K的DStream上,返回一个(K,long)对的新DStream,每个键的值是在原DStream的每个RDD中的频率。
reduceByKey(func, [numTasks])当在一个由(K,V)对组成的DStream上调用这个算子,返回一个新的由(K,V)对组成的DStream,每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数
join(otherStream, [numTasks])当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, (V, W))对的新DStream
cogroup(otherStream, [numTasks])当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, Seq[V], Seq[W])的元组
transform(func)通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。这个可以在DStream中的任何RDD操作中使用
updateStateByKey(func)利用给定的函数更新DStream的状态,返回一个新"state"的DStream。

最后两个transformation算子需要重点介绍一下:

UpdateStateByKey操作

updateStateByKey操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它

  • 定义状态-状态可以是任何的数据类型
  • 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态

让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个state表示,它的类型是整数

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = ...  // add the new values with the previous running count to get the new countSome(newCount)
}

这个函数被用到了DStream包含的单词上

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函数将会被每个单词调用,newValues拥有一系列的1(从 (词, 1)对而来),runningCount拥有之前的次数。要看完整的代码,见例子

Transform操作

transform操作(以及它的变化形式如transformWith)允许在DStream运行任何RDD-to-RDD函数。它能够被用来应用任何没在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在DStream API中提供,然而你可以简单的利用transform方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流来清理实时数据,然后过了它们,你可以按如下方法来做:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informationval cleanedDStream = wordCounts.transform(rdd => {rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
})

事实上,你也可以在transform方法中用机器学习和图计算算法

窗口(window)操作

Spark Streaming也支持窗口计算,它允许你在一个滑动窗口数据上应用transformation算子。下图阐明了这个滑动窗口。

滑动窗口

如上图显示,窗口在源DStream上滑动,合并和操作落入窗内的源RDDs,产生窗口化的DStream的RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。这说明,任何一个窗口操作都需要指定两个参数:

  • 窗口长度:窗口的持续时间
  • 滑动的时间间隔:窗口操作执行的时间间隔

这两个参数必须是源DStream的批时间间隔的倍数。

下面举例说明窗口操作。例如,你想扩展前面的例子用来计算过去30秒的词频,间隔时间是10秒。为了达到这个目的,我们必须在过去30秒的pairs DStream上应用reduceByKey操作。用方法reduceByKeyAndWindow实现。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数:窗口长度和滑动的时间间隔

这篇关于####好#####DStream中的转换(transformation)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967695

相关文章

bimface 模型集成-后端(java)上传、发起转换、获取转换状态

目录 前言后端架构流程存储表结构全局工具类先根据appid, appsecret 生成accesstoken, 保存到自己的存储服务器。利用保存的 accesstoken 上传模型发起转换获取转换状态根据bimface文件ID获取模型viewtoken, 获取到viewtoken就可以利用前端浏览模型或图纸了 前言 之前没有注意官方有个sdk,然后自己就实现了这么个逻辑。建议

C++转换

#include <iostream> #include <stdlib.h> using namespace std; int main(void){     cout << "请输入一个整数:" << endl;     int x = 0;     cin >> x;     cout <<"  八进制:"<< oct << x << endl;

SpringMVC日期参数转换问题Can not deserialize value of type java.util.Date from String 2018-07-19 15:59:34

问题分析 报错日志 Caused by: com.fasterxml.jackson.databind.exc.InvalidFormatException: Can not deserializevalue of type java.util.Date from Stringto parse Date value '2018-07-19 15:59:34': Can not parse da

网络地址转换(nat,easy ip,nat server)资源上传

实验概述 由内到外  nat,easy ip,转换的是源ip nat server 由外到内,转换的是目的IP 实验拓扑 结果验证 nat实验得到结果 1.ar1到ar3没有路由也可以访问 2.ar3配置telent后ar1也可以通过telnet远程配置 esay ip 如果ar2 g0/0/1接口ip非固定IP,此时可以配置easy ip 配置简单,修改ar2 g1接口ip后测试

golang时间转换工具 将ISO 8601时间转为当地时间戳(13位 单位毫秒)

golang 时间转换工具 将ISO 8601时间转为当地时间戳(13位 单位毫秒) 获得了一个字符串的时间,而且是ISO 8601规范的时间格式,现转为当地(例如中国北京的时间戳) 工具代码如下: /**remark:时间转换工具 将ISO 8601时间转为当地时间戳(13位 毫秒)author:曾冠男*/func transformTimestrToTimestamp(timestr s

OffsetDateTime时间格式转换

OffsetDateTime时间格式转换 可能开发中又碰到OffsetDateTime时间格式,不知道如何转换,最简单一招解决: OffsetDateTime creationTimestamp;creationTimestamp.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) 即可轻松转换为String字符串类型

犀牛Rhinoceros 8创建、编辑、分析、记录、渲染、制作动画和转换

Rhino - 多功能 3D 建模器。 Rhinoceros 可以创建、编辑、分析、记录、渲染、制作动画和转换 NURBS* 曲线、曲面、实体、点云和多边形网格。除了硬件之外,复杂性、程度或大小没有任何限制。 特殊功能包括: -不受约束的自由形式 3D 建模工具,这些工具只在价格高出 20 到 50 倍的产品中才能找到。塑造您能想象到的任何形状。 - 设计、原型、工程、分析和制造从飞机到

【OpenCV C++】cvtColor将彩色图像转换为灰度图时,3个通道的灰度值是如何处理的? 三个通道是如何加权计算的?三个通道取平均得到灰度图吗?

文章目录 在OpenCV中,使用cv::cvtColor函数将彩色图像转换为灰度图时,3个通道的灰度值并不是简单地取平均值,而是通过加权平均的方法来计算的。 具体来说,灰度值是根据人眼对不同颜色敏感度的不同,使用加权公式计算得到的。 转换公式 通常使用的加权公式是: Gray=0.299×R+0.587×G+0.114×B 解释 R、G、B 分别代表红色、绿色和蓝色通道的

Python自学之路--004:Python使用注意点(原始字符串‘r’\字符转换\‘wb’与‘w区别’\‘\‘与‘\\’区别)

目录 1、原始字符串‘r’ 2、字符转换问题 3、open与write函数’wb’与’w’区分 4、Python里面\与\\的区别 1、原始字符串‘r’         以前的脚本通过Python2.7写的,通过Python3.12去编译发现不通用了,其实也是从一个初学者的角度去看待这些问题。         其中的\被认为特殊字符串的一个标识,如下图 需要在前面加

NAT(网络地址转换)模式

它允许私有网络上的主机使用私有IP地址,同时仍然能够与公共网络(如互联网)上的主机进行通信。当私有网络上的设备想要与公共网络上的设备通信时,NAT路由器会将其私有IP地址和端口号转换为一个公共IP地址和端口号。这样,公共网络上的设备就可以响应这个请求,并将响应发送回NAT路由器,路由器再将响应转发给私有网络上的原始设备。 为了简单解释NAT(网络地址转换)模式的工作原理,我们可以考虑以下示