51、Flink 窗口 Join 之滑动窗口事件时间 Join 代码示例

2024-06-20 11:20

本文主要是介绍51、Flink 窗口 Join 之滑动窗口事件时间 Join 代码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、概述

窗口中的水位线取的是两条流中的最小值;

一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出;

2、代码示例

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import java.time.Duration;/*** 注意:* <p>* 窗口中的水位线取的是两条流中的最小值;* 一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出;*/
public class _02_WindowSlidingEventJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);env.disableOperatorChaining();DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));watermarkLeft.join(watermarkRight).where(e -> e.f0).equalTo(e -> e.f0).window(SlidingEventTimeWindows.of(Duration.ofSeconds(6), Duration.ofSeconds(3))).apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {@Overridepublic Tuple3<String, Long, Long> join(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {return new Tuple3<>(t1.f0, t1.f1, t2.f1);}}).print();env.execute();}
}

3、测试用例

		  left-1a,1718089200000b,1718089200000c,1718089200000left-watermark=1718089199999window-watermark=no_watermarkright-2a,1718089201000b,1718089201000c,1718089201000right-watermark=1718089200999window-watermark=1718089199999[两条流中最小的]left-3a,1718089204000b,1718089204000c,1718089204000left-watermark=1718089203999window-watermark=1718089200999right-4a,1718089205000b,1718089205000c,1718089205000right-watermark=1718089204999window-watermark=1718089203999res:[1718089197000~1718089203000]2> (a,1718089200000,1718089201000)1> (b,1718089200000,1718089201000)1> (c,1718089200000,1718089201000)left-5a,1718089209000b,1718089209000c,1718089209000left-watermark=1718089208999window-watermark=1718089204999right-6a,1718089209000b,1718089209000c,1718089209000right-watermark=1718089208999window-watermark=1718089208999res[1718089200000~1718089206000]2> (a,1718089200000,1718089201000)2> (a,1718089200000,1718089205000)2> (a,1718089204000,1718089201000)2> (a,1718089204000,1718089205000)1> (b,1718089200000,1718089201000)1> (b,1718089200000,1718089205000)1> (b,1718089204000,1718089201000)1> (b,1718089204000,1718089205000)1> (c,1718089200000,1718089201000)1> (c,1718089200000,1718089205000)1> (c,1718089204000,1718089201000)1> (c,1718089204000,1718089205000)res[1718089203000~1718089209000]2> (a,1718089204000,1718089205000)1> (b,1718089204000,1718089205000)1> (c,1718089204000,1718089205000)

这篇关于51、Flink 窗口 Join 之滑动窗口事件时间 Join 代码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077986

相关文章

Python的Darts库实现时间序列预测

《Python的Darts库实现时间序列预测》Darts一个集统计、机器学习与深度学习模型于一体的Python时间序列预测库,本文主要介绍了Python的Darts库实现时间序列预测,感兴趣的可以了解... 目录目录一、什么是 Darts?二、安装与基本配置安装 Darts导入基础模块三、时间序列数据结构与

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

C++统计函数执行时间的最佳实践

《C++统计函数执行时间的最佳实践》在软件开发过程中,性能分析是优化程序的重要环节,了解函数的执行时间分布对于识别性能瓶颈至关重要,本文将分享一个C++函数执行时间统计工具,希望对大家有所帮助... 目录前言工具特性核心设计1. 数据结构设计2. 单例模式管理器3. RAII自动计时使用方法基本用法高级用法

Python中logging模块用法示例总结

《Python中logging模块用法示例总结》在Python中logging模块是一个强大的日志记录工具,它允许用户将程序运行期间产生的日志信息输出到控制台或者写入到文件中,:本文主要介绍Pyt... 目录前言一. 基本使用1. 五种日志等级2.  设置报告等级3. 自定义格式4. C语言风格的格式化方法

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca

QT Creator配置Kit的实现示例

《QTCreator配置Kit的实现示例》本文主要介绍了使用Qt5.12.12与VS2022时,因MSVC编译器版本不匹配及WindowsSDK缺失导致配置错误的问题解决,感兴趣的可以了解一下... 目录0、背景:qt5.12.12+vs2022一、症状:二、原因:(可以跳过,直奔后面的解决方法)三、解决方