Flink之DataStream API的转换算子

2023-12-06 23:20

本文主要是介绍Flink之DataStream API的转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简单转换算子

函数的实现方式

  1. 自定义类,实现函数接口:编码麻烦,使用灵活
  2. 匿名内部类:编码简单
  3. Lambda:编码简洁
public class Flink02_FunctionImplement {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);env.socketTextStream("hadoop102",8888).flatMap((String line, Collector<Tuple2<String, Integer>> out)->{String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(0).sum(1).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {private String Operator;@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}}
}

Reduce规约聚合

  1. reduce:规约聚合
    • 聚合的原理:两两聚合,上一次的聚合值与本次新到的值进行聚合
    • 泛型 T : 流中的数据类型, 从方法声明中可以看到,输入输出类型一直
    • 方法: T reduce(T value1, T value2) throws Exception
      • value1:上一次的聚合值
      • value2:本次新到的值
public class Flink04_ReduceAggOpterator {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);ds.print("input================");//reduce:每个用户的点击次数ds.map(event->new WordCount(event.getUser(),1)).keyBy(WordCount::getWord).reduce(new ReduceFunction<WordCount>() {/**** @param value1 上次聚合的结果,第一个数据不参与聚合,直接输出* @param value2 新来的值* @return* @throws Exception*/@Overridepublic WordCount reduce(WordCount value1, WordCount value2) throws Exception {System.out.println("测试");return new WordCount(value1.getWord(),value1.getCount()+value2.getCount());}}).print("reduce");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

富函数

调用算子的时候,需要传入对应的用户自定义函数来完成具体的功能

  • 函数:
    • 普通函数
      Map
      filter
      flatMap
      reduce
      富函数:基本上每个普通函数都有对应的富函数
      统一接口interface RichFunction extends Function
      具体使用的富函数类:
      - RichMapFunction
      - RichFilterFunction
      - RichFlatMapFunction
      - RichReduceFunction
      - …
      富函数功能:
      • 生命周期方法:
        • open(): 当前算子的每个并行子任务的实例创建时会调用一次
        • close():当前算子的每个并行子任务的实例销毁时(有界流),调用一次
      • 获取运行时上下文对象 getRuntimeContext
        • 可以获取当前作业,当前task的相关信息
        • 获取不同类型的状态,进行状态编程*
          getState | getListState | getReducingState | getMapState
public class Flink05_RichFunction {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);FileSource<String> fileSource = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat(),new Path("input/enents.txt")).build();DataStreamSource<String> fileDs = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");SingleOutputStreamOperator<Event> ds = fileDs.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] valueArr = value.split("");return new Event(valueArr[0], valueArr[1], Long.valueOf(valueArr[2]));}});ds.print("input================");ds.map(new RichMapFunction<Event, WordCount>() {/*** 生命周期open方法,当前算子实例创建时执行一次,只执行一次* @param parameters The configuration containing the parameters attached to the contract.* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("创建Redis的连接对象");}@Overridepublic WordCount map(Event value) throws Exception {System.out.println("每条数据执行一次");return new WordCount(value.getUser(),1);}/*** 生命周期的close方法* 当前算子实例销毁时执行一次* @throws Exception*/@Overridepublic void close() throws Exception {System.out.println("关闭连接对象");}}).print("map");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

这篇关于Flink之DataStream API的转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/463724

相关文章

python通过curl实现访问deepseek的API

《python通过curl实现访问deepseek的API》这篇文章主要为大家详细介绍了python如何通过curl实现访问deepseek的API,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编... API申请和充值下面是deepeek的API网站https://platform.deepsee

C语言中的常见进制转换详解(从二进制到十六进制)

《C语言中的常见进制转换详解(从二进制到十六进制)》进制转换是计算机编程中的一个常见任务,特别是在处理低级别的数据操作时,C语言作为一门底层编程语言,在进制转换方面提供了灵活的操作方式,今天,我们将深... 目录1、进制基础2、C语言中的进制转换2.1 从十进制转换为其他进制十进制转二进制十进制转八进制十进

Pandas进行周期与时间戳转换的方法

《Pandas进行周期与时间戳转换的方法》本教程将深入讲解如何在pandas中使用to_period()和to_timestamp()方法,完成时间戳与周期之间的转换,并结合实际应用场景展示这些方法的... 目录to_period() 时间戳转周期基本操作应用示例to_timestamp() 周期转时间戳基

Java对接Dify API接口的完整流程

《Java对接DifyAPI接口的完整流程》Dify是一款AI应用开发平台,提供多种自然语言处理能力,通过调用Dify开放API,开发者可以快速集成智能对话、文本生成等功能到自己的Java应用中,本... 目录Java对接Dify API接口完整指南一、Dify API简介二、准备工作三、基础对接实现1.

一文详解如何在Vue3中封装API请求

《一文详解如何在Vue3中封装API请求》在现代前端开发中,API请求是不可避免的一部分,尤其是与后端交互时,下面我们来看看如何在Vue3项目中封装API请求,让你在实现功能时更加高效吧... 目录为什么要封装API请求1. vue 3项目结构2. 安装axIOS3. 创建API封装模块4. 封装API请求

使用Python开发Markdown兼容公式格式转换工具

《使用Python开发Markdown兼容公式格式转换工具》在技术写作中我们经常遇到公式格式问题,例如MathML无法显示,LaTeX格式错乱等,所以本文我们将使用Python开发Markdown兼容... 目录一、工具背景二、环境配置(Windows 10/11)1. 创建conda环境2. 获取XSLT

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

Java对象转换的实现方式汇总

《Java对象转换的实现方式汇总》:本文主要介绍Java对象转换的多种实现方式,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录Java对象转换的多种实现方式1. 手动映射(Manual Mapping)2. Builder模式3. 工具类辅助映

python实现svg图片转换为png和gif

《python实现svg图片转换为png和gif》这篇文章主要为大家详细介绍了python如何实现将svg图片格式转换为png和gif,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录python实现svg图片转换为png和gifpython实现图片格式之间的相互转换延展:基于Py

C#实现将Excel表格转换为图片(JPG/ PNG)

《C#实现将Excel表格转换为图片(JPG/PNG)》Excel表格可能会因为不同设备或字体缺失等问题,导致格式错乱或数据显示异常,转换为图片后,能确保数据的排版等保持一致,下面我们看看如何使用C... 目录通过C# 转换Excel工作表到图片通过C# 转换指定单元格区域到图片知识扩展C# 将 Excel