Flink中的时间语义与Watermark概念

2023-12-25 08:38

本文主要是介绍Flink中的时间语义与Watermark概念,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、时间语义

1.1 时间语义类型

在这里插入图片描述

  • Event Time:事件创建的时间
  • Ingestion Time:数据进入Flink的时间
  • Processing Time:执行操作算子的本地系统时间,与机器相关

问题:哪种时间语义更重要?

不同的时间语义有不同的应用场合,通常更关心的是事件时间
在这里插入图片描述
某些应用场合,不应该使用Processing Time。Event Time可以从日志数据的时间戳(timestamp)中提取
在这里插入图片描述

1.2 实际应用

public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});env.execute();}
}

二、水位线(Watermark)

乱序数据的影响

  • 当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子
  • 由于网络、分布式等原因,会导致乱序数据的产生
    在这里插入图片描述

怎样避免乱序数据带来的计算不正确问题呢?

遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口

  • Watermark是一种衡量Event Time进展的机制,可以设定延迟触发
  • Watermark是用于处理乱序事件的,处理乱序事件正确的方法,通常是用Watermark机制结合window来实现
  • 数据流中的Watermark用于表示 timestamp小于Watermark 的数据,都已经到达了。因此,window的执行也是由Watermark触发的
  • watermark用来让程序自己平衡延迟和结果正确性

2.1 Flink三种方法保证数据准确性(三重保证)

(1)Watermark,可以保证 几百毫秒内 的乱序数据的准确性
(2)在(1)的基础上,可以再使用 allowedLateness 设置等待时间
(3)在(2)的基础上,可以再使用侧输出流

2.2 Watermark的特点

  • Watermark是一条特殊的数据记录
  • Watermark必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退
  • Watermark与数据的时间戳相关
    在这里插入图片描述

2.3 Watermark的传递

多个分区Watermark不同时,取最小值的Watermark,再将新的Watermark广播给下游算子;Watermark不更新时,不用广播
在这里插入图片描述

2.4 Watermark的引入

Event Time的使用一定要指定数据源中的时间戳;
调用dataStream.assignTimestampsAndWatermarks方法,传入一个BoundedOutOfOrdernessTimestampExtractor,即可指定Watermark

public class WindowTest4_UDFTimeStampAssigner {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//升序数据设置事件时间和watermarkdataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});//乱序数据设置时间戳和watermark//BoundedOutOfOrdernessTimestampExtractor 有界乱序时间戳提取器dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading sensorReading) {return sensorReading.getTimestamp() * 1000L;}});env.execute();}
}

自定义的周期分配器:

public static class MyPeriodicAssigner implements AssignerWithPeriodicWatermarks<SensorReading>{private Long bound = 60 * 1000L; //延迟一分钟private Long maxTs = Long.MIN_VALUE; //当前最大时间戳@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(maxTs - bound);}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {maxTs = Math.max(maxTs, element.getTimestamp());return element.getTimestamp();}
}

自定义的断点分配器:

//断点分配器
public static class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReading>{private Long bound = 60 * 1000L; //延迟一分钟@Nullable@Overridepublic Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractedTimestamp) {if(lastElement.getId().equals("sensor_1")){return new Watermark(extractedTimestamp - bound);}else {return null;}}@Overridepublic long extractTimestamp(SensorReading element, long previousElementTimestamp) {return element.getTimestamp();}
}

2.5 Watermark的设定原则

  • 在Flink中,watermark由应用程序开发人员生成,这种通常需要对相应的领域有一定的了解
  • 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果
  • 如果watermark到达太早,则可能使收到错误结果,不过Flink处理迟到数据的机制可以解决这个问题

事件时间语义下的窗口测试代码1:

public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermark// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});/*   //升序数据设置事件时间和watermarkdataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});*///乱序数据设置时间戳和watermark//BoundedOutOfOrdernessTimestampExtractor 有界乱序时间戳提取器dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading sensorReading) {return sensorReading.getTimestamp() * 1000L;}});// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream  = dataStream.keyBy("id").timeWindow(Time.seconds(15)).minBy("temperature");minTempStream.print("minTemp");env.execute();}
}

事件时间语义下的窗口测试代码2----迟到数据处理:

public class WindowTest3_EventTimeWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //默认是处理时间// socket文本流DataStream<String> inputStream = env.socketTextStream("localhost", 7777);// 转换成SensorReading类型,分配时间戳和watermark// java8 中的lamda表达式DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});/*   //升序数据设置事件时间和watermarkdataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {@Overridepublic long extractAscendingTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});*///乱序数据设置时间戳和watermark//BoundedOutOfOrdernessTimestampExtractor 有界乱序时间戳提取器dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading sensorReading) {return sensorReading.getTimestamp() * 1000L;}});OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};// 基于事件时间的开窗聚合,统计15秒内温度的最小值SingleOutputStreamOperator<SensorReading> minTempStream  = dataStream.keyBy("id").timeWindow(Time.seconds(15)).allowedLateness(Time.minutes(1)).sideOutputLateData(outputTag).minBy("temperature");minTempStream.print("minTemp");minTempStream.getSideOutput(outputTag).print("late");env.execute();}
}

这篇关于Flink中的时间语义与Watermark概念的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/534895

相关文章

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

MySQL 事务的概念及ACID属性和使用详解

《MySQL事务的概念及ACID属性和使用详解》MySQL通过多线程实现存储工作,因此在并发访问场景中,事务确保了数据操作的一致性和可靠性,下面通过本文给大家介绍MySQL事务的概念及ACID属性和... 目录一、什么是事务二、事务的属性及使用2.1 事务的 ACID 属性2.2 为什么存在事务2.3 事务

利用Python实现时间序列动量策略

《利用Python实现时间序列动量策略》时间序列动量策略作为量化交易领域中最为持久且被深入研究的策略类型之一,其核心理念相对简明:对于显示上升趋势的资产建立多头头寸,对于呈现下降趋势的资产建立空头头寸... 目录引言传统策略面临的风险管理挑战波动率调整机制:实现风险标准化策略实施的技术细节波动率调整的战略价

Python日期和时间完全指南与实战

《Python日期和时间完全指南与实战》在软件开发领域,‌日期时间处理‌是贯穿系统设计全生命周期的重要基础能力,本文将深入解析Python日期时间的‌七大核心模块‌,通过‌企业级代码案例‌揭示最佳实践... 目录一、背景与核心价值二、核心模块详解与实战2.1 datetime模块四剑客2.2 时区处理黄金法

macOS Sequoia 15.5 发布: 改进邮件和屏幕使用时间功能

《macOSSequoia15.5发布:改进邮件和屏幕使用时间功能》经过常规Beta测试后,新的macOSSequoia15.5现已公开发布,但重要的新功能将被保留到WWDC和... MACOS Sequoia 15.5 正式发布!本次更新为 Mac 用户带来了一系列功能强化、错误修复和安全性提升,进一步增

Pandas进行周期与时间戳转换的方法

《Pandas进行周期与时间戳转换的方法》本教程将深入讲解如何在pandas中使用to_period()和to_timestamp()方法,完成时间戳与周期之间的转换,并结合实际应用场景展示这些方法的... 目录to_period() 时间戳转周期基本操作应用示例to_timestamp() 周期转时间戳基

JavaScript时间戳与时间的转化常用方法

《JavaScript时间戳与时间的转化常用方法》在JavaScript中,时间戳(Timestamp)通常指Unix时间戳,即从1970年1月1日00:00:00UTC到某个时间点经过的毫秒数,下面... 目录1. 获取当前时间戳2. 时间戳 → 时间对象3. 时间戳php → 格式化字符串4. 时间字符