Flink之DataStream API的转换算子

2023-12-06 23:20

本文主要是介绍Flink之DataStream API的转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简单转换算子

函数的实现方式

  1. 自定义类,实现函数接口:编码麻烦,使用灵活
  2. 匿名内部类:编码简单
  3. Lambda:编码简洁
public class Flink02_FunctionImplement {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);env.socketTextStream("hadoop102",8888).flatMap((String line, Collector<Tuple2<String, Integer>> out)->{String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(0).sum(1).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {private String Operator;@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}}
}

Reduce规约聚合

  1. reduce:规约聚合
    • 聚合的原理:两两聚合,上一次的聚合值与本次新到的值进行聚合
    • 泛型 T : 流中的数据类型, 从方法声明中可以看到,输入输出类型一直
    • 方法: T reduce(T value1, T value2) throws Exception
      • value1:上一次的聚合值
      • value2:本次新到的值
public class Flink04_ReduceAggOpterator {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);ds.print("input================");//reduce:每个用户的点击次数ds.map(event->new WordCount(event.getUser(),1)).keyBy(WordCount::getWord).reduce(new ReduceFunction<WordCount>() {/**** @param value1 上次聚合的结果,第一个数据不参与聚合,直接输出* @param value2 新来的值* @return* @throws Exception*/@Overridepublic WordCount reduce(WordCount value1, WordCount value2) throws Exception {System.out.println("测试");return new WordCount(value1.getWord(),value1.getCount()+value2.getCount());}}).print("reduce");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

富函数

调用算子的时候,需要传入对应的用户自定义函数来完成具体的功能

  • 函数:
    • 普通函数
      Map
      filter
      flatMap
      reduce
      富函数:基本上每个普通函数都有对应的富函数
      统一接口interface RichFunction extends Function
      具体使用的富函数类:
      - RichMapFunction
      - RichFilterFunction
      - RichFlatMapFunction
      - RichReduceFunction
      - …
      富函数功能:
      • 生命周期方法:
        • open(): 当前算子的每个并行子任务的实例创建时会调用一次
        • close():当前算子的每个并行子任务的实例销毁时(有界流),调用一次
      • 获取运行时上下文对象 getRuntimeContext
        • 可以获取当前作业,当前task的相关信息
        • 获取不同类型的状态,进行状态编程*
          getState | getListState | getReducingState | getMapState
public class Flink05_RichFunction {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);FileSource<String> fileSource = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat(),new Path("input/enents.txt")).build();DataStreamSource<String> fileDs = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");SingleOutputStreamOperator<Event> ds = fileDs.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] valueArr = value.split("");return new Event(valueArr[0], valueArr[1], Long.valueOf(valueArr[2]));}});ds.print("input================");ds.map(new RichMapFunction<Event, WordCount>() {/*** 生命周期open方法,当前算子实例创建时执行一次,只执行一次* @param parameters The configuration containing the parameters attached to the contract.* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("创建Redis的连接对象");}@Overridepublic WordCount map(Event value) throws Exception {System.out.println("每条数据执行一次");return new WordCount(value.getUser(),1);}/*** 生命周期的close方法* 当前算子实例销毁时执行一次* @throws Exception*/@Overridepublic void close() throws Exception {System.out.println("关闭连接对象");}}).print("map");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

这篇关于Flink之DataStream API的转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/463724

相关文章

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

利用Python脚本实现批量将图片转换为WebP格式

《利用Python脚本实现批量将图片转换为WebP格式》Python语言的简洁语法和库支持使其成为图像处理的理想选择,本文将介绍如何利用Python实现批量将图片转换为WebP格式的脚本,WebP作为... 目录简介1. python在图像处理中的应用2. WebP格式的原理和优势2.1 WebP格式与传统

HTML5 getUserMedia API网页录音实现指南示例小结

《HTML5getUserMediaAPI网页录音实现指南示例小结》本教程将指导你如何利用这一API,结合WebAudioAPI,实现网页录音功能,从获取音频流到处理和保存录音,整个过程将逐步... 目录1. html5 getUserMedia API简介1.1 API概念与历史1.2 功能与优势1.3

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

在Java中将XLS转换为XLSX的实现方案

《在Java中将XLS转换为XLSX的实现方案》在本文中,我们将探讨传统ExcelXLS格式与现代XLSX格式的结构差异,并为Java开发者提供转换方案,通过了解底层原理、性能优势及实用工具,您将掌握... 目录为什么升级XLS到XLSX值得投入?实际转换过程解析推荐技术方案对比Apache POI实现编程

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Python使用FFmpeg实现高效音频格式转换工具

《Python使用FFmpeg实现高效音频格式转换工具》在数字音频处理领域,音频格式转换是一项基础但至关重要的功能,本文主要为大家介绍了Python如何使用FFmpeg实现强大功能的图形化音频转换工具... 目录概述功能详解软件效果展示主界面布局转换过程截图完成提示开发步骤详解1. 环境准备2. 项目功能结

使用Python实现网页表格转换为markdown

《使用Python实现网页表格转换为markdown》在日常工作中,我们经常需要从网页上复制表格数据,并将其转换成Markdown格式,本文将使用Python编写一个网页表格转Markdown工具,需... 在日常工作中,我们经常需要从网页上复制表格数据,并将其转换成Markdown格式,以便在文档、邮件或

使用Python实现调用API获取图片存储到本地的方法

《使用Python实现调用API获取图片存储到本地的方法》开发一个自动化工具,用于从JSON数据源中提取图像ID,通过调用指定API获取未经压缩的原始图像文件,并确保下载结果与Postman等工具直接... 目录使用python实现调用API获取图片存储到本地1、项目概述2、核心功能3、环境准备4、代码实现