15、Flink 的广播状态 (Broadcast State) 详解

2024-05-07 06:20

本文主要是介绍15、Flink 的广播状态 (Broadcast State) 详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、广播状态 (Broadcast State)

广播状态是一种特殊的算子状态,支持将一个流中的元素需要广播到所有下游任务的使用情形,广播状态用于保持所有子任务状态相同

2、广播状态和其他算子状态的区别

  • 它具有 map 格式,
  • 它仅在一些特殊的算子中可用,这些算子的输入为一个广播数据流和非广播数据流,
  • 这类算子可以拥有不同命名的多个广播状态
3、广播状态 API

案例:存在一个序列,序列中的元素是具有不同颜色与形状的图形,希望在序列里相同颜色的图形中寻找满足一定顺序模式的图形对(比如在红色的图形里,有一个长方形跟着一个三角形)同时希望寻找的模式也会随着时间而改变。

定义两个流,一个流包含图形(Item),具有颜色形状两个属性;另一个流包含特定的规则(Rule),代表希望寻找的模式,在图形流中,首先使用颜色将流进行进行分区(keyBy),确保相同颜色的图形会流转到相同的物理机上。

// 将图形使用颜色进行划分
KeyedStream<Item, Color> colorPartitionedStream = itemStream.keyBy(new KeySelector<Item, Color>(){...});

规则流应该被广播到所有的下游 task 中,下游 task 应当存储这些规则并根据它寻找满足规则的图形对。

  • 规则广播给所有下游 task;
  • 使用 MapStateDescriptor 来描述并创建 broadcast state 在下游的存储结构。
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<Rule>() {}));// 广播流,广播规则并且创建 broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

使用规则来筛选图形序列

  • 将两个流关联起来
  • 完成模式识别逻辑

为关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream),可以调用非广播流的方法 connect(),并将 BroadcastStream 当做参数传入;这个方法的返回参数是 BroadcastConnectedStream,具有 process()方法,可以传入一个特殊的 CoProcessFunction 来编辑模式识别逻辑,具体传入 process() 的是哪个类型取决于非广播流的类型

  • 如果流是一个 keyed 流,那就是 KeyedBroadcastProcessFunction 类型;
  • 如果流是一个 non-keyed 流,那就是 BroadcastProcessFunction 类型。
DataStream<String> output = colorPartitionedStream.connect(ruleBroadcastStream).process(// KeyedBroadcastProcessFunction 中的类型参数表示://   1. key stream 中的 key 类型//   2. 非广播流中的元素类型//   3. 广播流中的元素类型//   4. 结果的类型,在这里是 stringnew KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {// 模式匹配逻辑});
4、BroadcastProcessFunction 和 KeyedBroadcastProcessFunction

在传入的 BroadcastProcessFunctionKeyedBroadcastProcessFunction 中,需要实现两个方法;processBroadcastElement() 负责处理广播流中的元素,processElement() 负责处理非广播流中的元素。

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

注意processBroadcastElement() 处理广播流的元素, processElement() 处理另一个流的元素,两个方法的第二个参数(Context)不同。

得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
查询元素的时间戳:ctx.timestamp()
查询目前的Watermark:ctx.currentWatermark()
目前的处理时间(processing time):ctx.currentProcessingTime()
产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)

getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同。

注意:对于 broadcast state 的访问权限,在处理广播流元素这端,是具有读写权限的,而对于处理非广播流元素这端是只读的;因为 Flink 中是不存在跨 task 通讯的,为了保证 broadcast state 在所有的并发实例中是一致的,在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的, 那么最终所有 task 得到的 broadcast state 是一致的。

processBroadcastElement() 的实现必须在所有的并发实例中具有确定性的结果。

KeyedBroadcastProcessFunction 在 Keyed Stream 上工作,提供了一些 BroadcastProcessFunction 没有的功能:

1.processElement() 的参数 ReadOnlyContext 提供了方法访问 Flink 的定时器服务,可以注册事件时间定时器(event-time timer)或处理时间定时器(processing-time timer);当定时器触发时,会调用 onTimer() 方法, 提供了 OnTimerContext,它具有 ReadOnlyContext 的全部功能,并且提供:

  • 查询当前触发的是一个事件时间还是处理时间的定时器
  • 查询定时器关联的key

2.processBroadcastElement() 方法中的参数 Context 会提供方法 applyToKeyedState(StateDescriptor stateDescriptor, KeyedStateFunction function);这个方法使用一个 KeyedStateFunction 能够对 stateDescriptor 对应的 state 中所有 key 的存储状态进行某些操作。

注册定时器只能在 KeyedBroadcastProcessFunctionprocessElement() 方法中进行,在 processBroadcastElement() 方法中不能注册定时器,因为广播的元素中并没有关联的 key。

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素// 用一个数组来存储,因为同时可能有很多第一个元素正在等待private final MapStateDescriptor<String, List<Item>> mapStateDesc =new MapStateDescriptor<>("items",BasicTypeInfo.STRING_TYPE_INFO,new ListTypeInfo<>(Item.class));// 与之前的 ruleStateDescriptor 相同private final MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<Rule>() {}));@Overridepublic void processBroadcastElement(Rule value,Context ctx,Collector<String> out) throws Exception {ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);}@Overridepublic void processElement(Item value,ReadOnlyContext ctx,Collector<String> out) throws Exception {final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);final Shape shape = value.getShape();for (Map.Entry<String, Rule> entry :ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {final String ruleName = entry.getKey();final Rule rule = entry.getValue();List<Item> stored = state.get(ruleName);if (stored == null) {stored = new ArrayList<>();}if (shape == rule.second && !stored.isEmpty()) {for (Item i : stored) {out.collect("MATCH: " + i + " - " + value);}stored.clear();}// 不需要额外的 else{} 段来考虑 rule.first == rule.second 的情况if (shape.equals(rule.first)) {stored.add(value);}if (stored.isEmpty()) {state.remove(ruleName);} else {state.put(ruleName, stored);}}}
}
3.注意事项
  • 没有跨 task 通讯:只有在 (Keyed)-BroadcastProcessFunction 中处理广播流元素的方法里可以更改 broadcast state 的内容;同时,需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。
  • broadcast state 在不同的 task 的事件顺序可能是不同的:虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同,所以 broadcast state 的更新不能依赖于流中元素到达的顺序
  • 所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint;防止在作业恢复后读文件造成的文件热点;Flink 会保证在恢复状态/改变并发的时候数据没有重复没有缺失,在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state,在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new - p_old)会使用轮询调度算法读取之前 task 的 state。
  • 不使用 RocksDB state backend: broadcast state 在运行时保存在内存中,需保证内存充足,同样适用于其它 Operator State。

这篇关于15、Flink 的广播状态 (Broadcast State) 详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/966562

相关文章

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

MyBatis常用XML语法详解

《MyBatis常用XML语法详解》文章介绍了MyBatis常用XML语法,包括结果映射、查询语句、插入语句、更新语句、删除语句、动态SQL标签以及ehcache.xml文件的使用,感兴趣的朋友跟随小... 目录1、定义结果映射2、查询语句3、插入语句4、更新语句5、删除语句6、动态 SQL 标签7、ehc

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

一文详解Python如何开发游戏

《一文详解Python如何开发游戏》Python是一种非常流行的编程语言,也可以用来开发游戏模组,:本文主要介绍Python如何开发游戏的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、python简介二、Python 开发 2D 游戏的优劣势优势缺点三、Python 开发 3D

Redis 基本数据类型和使用详解

《Redis基本数据类型和使用详解》String是Redis最基本的数据类型,一个键对应一个值,它的功能十分强大,可以存储字符串、整数、浮点数等多种数据格式,本文给大家介绍Redis基本数据类型和... 目录一、Redis 入门介绍二、Redis 的五大基本数据类型2.1 String 类型2.2 Hash

Java中的.close()举例详解

《Java中的.close()举例详解》.close()方法只适用于通过window.open()打开的弹出窗口,对于浏览器的主窗口,如果没有得到用户允许是不能关闭的,:本文主要介绍Java中的.... 目录当你遇到以下三种情况时,一定要记得使用 .close():用法作用举例如何判断代码中的 input