大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等

2024-09-05 05:36

本文主要是介绍大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink Sink JDBC
  • Flink Sink Kafka

在这里插入图片描述

注意事项

DataSetAPI 和 DataStream API一样有三个部分组成,各部分的作用对应一致,此处不再赘述。

FlinkDataSet

在 Apache Flink 中,DataSet API 是 Flink 批处理的核心接口,它主要用于处理静态数据集。虽然 Flink 的 DataStream API 被广泛用于流式数据处理,但 DataSet API 适用于大规模批处理场景,如数据清洗、ETL、分析等。虽然近年来 Flink 更多地向流处理方向发展,但批处理仍然是数据处理中的一个重要场景。

DataSource

对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个 DataSource 组件:

  • 基于集合:fromCollection 主要是为了方便测试
  • 基于文件:readTextFile,基于HDFS中的数据进行计算分析

基本概念

Flink 的 DataSet API 是一个功能强大的批处理 API,专为处理静态、离线数据集设计。DataSet 中的数据是有限的,处理时系统会先等待整个数据集加载完毕。DataSet 可以通过多种方式创建,例如从文件、数据库、集合等加载数据,然后通过一系列转换操作(如 map、filter、join 等)进行处理。

核心特性

  • 支持丰富的转换操作。
  • 提供多种输入输出数据源。
  • 支持复杂的数据类型,包括基本类型、元组、POJO、列表等。
  • 支持优化计划,例如通过 cost-based optimizer 来优化查询执行计划。

DataSet 创建

在 Flink 中,可以通过多种方式创建 DataSet。以下是常见的数据源:

从本地文件读取

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("path/to/file");

从 CSV 文件读取

DataSet<Tuple3<Integer, String, Double>> csvData = env.readCsvFile("path/to/file.csv").types(Integer.class, String.class, Double.class);

从集合中创建

List<Tuple2<String, Integer>> data = Arrays.asList(new Tuple2<>("Alice", 1),new Tuple2<>("Bob", 2)
);
DataSet<Tuple2<String, Integer>> dataSet = env.fromCollection(data);

从数据库中读取

可以通过自定义的输入格式(如 JDBC 输入格式)从数据库中读取数据,虽然 Flink 本身并没有内置 JDBC 源的批处理 API,但可以通过自定义实现。

DataSet 的转换操作(Transformation)

Flink 的 DataSet API 提供了丰富的转换操作,可以对数据进行各种变换,以下是常用的转换操作:
在这里插入图片描述
在这里插入图片描述

Map

将 DataSet 中的每一条记录进行映射操作,生成新的 DataSet。

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);

Filter

过滤掉不满足条件的记录。

DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);

FlatMap

类似于 map,但允许一条记录生成多条输出记录。

DataSet<String> lines = env.fromElements("hello world", "flink is great");
DataSet<String> words = lines.flatMap((line, collector) -> {for (String word : line.split(" ")) {collector.collect(word);}
});

Reduce

将数据集根据某种聚合逻辑进行合并

DataSet<Integer> sum = numbers.reduce((n1, n2) -> n1 + n2);

GroupBy 和 Reduce

对数据集进行分组,然后在每个组上执行聚合操作

DataSet<Tuple2<String, Integer>> wordCounts = words.map(word -> new Tuple2<>(word, 1)).groupBy(0).reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1));

Join

类似于 SQL 中的连接操作,连接两个 DataSet。

DataSet<Tuple2<Integer, String>> persons = env.fromElements(new Tuple2<>(1, "Alice"),new Tuple2<>(2, "Bob")
);
DataSet<Tuple2<Integer, String>> cities = env.fromElements(new Tuple2<>(1, "Berlin"),new Tuple2<>(2, "Paris")
);
DataSet<Tuple2<String, String>> personWithCities = persons.join(cities).where(0).equalTo(0).with((p, c) -> new Tuple2<>(p.f1, c.f1));

DataSet 输出

DataSet API 提供多种方式将数据写出到外部系统:

写入文件

wordCounts.writeAsCsv("output/wordcounts.csv", "\n", ",");

写入数据库

虽然 DataSet API 没有直接提供 JDBC Sink,可以通过自定义 Sink 实现写入数据库功能。

打印控制台

wordCounts.print();

批处理的优化

DataSet API 提供了优化机制,通过成本模型和执行计划的分析来优化任务执行。在 Flink 内部,编译器会根据任务定义的转换操作生成一个优化的执行计划,这个过程类似于 SQL 查询优化器的工作原理。

  • DataSet 的分区:Flink 可以根据数据集的分区进行优化。例如,通过 partitionByHash 或 partitionByRange 来手动控制数据的分布方式。
  • DataSet 的缓存:可以通过 rebalance()、hashPartition() 等方法来均衡数据负载,以提高并行度和计算效率。

DataSet API 的容错机制

Flink 的 DataSet API 提供了容错机制,支持在发生故障时重新执行失败的任务。虽然 DataSet API 没有像 DataStream 那样依赖于 Checkpoint 机制,但其批处理特性允许任务从头开始重新执行,确保数据处理的正确性。

DataSet 与 DataStream 的对比

DataSet API 与 DataStream API 之间有一些重要的区别:

请添加图片描述

DataSet API 的未来

需要注意的是,Flink 的官方路线图中已经不再优先开发 DataSet API 的新特性,未来的主要开发将集中在 DataStream API,甚至批处理功能都将通过 DataStream API 来实现。
因此,如果可能,建议新项目尽量使用 DataStream API 来替代 DataSet API。
特别是 Flink 的 Table API 和 SQL API 也适用于批处理和流处理,这些高层 API 提供了更简洁的语法和更强的优化能力。

这篇关于大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1138092

相关文章

一文教你Python如何快速精准抓取网页数据

《一文教你Python如何快速精准抓取网页数据》这篇文章主要为大家详细介绍了如何利用Python实现快速精准抓取网页数据,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录1. 准备工作2. 基础爬虫实现3. 高级功能扩展3.1 抓取文章详情3.2 保存数据到文件4. 完整示例

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

使用Java将各种数据写入Excel表格的操作示例

《使用Java将各种数据写入Excel表格的操作示例》在数据处理与管理领域,Excel凭借其强大的功能和广泛的应用,成为了数据存储与展示的重要工具,在Java开发过程中,常常需要将不同类型的数据,本文... 目录前言安装免费Java库1. 写入文本、或数值到 Excel单元格2. 写入数组到 Excel表格

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

SpringBoot3.4配置校验新特性的用法详解

《SpringBoot3.4配置校验新特性的用法详解》SpringBoot3.4对配置校验支持进行了全面升级,这篇文章为大家详细介绍了一下它们的具体使用,文中的示例代码讲解详细,感兴趣的小伙伴可以参考... 目录基本用法示例定义配置类配置 application.yml注入使用嵌套对象与集合元素深度校验开发

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Qt实现网络数据解析的方法总结

《Qt实现网络数据解析的方法总结》在Qt中解析网络数据通常涉及接收原始字节流,并将其转换为有意义的应用层数据,这篇文章为大家介绍了详细步骤和示例,感兴趣的小伙伴可以了解下... 目录1. 网络数据接收2. 缓冲区管理(处理粘包/拆包)3. 常见数据格式解析3.1 jsON解析3.2 XML解析3.3 自定义

使用Python和Pyecharts创建交互式地图

《使用Python和Pyecharts创建交互式地图》在数据可视化领域,创建交互式地图是一种强大的方式,可以使受众能够以引人入胜且信息丰富的方式探索地理数据,下面我们看看如何使用Python和Pyec... 目录简介Pyecharts 简介创建上海地图代码说明运行结果总结简介在数据可视化领域,创建交互式地

SpringMVC 通过ajax 前后端数据交互的实现方法

《SpringMVC通过ajax前后端数据交互的实现方法》:本文主要介绍SpringMVC通过ajax前后端数据交互的实现方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价... 在前端的开发过程中,经常在html页面通过AJAX进行前后端数据的交互,SpringMVC的controll

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式