2,StructuredStreaming的事件时间和窗口操作

2023-10-09 03:58

本文主要是介绍2,StructuredStreaming的事件时间和窗口操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

推荐阅读:1,StructuredStreaming简介

使用Structured Streaming基于事件时间的滑动窗口的聚合操作是很简单的,很像分组聚合。在一个分组聚合操作中,聚合值被唯一保存在用户指定的列中。在基于窗口的聚合的情况下,对于行的事件时间的每个窗口,维护聚合值。

如前面的例子,我们运行wordcount操作,希望以10min窗口计算,每五分钟滑动一次窗口。也即,12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 这些十分钟窗口中进行单词统计。12:00 - 12:10意思是在12:00之后到达12:10之前到达的数据,比如一个单词在12:07收到。这个单词会影响12:00 - 12:10, 12:05 - 12:15两个窗口。

结果表将如下所示。

640?wx_fmt=jpeg&wxfrom=5&wx_lazy=1

import org.apache.spark.sql.streaming.Trigger
import java.sql.Timestamp
import org.apache.spark.sql.functions._
import spark.implicits._

val lines = spark.readStream.format("socket").option("host", "127.0.0.1").option("port", 9999).option("includeTimestamp", true).load()

val words = lines.as[(String, Timestamp)].flatMap(line =>line._1.split(" ").map(word => (word, line._2))).toDF("word", "timestamp")
val windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(window($"timestamp", "30 seconds", "15 seconds"), $"word").count()
val query = windowedCounts.writeStream.outputMode("Append").format("console").trigger(Trigger.ProcessingTime(5000)).option("truncate", "false").start()
query.awaitTermination()


推荐阅读:

Spark Structured Streaming高级特性

Spark Streaming 中管理 Kafka Offsets 的几种方式


640?wx_fmt=png

这篇关于2,StructuredStreaming的事件时间和窗口操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/170165

相关文章

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

MySQL追踪数据库表更新操作来源的全面指南

《MySQL追踪数据库表更新操作来源的全面指南》本文将以一个具体问题为例,如何监测哪个IP来源对数据库表statistics_test进行了UPDATE操作,文内探讨了多种方法,并提供了详细的代码... 目录引言1. 为什么需要监控数据库更新操作2. 方法1:启用数据库审计日志(1)mysql/mariad

springboot如何通过http动态操作xxl-job任务

《springboot如何通过http动态操作xxl-job任务》:本文主要介绍springboot如何通过http动态操作xxl-job任务的问题,具有很好的参考价值,希望对大家有所帮助,如有错... 目录springboot通过http动态操作xxl-job任务一、maven依赖二、配置文件三、xxl-

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据