Apache Flink 事件时间处理和 Watermarks

2024-05-12 23:38

本文主要是介绍Apache Flink 事件时间处理和 Watermarks,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

扫码关注公众号免费阅读全文:冰山烈焰的黑板报
在这里插入图片描述

原文地址:Flink Event Time Processing and Watermarks

如果你正在搭建一个实时流程序,事件时间处理是你不久将不得不使用的一个功能之一。因为在现实世界中绝大多数用例的消息都是乱序的,你的系统应该有一个方法应对和处理可能延迟的消息。在这篇博客中,我们将会看到为什么我们需要事件时间处理和我们怎么在 Flink 中使用它。

EvenTime 是一个事件在现实世界中发生时的时间。ProcessingTime 是该事件被 Flink 处理时的时间。为了理解事件时间处理,我们先以一个基于处理时间的系统开始,看看它的缺点。

我们将创建一个大小10秒的滑动窗口(SlidingWindow),每5秒滑动一次。在窗口结束时,系统将提交在此期间收到的一些消息。一旦你理解事件时间处理和 SlidingWindow 的相关工作,就不难理解它和 TumblingWindow 是怎么一起使用的。让我们开始吧。

基于处理时间的系统

在这个例子中,我们期望的消息有这样的格式:vaue,timestamp,其中 value 是消息,timestamp 是这个消息在数据源生成时的时间。因为我们现在在搭建一个基于 ProcessingTime 的系统,下面的代码将忽略 timestamp 这部分。

理解消息生成时包含的信息是很重要的一个方面。Flink 或其他系统不是一个可以自己搞清楚这些的系统。稍后我们将看到事件时间处理提取 timestamp 信息用以处理延迟的消息。

val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)
counts.print
senv.execute("ProcessingTime processing example")

示例1:无延迟的消息

假设数据源分别在第13秒、第13秒和第16秒的时候,生成3条 a 类型的消息。(这里使用小时和分钟都可以,因为窗口的大小仅有10秒)。
在这里插入图片描述
这些消息将如下所示进入窗口中。在第13秒生成的前两个消息将进入 window1[5s-10s] 和 window2 [10s-20s],在第16秒生成的第三个消息将进入 window2 [10s-20s] 和 window3[15s-25s]。每个窗口提交后最后的统计值将分别是 (a, 2),(a, 3) 和 (a, 1)。
在这里插入图片描述
这个输出可以被看作是预期的结果。现在我们将看下其中一条消息延迟进入系统时,会发生什么。

示例2:有延迟消息

现在假设其中一条消息(在第13秒生成的)延迟6秒达到(第19秒),可能由于网络阻塞。你能猜出来这条消息会落入哪一个窗口吗?
在这里插入图片描述
这条延迟的消息会落入 window2 和 window3,因为 19秒在 10s-20s 和 15s-25s 中。它不会对 window2 的计算造成任何影响(因为这条消息不管怎样还是落入了该窗口),但是它会影响 window1 和 window3 的结果。我们现在通过使用事件时间处理来修复这个问题。

基于 EventTime 的系统

为了能够使用事件时间处理,我们需要一个提取消息中事件时间信息的提取器。注意消息的格式是 vaue,timestamp。extractTimestamp() 方法获取 timestamp 并作为 Long 类型返回。请先忽略 getCurrentWatermark() 方法,我们将稍后讨论它。

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {override def extractTimestamp(e: String, prevElementTimestamp: Long) = {e.split(",")(1).toLong }override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis)}
}

现在我们需要配置这个 timestamp 提取器,并且配置 TimeCharactersistic 为 EventTime。其余的代码和 ProcessingTime 的一样。

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999).assignTimestampsAndWatermarks(new TimestampExtractor) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) }.keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)
counts.print
senv.execute("EventTime processing example")

上述代码的运行结果如下图所示:
在这里插入图片描述
这个结果看起来更好些,window2 和 window3 提交了正确的结果,但是 window1 还是错误的。Flink 不会把这条延迟的消息分配到 window3,因为它检测到这个消息的事件时间,知道它没有落入该窗口。但是,它为什么不分配这条消息到 window1 呢?原因是这条延迟的消息进入系统的时候,window1 已经计算完成了。现在让我们使用 Watermark 来解决这个问题。

注意在 window2 中,这条延迟的消息出现在第19秒的位置,而不是第13秒(它的事件时间)。图中的展示是有意表明在该窗口中的这条消息是不会根据它的事件时间排序的。(这或许在将来会有改变)。

Watermarks

Watermark 是非常重要且有趣的点子。我将尽力给你一个简要的概述。如果你有兴趣了解更多的信息,可以观看 Google 这个非常棒的演讲,以及阅读 dataArtisans 这篇博客。Watermark 本质上是一个时间戳。当 Flink 的一个算子接收到一个 Watermark 时,它知道(假设)不会再有比这个时间戳更晚的消息了(译者注:可以理解为不会再有比这个时间戳更早的事件时间的消息进入系统)。所以 Watermark 也可以被认为是告诉 Flink 在 EventTime 中有多远的一种方式。

就本例而言,可以把他看作是告诉 Flink 一条消息可以延迟多久的方式。最后一次尝试,我们现在设置 Watermark 为 current time - 5 seconds,告诉 Flink 期望消息的最大延时是5秒——这是因为只有当 Watermark 通过窗口时,窗口才会计算。由于我们的 Watermark 是 current time - 5 seconds,第一个窗口 [5s-15s] 将在第20秒的时候计算。类似地,窗口 [10s-20s] 将在第25秒的时候计算,以此类推。

override def getCurrentWatermark(): Watermark = { new Watermark(System.currentTimeMillis - 5000)}

这里我们假设 EventTime 比当前系统时间晚5秒,但并不总是这样。大多数情况下,最好保存到目前为止接收到的最大时间戳(从消息中提取的),然后减去预期的延迟。

修改之后的代码的运行结果:
在这里插入图片描述
最终,我们得到了正确的结果,三个窗口都按预期提交统计值——(a, 2),(a, 3) 和 (a, 1)。

Allowed Lateness

在我们使用 Watermark - delay 的早期方法中,窗口不会触发直到 Watermark 超过了 window_length + delay。如果你想适应延迟事件,并且希望窗口能够准时启动,您可以使用 Allowed Lateness。如果 Allowed Lateness 被设置了,Flink 将不会丢弃消息直到它超过 window_end_time + allowed lateness。一旦一条延迟的消息被接收,Flink 将会提取它的时间戳,并检查它是否在 allowed lateness 里,然后它会检查是否触发窗口(根据触发器集)。所以,注意这种方式中,一个窗口可能被多次触发,如果只需要一次处理,您可能希望使sink具有幂等性。

总结

实时流处理系统的重要性日益增长,必须处理延迟消息是构建此类系统的一部分。在这篇博客中,我们看到延迟的消息是怎样影响系统的结果,以及 Flink 的事件时间处理功能是怎么解决这些问题的。

这篇关于Apache Flink 事件时间处理和 Watermarks的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/984061

相关文章

SpringBoot结合Docker进行容器化处理指南

《SpringBoot结合Docker进行容器化处理指南》在当今快速发展的软件工程领域,SpringBoot和Docker已经成为现代Java开发者的必备工具,本文将深入讲解如何将一个SpringBo... 目录前言一、为什么选择 Spring Bootjavascript + docker1. 快速部署与

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

go中的时间处理过程

《go中的时间处理过程》:本文主要介绍go中的时间处理过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 获取当前时间2 获取当前时间戳3 获取当前时间的字符串格式4 相互转化4.1 时间戳转时间字符串 (int64 > string)4.2 时间字符串转时间

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

电脑提示xlstat4.dll丢失怎么修复? xlstat4.dll文件丢失处理办法

《电脑提示xlstat4.dll丢失怎么修复?xlstat4.dll文件丢失处理办法》长时间使用电脑,大家多少都会遇到类似dll文件丢失的情况,不过,解决这一问题其实并不复杂,下面我们就来看看xls... 在Windows操作系统中,xlstat4.dll是一个重要的动态链接库文件,通常用于支持各种应用程序

SQL Server数据库死锁处理超详细攻略

《SQLServer数据库死锁处理超详细攻略》SQLServer作为主流数据库管理系统,在高并发场景下可能面临死锁问题,影响系统性能和稳定性,这篇文章主要给大家介绍了关于SQLServer数据库死... 目录一、引言二、查询 Sqlserver 中造成死锁的 SPID三、用内置函数查询执行信息1. sp_w

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据