Flink DataSet分布式缓冲

2024-06-03 13:48
文章标签 分布式 flink 缓冲 dataset

本文主要是介绍Flink DataSet分布式缓冲,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
本节内容对应官方文档 ,本节内容对应示例源码

[[toc]]

DataSet 分布式缓冲

Flink 提供了一个分布式缓存,类似于 hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在 taskManager 节点中,防止 task 重复拉取。

执行机制如下
程序注册一个文件或者目录 (本地或者远程文件系统,例如 hdfs 或者 s3),通过 ExecutionEnvironment 注册缓存文件并为它起一个名称
当程序执行,Flink 自动将文件或者目录复制到所有 taskManager 节点的本地文件系统,仅会执行一次。
用户可以通过这个指定的名称查找文件或者目录,然后从 taskManager 节点的本地文件系统访问它。

示例代码:

/** 分布式缓存* =示例说明=* 分布式载入用户 ID 黑名单文件,针对用户登录数据匹配在黑名单 ID 及对应登录状态** @author Li.Wei by 2019/11/4*/
object DistributedCache extends BatchExecutionEnvironmentApp {private val path = getClass.getClassLoader.getResource("data/game/blacklist-uid.txt").getPathbEnv.registerCachedFile(path, "blacklist-uid", executable = false)// 用户登录数据 DataSetval userLoginDataSet = DataSet.userLogin(this)import org.apache.flink.api.scala.extensions._ // use filterWithuserLoginDataSet.map(new BlacklistMap()).filterWith(_._1 != "none").distinct().print()}class BlacklistMap extends RichMapFunction[UserLogin, (String, String)] {var source: BufferedSource = _ // 读取文件流,函数结束后执行关闭操作var blackUid: Seq[String] = _ // 黑名单数据,从分布式缓冲文件中载入override def open(config: Configuration): Unit = {val file: File = getRuntimeContext.getDistributedCache.getFile("blacklist-uid")import scala.io.Sourcesource = Source.fromFile(file, "UTF-8")blackUid = source.getLines().toSeq}// 判断当前用户对应的 ID 在该用户对应角色中是否登录过override def map(value: UserLogin): (String, String) =if (blackUid.contains(value.uid)) (value.uid, value.status) else ("none", value.status)override def close(): Unit = source.close()
}

这篇关于Flink DataSet分布式缓冲的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1027162

相关文章

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)

《Golang实现Redis分布式锁(Lua脚本+可重入+自动续期)》本文主要介绍了Golang分布式锁实现,采用Redis+Lua脚本确保原子性,持可重入和自动续期,用于防止超卖及重复下单,具有一定... 目录1 概念应用场景分布式锁必备特性2 思路分析宕机与过期防止误删keyLua保证原子性可重入锁自动

基于MongoDB实现文件的分布式存储

《基于MongoDB实现文件的分布式存储》分布式文件存储的方案有很多,今天分享一个基于mongodb数据库来实现文件的存储,mongodb支持分布式部署,以此来实现文件的分布式存储,需要的朋友可以参考... 目录一、引言二、GridFS 原理剖析三、Spring Boot 集成 GridFS3.1 添加依赖

Redis实现分布式锁全解析之从原理到实践过程

《Redis实现分布式锁全解析之从原理到实践过程》:本文主要介绍Redis实现分布式锁全解析之从原理到实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、背景介绍二、解决方案(一)使用 SETNX 命令(二)设置锁的过期时间(三)解决锁的误删问题(四)Re

Gradle下如何搭建SpringCloud分布式环境

《Gradle下如何搭建SpringCloud分布式环境》:本文主要介绍Gradle下如何搭建SpringCloud分布式环境问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录Gradle下搭建SpringCloud分布式环境1.idea配置好gradle2.创建一个空的gr

C#使用StackExchange.Redis实现分布式锁的两种方式介绍

《C#使用StackExchange.Redis实现分布式锁的两种方式介绍》分布式锁在集群的架构中发挥着重要的作用,:本文主要介绍C#使用StackExchange.Redis实现分布式锁的... 目录自定义分布式锁获取锁释放锁自动续期StackExchange.Redis分布式锁获取锁释放锁自动续期分布式

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

redis+lua实现分布式限流的示例

《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可... 目录为什么使用Redis+Lua实现分布式限流使用ZSET也可以实现限流,为什么选择lua的方式实现