详解 Flink 的 ProcessFunction API

2024-06-10 03:04

本文主要是介绍详解 Flink 的 ProcessFunction API,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、Flink 不同级别的 API

在这里插入图片描述

  • Flink 拥有易于使用的不同级别分层 API 使得它是一个非常易于开发的框架
  • 最底层的 API 仅仅提供了有状态流处理,它将处理函数(Process Function )嵌入到了 DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某些操作进行抽象,允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。
  • 核心 API(Core APIs),比如 DataStream API (用于处理有界或无界流数据)以及 DataSet API (用于处理有界数据集)在实际生产中一般使用较多。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。
  • Table API 是以表为中心的声明式编程,其中表在表达流数据时会动态变化。 Table API 遵循关系模型:表有二维数据结构(schema)(类似于关系数据库中的表),同时 API 提供可比较的操作,例如 select、join、group-by、aggregate 等。
  • Flink 提供的最高层级的抽象是 SQL。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL 查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

二、ProcessFunction 介绍

  • 相较于 map、filter 和 window 等特定的具体的操作而言,Flink 在底层 API 中提炼出一个统一通用的 process 操作,它是所有转换算子的一个概括性的表达,可以在对应的接口中自定义处理逻辑,而这一层接口就被叫作“处理函数”(ProcessFunction)
  • 处理函数 (ProcessFunction) 提供了一个“定时服务”(TimerService),可以通过它访问流中的事件(event )、时间戳(timestamp )、水位线(watermark),甚至可以注册“定时事件”
  • 处理函数 (ProcessFunction) 继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息
  • 处理函数 (ProcessFunction) 可以直接将数据输出到侧输出流(side output)中
  • 所以,处理函数 (ProcessFunction) 是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础

三、常见的 ProcessFunction 类

  • ProcessFunction:最基本的处理函数,基于 DataStream 直接调用 process() 时作为参数传入
  • KeyedProcessFunction:对流按键分区后的处理函数,基于 KeyedStream 调用 process() 时作为参数传入
  • CoProcessFunction:合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process() 时作为参数传入
  • ProcessJoinFunction:间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process() 时作为参数传入
  • BroadcastProcessFunction:广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。“广播连接流” BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物
  • KeyedBroadcastProcessFunction:按键分区的广播连接流处理函数,基于 BroadcastConnectedStream 调用 process() 时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流, 是一个 KeyedStream 与广播流(BroadcastStream)做连接之后的产物
  • ProcessWindowFunction:KeyedStream 开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用 process() 时作为参数传入
  • ProcessAllWindowFunction:DataStream 开窗之后的处理函数,基于 AllWindowedStream 调用 process() 时作为参数传入

四、ProcessFunction API 实战

1. KeyedProcessFunction

1.1 解析
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {//1.两个核心方法://1.1 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context 可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context 还可以将结果输出到别的流(side outputs) public abstract void processElement(I value, Context ctx, Collector<O> out);//1.2 一个回调函数。当processElement中注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)public abstract void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out);//2.富函数的以下方法:open()/close()/getRuntimeContext()
}
1.2 ProcessFunction 的 Context
//Context的常用方法
context.timestamp(); //获取当前数据的时间戳
context.getCurrentKey(); //获取当前数据的 key
context.output(OutputTag<X> outputTag, X value); //输出侧输出流
context.timerService(); //获取 TimerService 对象
1.3 Timer 和 TimerService

ProcessFunction 的 Context 对象调用 timerService() 方法可以直接返回一个 TimerService 对象;定时器 Timer 只能在 KeyedStream 上面使用

//TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:
//获取当前的处理时间
long currentProcessingTime();//获取当前的水位线(事件时间)
long currentWatermark();//注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);//注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);//删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);//删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);
1.4 案例

需求:监控温度传感器的温度值,如果温度值在 10 秒钟之内 (processing time) 连续上升,则报警

public class ProcessFunctionCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});dataStream.keyBy("id").process(new TempContIncreWarning(10)).print();env.execute();}//自定义处理函数,用于监测一段时间内某个传感器温度值是否连续上升,输出报警信息public static class TempContIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {//定义私有属性:监测的时间间隔private Integer interval;public TempContIncreWarning(Integer interval) {this.interval = interval;}//定义两个值状态属性,分别保存上一次的温度值和定时器的时间戳private ValueState<Double> lastTempState;private ValueState<Long> timerTsState;@Overridepublic void open(Configuration parameters) throws Exception {lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));}@Overridepublic void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {//获取状态值Double lastTemp = lastTempState.value();Long timerTs = timerTsState.value();//如果上一次的温度值为null或者上一次的温度值小于当前温度值并且定时器为null则注册定时器if(lastTemp == null || (lastTemp != null && value.getTemperature() > lastTemp && timerTs == null)) {Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;ctx.timerService().registerProcessingTimeTimer(ts);timerTsState.update(ts);} else if(value.getTemperature() < lastTemp && timerTs != null) {//如果上一次的温度值大于当前温度值且定时器不为null则删除定时器,清空定时器值状态ctx.timerService().deleteProcessingTimeTimer(timerTs);timerTsState.clear();}//更新温度值状态lastTempState.update(value.getTemperature());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {//定时器触发则输出报警信息out.collect("传感器" + ctx.getCurrentKey().getField(0) + "的温度在" + interval + "s内连续上升");timerTsState.clear();}@Overridepublic void close() throws Exception {lastTempState.clear();}}
}

2. 侧输出流

监控传感器温度值,将温度值低于 30 度的数据输出到 side output

/**核心方法:ProcessFunction中的 Context 对象的 output(OutputTag<X> outputTag, X value)
*/
public class SideOutputCase {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> inputStream = env.socketTextStream("localhost", 7777);DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});//定义OutputTag,用来标记侧输出流的低温流OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp"){};//DataStream不做keyBy,使用ProcessFunction的侧输出流进行高低温分流SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>(){@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if(value.getTemperature() > 30) {//高温流,输出到主流out.collect(value);} else {//低温流,输出到侧输出流ctx.output(lowTempTag, value);}}});//高温流highTempStream.print("high-temp");//低温流highTempStream.getSideOutput(lowTempTag).print("low-temp");env.execute();}
}

这篇关于详解 Flink 的 ProcessFunction API的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1047046

相关文章

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

从基础到高级详解Python数值格式化输出的完全指南

《从基础到高级详解Python数值格式化输出的完全指南》在数据分析、金融计算和科学报告领域,数值格式化是提升可读性和专业性的关键技术,本文将深入解析Python中数值格式化输出的相关方法,感兴趣的小伙... 目录引言:数值格式化的核心价值一、基础格式化方法1.1 三种核心格式化方式对比1.2 基础格式化示例

Java中的stream流分组示例详解

《Java中的stream流分组示例详解》Java8StreamAPI以函数式风格处理集合数据,支持分组、统计等操作,可按单/多字段分组,使用String、Map.Entry或Java16record... 目录什么是stream流1、根据某个字段分组2、按多个字段分组(组合分组)1、方法一:使用 Stri

Spring创建Bean的八种主要方式详解

《Spring创建Bean的八种主要方式详解》Spring(尤其是SpringBoot)提供了多种方式来让容器创建和管理Bean,@Component、@Configuration+@Bean、@En... 目录引言一、Spring 创建 Bean 的 8 种主要方式1. @Component 及其衍生注解

Python异步编程之await与asyncio基本用法详解

《Python异步编程之await与asyncio基本用法详解》在Python中,await和asyncio是异步编程的核心工具,用于高效处理I/O密集型任务(如网络请求、文件读写、数据库操作等),接... 目录一、核心概念二、使用场景三、基本用法1. 定义协程2. 运行协程3. 并发执行多个任务四、关键

从基础到进阶详解Python条件判断的实用指南

《从基础到进阶详解Python条件判断的实用指南》本文将通过15个实战案例,带你大家掌握条件判断的核心技巧,并从基础语法到高级应用一网打尽,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录​引言:条件判断为何如此重要一、基础语法:三行代码构建决策系统二、多条件分支:elif的魔法三、

Java利用@SneakyThrows注解提升异常处理效率详解

《Java利用@SneakyThrows注解提升异常处理效率详解》这篇文章将深度剖析@SneakyThrows的原理,用法,适用场景以及隐藏的陷阱,看看它如何让Java异常处理效率飙升50%,感兴趣的... 目录前言一、检查型异常的“诅咒”:为什么Java开发者讨厌它1.1 检查型异常的痛点1.2 为什么说

MySQL的配置文件详解及实例代码

《MySQL的配置文件详解及实例代码》MySQL的配置文件是服务器运行的重要组成部分,用于设置服务器操作的各种参数,下面:本文主要介绍MySQL配置文件的相关资料,文中通过代码介绍的非常详细,需要... 目录前言一、配置文件结构1.[mysqld]2.[client]3.[mysql]4.[mysqldum