详解 Spark 核心编程之累加器

2024-06-02 17:52

本文主要是介绍详解 Spark 核心编程之累加器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

累加器是分布式共享只写变量

一、累加器功能

​ 累加器可以用来把 Executor 端的变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在 Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge

在这里插入图片描述

二、累加器类型

1. 系统累加器

/**
常见的系统累加器:longAccumulator/doubleAccumulator/collectionAccumulator
说明:累加器一般放在行动算子中进行操作
*/
object TestRDDAcc {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Acc")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4), 2)// 创建累加器val accSum = sc.longAccumulator("sum")rdd.foreach(num => {accSum.add(num)    })println(accSum.value)sc.stop()}
}

三、自定义累加器

自定义累加器实现 WordCount 案例,避免 shuffle 操作

/**1.继承 AccumulatorV2[IN, OUT] 抽象类,定义输入输出的泛型类型1.1 IN 表述累加器 add 的数据的类型1.2 OUT 表示累加器 value 的返回类型2.重写累加器的抽象方法
*/
object TestAccWordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("WCAcc")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List("hello", "hive", "hello", "spark"))// 创建自定义累加器val wcAcc = new MyAccumulator()// 向 spark 进行注册sc.register(wcAcc, "wordCountAcc")// 循环遍历 rddrdd.foreach(word => {// 使用累加器wcAcc.add(word)    })// 输出累加器的值println(wcAcc.value)sc.stop()}
}/*自定义累加器
*/
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {// 定义累加器的返回结果 Mapprivate var resultMap = mutable.Map[String, Long]()// 判断是否为初始状态override def isZero: Boolean = resultMap.isEmpty()// 复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {this}// 重置累加器override def reset(): Unit = resultMap.clear()// 获取累加器输入的数据进行操作override def add(word: String): Unit = {// 向 resultMap 中添加新值或累加旧值val count = resultMap.getOrElse(word, 0L) + 1resultMap.update(word, count)}// 合并多个累加器的结果override def merge(other:  AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {other.value.foreach({case (word, count) => {val newCount = this.resultMap.getOrElse(word, 0L) + 1this.resultMap.update(word, newCount)}})
}// 返回累加器的结果override def value: mutable.Map[String, Long] = resultMap}

这篇关于详解 Spark 核心编程之累加器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024670

相关文章

Python 字典 (Dictionary)使用详解

《Python字典(Dictionary)使用详解》字典是python中最重要,最常用的数据结构之一,它提供了高效的键值对存储和查找能力,:本文主要介绍Python字典(Dictionary)... 目录字典1.基本特性2.创建字典3.访问元素4.修改字典5.删除元素6.字典遍历7.字典的高级特性默认字典

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5

一文详解如何使用Java获取PDF页面信息

《一文详解如何使用Java获取PDF页面信息》了解PDF页面属性是我们在处理文档、内容提取、打印设置或页面重组等任务时不可或缺的一环,下面我们就来看看如何使用Java语言获取这些信息吧... 目录引言一、安装和引入PDF处理库引入依赖二、获取 PDF 页数三、获取页面尺寸(宽高)四、获取页面旋转角度五、判断

Spring Boot中的路径变量示例详解

《SpringBoot中的路径变量示例详解》SpringBoot中PathVariable通过@PathVariable注解实现URL参数与方法参数绑定,支持多参数接收、类型转换、可选参数、默认值及... 目录一. 基本用法与参数映射1.路径定义2.参数绑定&nhttp://www.chinasem.cnbs

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

Redis中Stream详解及应用小结

《Redis中Stream详解及应用小结》RedisStreams是Redis5.0引入的新功能,提供了一种类似于传统消息队列的机制,但具有更高的灵活性和可扩展性,本文给大家介绍Redis中Strea... 目录1. Redis Stream 概述2. Redis Stream 的基本操作2.1. XADD

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

Java JDK1.8 安装和环境配置教程详解

《JavaJDK1.8安装和环境配置教程详解》文章简要介绍了JDK1.8的安装流程,包括官网下载对应系统版本、安装时选择非系统盘路径、配置JAVA_HOME、CLASSPATH和Path环境变量,... 目录1.下载JDK2.安装JDK3.配置环境变量4.检验JDK官网下载地址:Java Downloads

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串