Flink多流转换(1)—— 分流合流

2024-01-24 06:36

本文主要是介绍Flink多流转换(1)—— 分流合流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

分流

代码示例

使用侧输出流

合流

联合(Union)

连接(Connect)


简单划分的话,多流转换可以分为“分流”和“合流”两大类

目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行连接合并操作

分流

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream

代码示例

调用.filter()方法进行筛选,将符合条件的数据拣选出来放到对应的流里

public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 筛选Mary的浏览行为放入MaryStream流中DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Mary");}});// 筛选Bob的购买行为放入BobStream流中DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Bob");}});// 筛选其他人的浏览行为放入elseStream流中DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return !value.user.equals("Mary") && !value.user.equals("Bob") ;}});MaryStream.print("Mary pv");BobStream.print("Bob pv");elseStream.print("else pv");env.execute();}
}

缺点:上述操作将原始流复制了三份,对每一份分别进行筛选,因此代码冗余,不够高效

解决:①.split()方法(但限制了数据类型转换,已经废弃)

②测输出流

使用侧输出流

改进后的代码如下:

public class SplitStreamByOutputTag {// 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {@Overridepublic void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {if (value.user.equals("Mary")){ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));} else if (value.user.equals("Bob")){ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print("Mary pv");processedStream.getSideOutput(BobTag).print("Bob pv");processedStream.print("else");env.execute();}
}

①定义OutputTag作为标签

②使用ctx.output方法将符合筛选条件的数据写入侧输出流

③使用getSideOutput方法从侧输出流中获得数据

合流

对于来源不同的多条流中的数据进行联合处理(与分流相比,合流操作更为普遍)

联合(Union)

直接将多条流合在一起(要求必须流中的数据类型必须相同),合并之后的新流会包括所有流中的元素,数据类型不变

操作:基于 DataStream 直接调用.union()方法

参数:其他 DataStream

返回值:一个 DataStream

stream1.union(stream2, stream3, ...)

水位线时效性:多流合并时处理的时效性是以最慢的那个流为准的(多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程)

代码示例:

public class UnionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream1.print("stream1");SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream2.print("stream2");// 合并两条流stream1.union(stream2).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {out.collect("水位线:" + ctx.timerService().currentWatermark());}}).print();env.execute();}
}

测试:

分别在两台机器上输入以下数据:

hadoop102 :Alice, ./home, 1000
hadoop103 :Alice, ./home, 2000
hadoop102 :Alice, ./home, 3000

水位线的推进如下:

连接(Connect)

连接操作允许流的数据类型不同

连接流(ConnectedStreams)

连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的

要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型

代码实现

①基于一条 DataStream 调用.connect()方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams

②调用同处理方法得到 DataStream(可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法)

public class ConnectTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<Integer> stream1 = env.fromElements(1,2,3);DataStream<Long> stream2 = env.fromElements(1L,2L,3L);ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {@Overridepublic String map1(Integer value) {return "Integer: " + value;}@Overridepublic String map2(Long value) {return "Long: " + value;}});result.print();env.execute();}
}

①ConnectedStreams有两个类型参数,分别是stream1和stream2的类型;

②map方法中实现了一个CoMapFunction,表示分别对两条流中的数据执行 map 操作

类型参数<IN1, IN2, OUT>,分别表示第一条流、第二条流,以及合并后的流中的数据类型

这里我们将一条 Integer 流和一条 Long 流合并,转换成 String 输出。所以当遇到第一条流输入的整型值时,调用.map1();而遇到第二条流输入的长整型数据时,调用.map2():最终都转换为字符串输出,合并成了一条字符串流

③补充:ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);

传入的参数是两条流中各自的键选择器

这样的操作就是把两条流中key相同的数据放到了一起,然后针对来源的流各自进行处理;

同样也可以在合并之前先使用KeyBy进行分区,然后基于两条KeyedStream进行连接操作;

要注意两条流定义的键的类型必须相同,否则会抛出异常


CoProcessFunction

对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)

例如:CoMapFunction、CoFlatMapFunction、CoProcessFunction

CoProcessFunction源码如下:

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}public abstract class Context {...}
...
}

简单示例:实现一个实时对账的需求,也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息

// 实时对账
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 来自app的支付日志SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(Tuple3.of("order-1", "app", 1000L),Tuple3.of("order-2", "app", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {return element.f2;}}));// 来自第三方支付平台的支付日志SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(Tuple4.of("order-1", "third-party", "success", 3000L),Tuple4.of("order-3", "third-party", "success", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {return element.f3;}}));// 检测同一支付单在两条流中是否匹配,不匹配就报警appStream.connect(thirdpartStream).keyBy(data -> data.f0, data -> data.f0).process(new OrderMatchResult()).print();env.execute();}// 自定义实现CoProcessFunctionpublic static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{// 定义状态变量,用来保存已经到达的事件private ValueState<Tuple3<String, String, Long>> appEventState;private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;@Overridepublic void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG)));}@Overridepublic void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {// 看另一条流中事件是否来过if (thirdPartyEventState.value() != null){out.collect("对账成功:" + value + "  " + thirdPartyEventState.value());// 清空状态thirdPartyEventState.clear();} else {// 更新状态appEventState.update(value);// 注册一个5秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);}}@Overridepublic void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {if (appEventState.value() != null){out.collect("对账成功:" + appEventState.value() + "  " + value);// 清空状态appEventState.clear();} else {// 更新状态thirdPartyEventState.update(value);// 注册一个5秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来if (appEventState.value() != null) {out.collect("对账失败:" + appEventState.value() + "  " + "第三方支付平台信息未到");}if (thirdPartyEventState.value() != null) {out.collect("对账失败:" + thirdPartyEventState.value() + "  " + "app信息未到");}appEventState.clear();thirdPartyEventState.clear();}}}

运行结果如下:

运行结果解析:
①在CoProcessFunction的实现中,声明了两个状态变量用来保存App的支付信息和第三方的支付信息

②App支付信息到达之后,触发processElement1中的操作,检查第三方的支付信息是否已经到达(如果先到达会保存在相应的状态变量中);如果已经到达,则对账成功;如果没有到达,则等待5s,仍未到达则对账失败;

③第三方支付信息到达后,流程同②

④对于order-1,时间戳为1000的数据(App)到达后,第三方支付信息未到达,等待5s,接着时间戳未3000的数据(第三方)到达后,发现App支付信息已经到达,因此对账成功

⑤对于order-2和order-3,均是等待5s后没有检测到App(第三方)数据到达而发出报警信息

广播连接流(BroadcastConnectedStream)

DataStream 调用.connect()方法时可以传入一个广播流(BroadcastConnectedStream)

这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)

如何创建广播流


基于DataStream调用.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor),说明状态的名称和类型;

因为广播状态底层是用一个“映射”(map)结构来保存的

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

数据流和广播流的连接

得到“广播连接流”(BroadcastConnectedStream),然后基于广播连接流调用.process()方法,就可以同时获取规则和数据,进行动态处理

DataStream<String> output = stream.connect(ruleBroadcastStream).process( new BroadcastProcessFunction<>() {...} );
BroadcastProcessFunction

BroadcastProcessFunction 与 CoProcessFunction 类似,同样是一个抽象类,需要实现两个方法,针对合并的两条流中元素分别定义处理操作。区别在于这里一条流是正常处理数据,而另一条流则是要用新规则来更新广播状态,所以对应的两个方法叫作.processElement().processBroadcastElement()

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili 

这篇关于Flink多流转换(1)—— 分流合流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/638799

相关文章

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

在Java中将XLS转换为XLSX的实现方案

《在Java中将XLS转换为XLSX的实现方案》在本文中,我们将探讨传统ExcelXLS格式与现代XLSX格式的结构差异,并为Java开发者提供转换方案,通过了解底层原理、性能优势及实用工具,您将掌握... 目录为什么升级XLS到XLSX值得投入?实际转换过程解析推荐技术方案对比Apache POI实现编程

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Python使用FFmpeg实现高效音频格式转换工具

《Python使用FFmpeg实现高效音频格式转换工具》在数字音频处理领域,音频格式转换是一项基础但至关重要的功能,本文主要为大家介绍了Python如何使用FFmpeg实现强大功能的图形化音频转换工具... 目录概述功能详解软件效果展示主界面布局转换过程截图完成提示开发步骤详解1. 环境准备2. 项目功能结

使用Python实现网页表格转换为markdown

《使用Python实现网页表格转换为markdown》在日常工作中,我们经常需要从网页上复制表格数据,并将其转换成Markdown格式,本文将使用Python编写一个网页表格转Markdown工具,需... 在日常工作中,我们经常需要从网页上复制表格数据,并将其转换成Markdown格式,以便在文档、邮件或

Python将字符串转换为小写字母的几种常用方法

《Python将字符串转换为小写字母的几种常用方法》:本文主要介绍Python中将字符串大写字母转小写的四种方法:lower()方法简洁高效,手动ASCII转换灵活可控,str.translate... 目录一、使用内置方法 lower()(最简单)二、手动遍历 + ASCII 码转换三、使用 str.tr

Java如何将文件内容转换为MD5哈希值

《Java如何将文件内容转换为MD5哈希值》:本文主要介绍Java如何将文件内容转换为MD5哈希值的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java文件内容转换为MD5哈希值一个完整的Java示例代码代码解释注意事项总结Java文件内容转换为MD5

使用Java将实体类转换为JSON并输出到控制台的完整过程

《使用Java将实体类转换为JSON并输出到控制台的完整过程》在软件开发的过程中,Java是一种广泛使用的编程语言,而在众多应用中,数据的传输和存储经常需要使用JSON格式,用Java将实体类转换为J... 在软件开发的过程中,Java是一种广泛使用的编程语言,而在众多应用中,数据的传输和存储经常需要使用j

Java实现视频格式转换的完整指南

《Java实现视频格式转换的完整指南》在Java中实现视频格式的转换,通常需要借助第三方工具或库,因为视频的编解码操作复杂且性能需求较高,以下是实现视频格式转换的常用方法和步骤,需要的朋友可以参考下... 目录核心思路方法一:通过调用 FFmpeg 命令步骤示例代码说明优点方法二:使用 Jaffree(FF