53、Flink Interval Join 代码示例

2024-06-24 11:20

本文主要是介绍53、Flink Interval Join 代码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、概述

interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join;

interval Join 算子的水位线会取两条流中水位线的最小值;

interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;

interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];

2、代码示例

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;/*** interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join* interval Join 算子的水位线会取两条流中水位线的最小值;* interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;* interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];*/
public class _04_IntervalInnerJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);env.disableOperatorChaining();DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);OutputTag<Tuple2<String, Long>> leftLateTag = new OutputTag<Tuple2<String, Long>>("left-late") {};OutputTag<Tuple2<String, Long>> rightLateTag = new OutputTag<Tuple2<String, Long>>("right-late") {};// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));/*** left-1** a,1718089200000* b,1718089200000* c,1718089200000** interval_join_watermark=No Watermark** right-2** a,1718089201000* b,1718089201000* c,1718089201000** interval_join_watermark=1718089199999** res=:2> (a,1718089200000,1718089201000)* res=:1> (b,1718089200000,1718089201000)* res=:1> (c,1718089200000,1718089201000)** left-3** a,1718089203000* b,1718089203000* c,1718089203000** interval_join_watermark=1718089200999** right-4** a,1718089204000* b,1718089204000* c,1718089204000** interval_join_watermark=1718089202999** res=:2> (a,1718089203000,1718089204000)* res=:1> (b,1718089203000,1718089204000)* res=:1> (c,1718089203000,1718089204000)** left-right-5** a,1718089202000* b,1718089202000* c,1718089202000** left-late=:1> (b,1718089202000)* left-late=:2> (a,1718089202000)* left-late=:1> (c,1718089202000)* right-late=:1> (b,1718089202000)* right-late=:2> (a,1718089202000)* right-late=:1> (c,1718089202000)*/SingleOutputStreamOperator<Tuple3<String, Long, Long>> resStream = watermarkLeft.keyBy(e -> e.f0).intervalJoin(watermarkRight.keyBy(e -> e.f0)).between(Duration.ofSeconds(-1), Duration.ofSeconds(1)).sideOutputLeftLateData(leftLateTag).sideOutputRightLateData(rightLateTag).process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {@Overridepublic void processElement(Tuple2<String, Long> t1, Tuple2<String, Long> t2, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>.Context context, Collector<Tuple3<String, Long, Long>> collector) throws Exception {collector.collect(new Tuple3<>(t1.f0, t1.f1, t2.f1));}});resStream.print("res=");resStream.getSideOutput(leftLateTag).print("left-late=");resStream.getSideOutput(rightLateTag).print("right-late=");env.execute();}
}

3、测试用例

		  left-1a,1718089200000b,1718089200000c,1718089200000interval_join_watermark=No Watermarkright-2a,1718089201000b,1718089201000c,1718089201000interval_join_watermark=1718089199999res=:2> (a,1718089200000,1718089201000)res=:1> (b,1718089200000,1718089201000)res=:1> (c,1718089200000,1718089201000)left-3a,1718089203000b,1718089203000c,1718089203000interval_join_watermark=1718089200999right-4a,1718089204000b,1718089204000c,1718089204000interval_join_watermark=1718089202999res=:2> (a,1718089203000,1718089204000)res=:1> (b,1718089203000,1718089204000)res=:1> (c,1718089203000,1718089204000)left-right-5a,1718089202000b,1718089202000c,1718089202000left-late=:1> (b,1718089202000)left-late=:2> (a,1718089202000)left-late=:1> (c,1718089202000)right-late=:1> (b,1718089202000)right-late=:2> (a,1718089202000)right-late=:1> (c,1718089202000)

这篇关于53、Flink Interval Join 代码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1090005

相关文章

Django开发时如何避免频繁发送短信验证码(python图文代码)

《Django开发时如何避免频繁发送短信验证码(python图文代码)》Django开发时,为防止频繁发送验证码,后端需用Redis限制请求频率,结合管道技术提升效率,通过生产者消费者模式解耦业务逻辑... 目录避免频繁发送 验证码1. www.chinasem.cn避免频繁发送 验证码逻辑分析2. 避免频繁

精选20个好玩又实用的的Python实战项目(有图文代码)

《精选20个好玩又实用的的Python实战项目(有图文代码)》文章介绍了20个实用Python项目,涵盖游戏开发、工具应用、图像处理、机器学习等,使用Tkinter、PIL、OpenCV、Kivy等库... 目录① 猜字游戏② 闹钟③ 骰子模拟器④ 二维码⑤ 语言检测⑥ 加密和解密⑦ URL缩短⑧ 音乐播放

Python使用Tenacity一行代码实现自动重试详解

《Python使用Tenacity一行代码实现自动重试详解》tenacity是一个专为Python设计的通用重试库,它的核心理念就是用简单、清晰的方式,为任何可能失败的操作添加重试能力,下面我们就来看... 目录一切始于一个简单的 API 调用Tenacity 入门:一行代码实现优雅重试精细控制:让重试按我

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

Java Thread中join方法使用举例详解

《JavaThread中join方法使用举例详解》JavaThread中join()方法主要是让调用改方法的thread完成run方法里面的东西后,在执行join()方法后面的代码,这篇文章主要介绍... 目录前言1.join()方法的定义和作用2.join()方法的三个重载版本3.join()方法的工作原

MySQL CTE (Common Table Expressions)示例全解析

《MySQLCTE(CommonTableExpressions)示例全解析》MySQL8.0引入CTE,支持递归查询,可创建临时命名结果集,提升复杂查询的可读性与维护性,适用于层次结构数据处... 目录基本语法CTE 主要特点非递归 CTE简单 CTE 示例多 CTE 示例递归 CTE基本递归 CTE 结

Spring AI使用tool Calling和MCP的示例详解

《SpringAI使用toolCalling和MCP的示例详解》SpringAI1.0.0.M6引入ToolCalling与MCP协议,提升AI与工具交互的扩展性与标准化,支持信息检索、行动执行等... 目录深入探索 Spring AI聊天接口示例Function CallingMCPSTDIOSSE结束语

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到

PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例

《PyTorch中的词嵌入层(nn.Embedding)详解与实战应用示例》词嵌入解决NLP维度灾难,捕捉语义关系,PyTorch的nn.Embedding模块提供灵活实现,支持参数配置、预训练及变长... 目录一、词嵌入(Word Embedding)简介为什么需要词嵌入?二、PyTorch中的nn.Em