快速入门Flink (6) —— Flink的广播变量、累加器与分布式缓存

2024-02-07 23:59

本文主要是介绍快速入门Flink (6) —— Flink的广播变量、累加器与分布式缓存,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影。我希望在最美的年华,做最好的自己

        通过快速入门Flink系列的(1-5)篇博客,博主已经为大家介绍了一些Flink中常见的概念与一些基础的操作,感兴趣的朋友们可以收藏一下菌哥的Flink专栏哟(👉快速入门Flink)。本篇博客,博主为大家介绍的是Flink的广播变量、累加器与分布式缓存

        码字不易,先赞后看
在这里插入图片描述

文章目录

      • 1.5 Flink的广播变量
      • 1.6 Flink的分布式缓存
      • 1.7 Flink Accumulators & Counters
      • 小结


1.5 Flink的广播变量

        Flink支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中, 这样可以减缓大量的 shuffle 操作; 比如在数据 join 阶段,不可避免的就是大量的 shuffle 操作,我们可以把其中一个 dataSet 广播出去,一直加载到 taskManager 的内存 中,可以直接在内存中拿数据,避免了大量的 shuffle, 导致集群性能下降; 广播变量创建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点。另外需要 记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的

        一句话解释,可以理解为是一个公共的共享变量,我们可以把一个 dataset 数据集广播出去, 然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。 如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集, 比较浪费内存(也就是一个节点中可能会存在多份 dataset 数据)。

        注意:因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现OOM这样的问题。

  • Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的
  • Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量

        让我们来通过一张图来感受下,使用广播变量和不使用广播变量,到底差在哪里。
在这里插入图片描述
小结一下:

        ■ 可以理解广播就是一个公共的共享变量
        ■ 将一个数据集广播后,不同的Task 都可以在节点上获取到
        ■ 每个节点只存一份
        ■ 如果不使用广播,每一个 Task 都会拷贝一份数据集,造成内存资源浪费

用法:

        在需要使用广播的操作后,使用withBroadcastSet 创建广播

        在操作中,使用 getRuntimeContext.getBroadcastVariable [广播数据类型] ( 广播名 ) 获取广播变量

示例:

        创建一个学生数据集,包含以下数据:

|学生 ID | 姓名 |
|------|------|
List((1, “张三”), (2, “李四”), (3, “王五”))

        将该数据,发布到广播。再创建一个 成绩 数据集。

|学生 ID | 学科 | 成绩 |
|------|------|-----|
List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))

        请通过广播获取到学生姓名,将数据转换为

List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))

        步骤

        1) 获取批处理运行环境

        2) 分别创建两个数据集

        3) 使用 RichMapFunction 对 成绩 数据集进行 map 转换

        4) 在数据集调用 map 方法后,调用 withBroadcastSet 将 学生 数据集创建广播

        5) 实现 RichMapFunction

        a. 将成绩数据(学生 ID,学科,成绩) -> (学生姓名,学科,成绩)
        b. 重写 open 方法中,获取广播数据
        c. 导入 scala.collection.JavaConverters._ 隐式转换
        d. 将广播数据使用 asScala 转换为 Scala 集合,再使用 toList 转换为 scala List 集合
        e. 在 map 方法中使用广播进行转换

        6) 打印测试

        参考代码

import java.utilimport org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala._
/** @Author: Alice菌* @Date: 2020/8/1 20:30* @Description: */
object BatchBroadcastDemo {def main(args: Array[String]): Unit = {// 1、创建批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment// 2、分别创建两个数据集// 创建学生数据集val stuDataSet: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))// 创建成绩数据集val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))// 3、使用RichMapFunction 对成绩数据集进行map转换// 返回值类型(学生名字,学科称名,成绩)val result: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {// 定义获取学生数据集的集合var studentMap: Map[Int, String] = _// 初始化的时候被执行一次,在对象的生命周期中只被执行一次override def open(parameters: Configuration): Unit = {// 因为获取到的广播变量中的数据类型是java的集合类型,但是我们的代码是scala,因此需要将java的集合转换成scala的集合// 我们这里将list转换成了map对象,之所以能够转换是因为list中的元素是对偶元组,因此可以转换成 kv 键值对类型// 之所以要转换,是因为后面好用,传递一个学生id,可以直接获取到学生的名字import scala.collection.JavaConversions._// 获取到广播变量的内容val studentList: util.List[(Int, String)] = getRuntimeContext.getBroadcastVariable[(Int, String)]("student")studentMap = studentList.toMap}// 要对集合中的每个元素执行map操作,也就是说集合中有多少元素,就被执行多少次override def map(value: (Int, String, Int)): (String, String, Int) = {//(Int, String, Int)=》(学生id,学科名字,学生成绩)//返回值类型(学生名字,学科名,成绩)val stuId: Int = value._1val stuName: String = studentMap.getOrElse(stuId, "")//(学生名字,学科名,成绩)(stuName, value._2, value._3)}}).withBroadcastSet(stuDataSet,"student")result.print()//(张三,语文,50)//(李四,数学,70)//(王五,英文,86)}
}

1.6 Flink的分布式缓存

        Flink 提供了一个类似于 Hadoop 的分布式缓存,让并行运行实例的函数可以在本地访 问。这 个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等! 缓存的使用流程:

        使用 ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS 上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink 会自动将复制文件或者目录到所有 worker 节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!

        【注意】广播是将变量分发到各个 worker 节点的内存上,分布式缓存是将文件缓存到各个 worker 节点上

        用法

  • 使用 Flink 运行时环境的 registerCachedFile
  • 在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布式缓存

        示例

        创建一个成绩数据集

List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))

        请通过分布式缓存获取到学生姓名,将数据转换为

List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))

        注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名

1,张三
2,李四
3,王五

        操作步骤

  1. 将 distribute_cache_student 文件上传到 HDFS / 目录下

  2. 获取批处理运行环境

  3. 创建成绩数据集

  4. 对成绩数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)

        a. RichMapFunction 的 open 方法中,获取分布式缓存数据
        b. 在 map 方法中进行转换

  1. 实现 open 方法

        a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
        b. 使用 Scala.fromFile 读取文件,并获取行
        c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List

  1. 实现 map 方法

        a. 从分布式缓存中根据学生 ID 过滤出来学生
        b. 获取学生姓名
        c. 构建最终结果元组

        参考代码

import java.io.Fileimport org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala._import scala.io.Source
/** @Author: Alice菌* @Date: 2020/8/1 22:40* @Description: */
object BatchDisCachedFile {def main(args: Array[String]): Unit = {// 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment// 注册一个分布式缓存env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student","student")/*1,张三2,李四3,王五*/// 创建成绩数据集val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var studentMap: Map[Int, String] = _// 初始化的时候被调用一次override def open(parameters: Configuration): Unit = {// 获取分布式缓存的文件val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()studentMap = linesIter.map(lines => {val words: Array[String] = lines.split(",")(words(0).toInt, words(1))}).toMap}override def map(value: (Int, String, Int)): (String, String, Int) = {val stuName: String = studentMap.getOrElse(value._1, "")(stuName, value._2, value._3)}})// 输出结果resultDataSet.print()//(张三,语文,50)//(李四,数学,70)//(王五,英文,86)}
}

1.7 Flink Accumulators & Counters

        Accumulator累加器,与 Mapreduce counter 的应用场景差不多,都能很好地观察 task 在运行期间的数据变化 可以在 Flink job 任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

        Counter 是一个具体的累加器 (Accumulator) ,我们可以实 现 IntCounter, LongCounter 和 DoubleCounter。

        步骤

        1) 创建累加器

        private IntCounter numLines = new IntCounter();

        2) 注册累加器

        getRuntimeContext().addAccumulator("num-lines", this.numLines);

        3) 使用累加器

        this.numLines.add(1);

        4) 获取累加器的结果

        myJobExecutionResult.getAccumulatorResult("num-lines")

        示例:

需求: 给定一个数据源 “a”,“b”,“c”,“d” 通过累加器打印出多少个元素

        参考代码

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode/** @Author: Alice菌* @Date: 2020/8/1 23:26* @Description: */
object BatchCounterDemo {def main(args: Array[String]): Unit = {//1、创建执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2、创建执行环境val sourceDataSet: DataSet[String] = env.fromElements("a","b","c","d")//3、对sourceDataSet 进行map操作val resultDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {// 创建累加器val counter: IntCounter = new IntCounter// 初始化的时候执行一次override def open(parameters: Configuration): Unit = {// 注册累加器getRuntimeContext.addAccumulator("MyAccumulator", this.counter)}// 初始化的时候被执行一次override def map(value: String): String = {counter.add(1)value}})resultDataSet.writeAsText("data/output/Accumulators",WriteMode.OVERWRITE)val result: JobExecutionResult = env.execute("BatchCounterDemo")val MyAccumlatorValue: Int = result.getAccumulatorResult[Int]("MyAccumulator")println("累加器的值:"+MyAccumlatorValue)//累加器的值:4}
}

        Flink Broadcast 和 Accumulators 的区别:

        Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改

        Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。


小结

        本篇博客所讲述的内容,与前几篇博客相比,就有点类似于拓展的感觉。大家对于新的知识点一定要在理解的程度上再去进行复习回顾,而不是单纯地靠硬记。

        如果以上过程中出现了任何的纰漏错误,烦请大佬们指正😅

        受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波🙏

        希望我们都能在学习的道路上越走越远😉
在这里插入图片描述

这篇关于快速入门Flink (6) —— Flink的广播变量、累加器与分布式缓存的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/689341

相关文章

Java Lettuce 客户端入门到生产的实现步骤

《JavaLettuce客户端入门到生产的实现步骤》本文主要介绍了JavaLettuce客户端入门到生产的实现步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 目录1 安装依赖MavenGradle2 最小化连接示例3 核心特性速览4 生产环境配置建议5 常见问题

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

MyBatis延迟加载与多级缓存全解析

《MyBatis延迟加载与多级缓存全解析》文章介绍MyBatis的延迟加载与多级缓存机制,延迟加载按需加载关联数据提升性能,一级缓存会话级默认开启,二级缓存工厂级支持跨会话共享,增删改操作会清空对应缓... 目录MyBATis延迟加载策略一对多示例一对多示例MyBatis框架的缓存一级缓存二级缓存MyBat

前端缓存策略的自解方案全解析

《前端缓存策略的自解方案全解析》缓存从来都是前端的一个痛点,很多前端搞不清楚缓存到底是何物,:本文主要介绍前端缓存的自解方案,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、为什么“清缓存”成了技术圈的梗二、先给缓存“把个脉”:浏览器到底缓存了谁?三、设计思路:把“发版”做成“自愈”四、代码

Java 线程池+分布式实现代码

《Java线程池+分布式实现代码》在Java开发中,池通过预先创建并管理一定数量的资源,避免频繁创建和销毁资源带来的性能开销,从而提高系统效率,:本文主要介绍Java线程池+分布式实现代码,需要... 目录1. 线程池1.1 自定义线程池实现1.1.1 线程池核心1.1.2 代码示例1.2 总结流程2. J

使用EasyPoi快速导出Word文档功能的实现步骤

《使用EasyPoi快速导出Word文档功能的实现步骤》EasyPoi是一个基于ApachePOI的开源Java工具库,旨在简化Excel和Word文档的操作,本文将详细介绍如何使用EasyPoi快速... 目录一、准备工作1、引入依赖二、准备好一个word模版文件三、编写导出方法的工具类四、在Export

Python之变量命名规则详解

《Python之变量命名规则详解》Python变量命名需遵守语法规范(字母开头、不使用关键字),遵循三要(自解释、明确功能)和三不要(避免缩写、语法错误、滥用下划线)原则,确保代码易读易维护... 目录1. 硬性规则2. “三要” 原则2.1. 要体现变量的 “实际作用”,拒绝 “无意义命名”2.2. 要让

Java 缓存框架 Caffeine 应用场景解析

《Java缓存框架Caffeine应用场景解析》文章介绍Caffeine作为高性能Java本地缓存框架,基于W-TinyLFU算法,支持异步加载、灵活过期策略、内存安全机制及统计监控,重点解析其... 目录一、Caffeine 简介1. 框架概述1.1 Caffeine的核心优势二、Caffeine 基础2

Redis高性能Key-Value存储与缓存利器常见解决方案

《Redis高性能Key-Value存储与缓存利器常见解决方案》Redis是高性能内存Key-Value存储系统,支持丰富数据类型与持久化方案(RDB/AOF),本文给大家介绍Redis高性能Key-... 目录Redis:高性能Key-Value存储与缓存利器什么是Redis?为什么选择Redis?Red

React 记忆缓存的三种方法实现

《React记忆缓存的三种方法实现》本文主要介绍了React记忆缓存的三种方法实现,包含React.memo、useMemo、useCallback,用于避免不必要的组件重渲染和计算,感兴趣的可以... 目录1. React.memo2. useMemo3. useCallback使用场景与注意事项在 Re