53、Flink Interval Join 代码示例

2024-06-24 11:20

本文主要是介绍53、Flink Interval Join 代码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、概述

interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join;

interval Join 算子的水位线会取两条流中水位线的最小值;

interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;

interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];

2、代码示例

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;/*** interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join* interval Join 算子的水位线会取两条流中水位线的最小值;* interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;* interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];*/
public class _04_IntervalInnerJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);env.disableOperatorChaining();DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);OutputTag<Tuple2<String, Long>> leftLateTag = new OutputTag<Tuple2<String, Long>>("left-late") {};OutputTag<Tuple2<String, Long>> rightLateTag = new OutputTag<Tuple2<String, Long>>("right-late") {};// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));/*** left-1** a,1718089200000* b,1718089200000* c,1718089200000** interval_join_watermark=No Watermark** right-2** a,1718089201000* b,1718089201000* c,1718089201000** interval_join_watermark=1718089199999** res=:2> (a,1718089200000,1718089201000)* res=:1> (b,1718089200000,1718089201000)* res=:1> (c,1718089200000,1718089201000)** left-3** a,1718089203000* b,1718089203000* c,1718089203000** interval_join_watermark=1718089200999** right-4** a,1718089204000* b,1718089204000* c,1718089204000** interval_join_watermark=1718089202999** res=:2> (a,1718089203000,1718089204000)* res=:1> (b,1718089203000,1718089204000)* res=:1> (c,1718089203000,1718089204000)** left-right-5** a,1718089202000* b,1718089202000* c,1718089202000** left-late=:1> (b,1718089202000)* left-late=:2> (a,1718089202000)* left-late=:1> (c,1718089202000)* right-late=:1> (b,1718089202000)* right-late=:2> (a,1718089202000)* right-late=:1> (c,1718089202000)*/SingleOutputStreamOperator<Tuple3<String, Long, Long>> resStream = watermarkLeft.keyBy(e -> e.f0).intervalJoin(watermarkRight.keyBy(e -> e.f0)).between(Duration.ofSeconds(-1), Duration.ofSeconds(1)).sideOutputLeftLateData(leftLateTag).sideOutputRightLateData(rightLateTag).process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {@Overridepublic void processElement(Tuple2<String, Long> t1, Tuple2<String, Long> t2, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>.Context context, Collector<Tuple3<String, Long, Long>> collector) throws Exception {collector.collect(new Tuple3<>(t1.f0, t1.f1, t2.f1));}});resStream.print("res=");resStream.getSideOutput(leftLateTag).print("left-late=");resStream.getSideOutput(rightLateTag).print("right-late=");env.execute();}
}

3、测试用例

		  left-1a,1718089200000b,1718089200000c,1718089200000interval_join_watermark=No Watermarkright-2a,1718089201000b,1718089201000c,1718089201000interval_join_watermark=1718089199999res=:2> (a,1718089200000,1718089201000)res=:1> (b,1718089200000,1718089201000)res=:1> (c,1718089200000,1718089201000)left-3a,1718089203000b,1718089203000c,1718089203000interval_join_watermark=1718089200999right-4a,1718089204000b,1718089204000c,1718089204000interval_join_watermark=1718089202999res=:2> (a,1718089203000,1718089204000)res=:1> (b,1718089203000,1718089204000)res=:1> (c,1718089203000,1718089204000)left-right-5a,1718089202000b,1718089202000c,1718089202000left-late=:1> (b,1718089202000)left-late=:2> (a,1718089202000)left-late=:1> (c,1718089202000)right-late=:1> (b,1718089202000)right-late=:2> (a,1718089202000)right-late=:1> (c,1718089202000)

这篇关于53、Flink Interval Join 代码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1090005

相关文章

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java高效实现PowerPoint转PDF的示例详解

《Java高效实现PowerPoint转PDF的示例详解》在日常开发或办公场景中,经常需要将PowerPoint演示文稿(PPT/PPTX)转换为PDF,本文将介绍从基础转换到高级设置的多种用法,大家... 目录为什么要将 PowerPoint 转换为 PDF安装 Spire.Presentation fo

Java集合之Iterator迭代器实现代码解析

《Java集合之Iterator迭代器实现代码解析》迭代器Iterator是Java集合框架中的一个核心接口,位于java.util包下,它定义了一种标准的元素访问机制,为各种集合类型提供了一种统一的... 目录一、什么是Iterator二、Iterator的核心方法三、基本使用示例四、Iterator的工

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

Python中isinstance()函数原理解释及详细用法示例

《Python中isinstance()函数原理解释及详细用法示例》isinstance()是Python内置的一个非常有用的函数,用于检查一个对象是否属于指定的类型或类型元组中的某一个类型,它是Py... 目录python中isinstance()函数原理解释及详细用法指南一、isinstance()函数

python中的高阶函数示例详解

《python中的高阶函数示例详解》在Python中,高阶函数是指接受函数作为参数或返回函数作为结果的函数,下面:本文主要介绍python中高阶函数的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录1.定义2.map函数3.filter函数4.reduce函数5.sorted函数6.自定义高阶函数

JS纯前端实现浏览器语音播报、朗读功能的完整代码

《JS纯前端实现浏览器语音播报、朗读功能的完整代码》在现代互联网的发展中,语音技术正逐渐成为改变用户体验的重要一环,下面:本文主要介绍JS纯前端实现浏览器语音播报、朗读功能的相关资料,文中通过代码... 目录一、朗读单条文本:① 语音自选参数,按钮控制语音:② 效果图:二、朗读多条文本:① 语音有默认值:②

Vue实现路由守卫的示例代码

《Vue实现路由守卫的示例代码》Vue路由守卫是控制页面导航的钩子函数,主要用于鉴权、数据预加载等场景,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、概念二、类型三、实战一、概念路由守卫(Navigation Guards)本质上就是 在路

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni