Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark三大数据结构分别是:
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量

一.累加器(accumulator)

问题引入:
当使用foreach来对rdd求和会发现求和数据为0

val rdd = sc.makeRDD(List(1,2,3,4))var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum)

输出:sum = 0

在Spark中声明SparkContext的类称为Driver,所以变量sum在Driver中;而任务Task(即分区数据的运算)的执行是在Executor中进行,即sum = sum + num在Executor节点执行;

问题的关键点在于:Executor只是做了运算,但并没有将sum运算后的值返回Driver中,也就是说Driver中的sum变量至始至终都保持初始值为0;如下图所示:
在这里插入图片描述
此时便可以考虑使用累加器解决上述问题

1.系统累加器

val rdd = sc.makeRDD(List(1,2,3,4))
val sumAcc = sc.longAccumulator("sum")//sc.doubleAccumulator//sc.collectionAccumulatorrdd.foreach(num => {// 使用累加器sumAcc.add(num)})// 获取累加器的值println(sumAcc.value)

输出:10
图解:
在这里插入图片描述

使用累加器的一些问题:

  • 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
val rdd = sc.makeRDD(List(1,2,3,4))val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})println(sumAcc.value)

输出:0

  • 多加:累加器为全局共享变量,多次调用行动算子就会多次执行
val sumAcc = sc.longAccumulator("sum")val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})mapRDD.collect()mapRDD.collect()println(sumAcc.value)

输出:20

2.自定义累加器

  1. 继承 AccumulatorV2,并设定泛型
  2. 重写累加器的抽象方法

示例:自定义数据累加器WordCount

package org.xyl
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutableobject RDD_Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc").set("spark.testing.memory", "2147480000")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc")rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)})// 获取累加器累加的结果println(wcAcc.value)sc.stop()}/*自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型IN : 累加器输入的数据类型 StringOUT : 累加器返回的数据类型 mutable.Map[String, Long]2. 重写方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]()// 判断是否初始状态override def isZero: Boolean = {wcMap.isEmpty}override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}override def reset(): Unit = {wcMap.clear()}// 获取累加器需要计算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)}// Driver合并多个累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMapval map2 = other.valuemap2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}}// 累加器结果override def value: mutable.Map[String, Long] = {wcMap}}
}

输出:Map(spark -> 1, hello -> 2)

二.广播变量(broadcast variable)

问题引入:
当在Executor端用到了Driver变量,比如使用map()函数,在每个Executor中有多少个task就有多少个Driver端变量副本。
在这里插入图片描述广播变量可以让我们在每台计算机上保留一个只读变量,而不是为每个任务复制一份副本。
Spark会自动广播每个stage任务需要的通用数据。这些被广播的数据以序列化的形式缓存起来,然后在任务运行前进行反序列化。也就是说,在以下两种情况下显示的创建广播变量才有用:
1)当任务跨多个stage并且需要同样的数据时;
2)当以反序列化的形式来缓存数据时。
在这里插入图片描述

【闭包】:是一个函数,这个函数的执行结果由外部自由变量(不是函数的局部变量也不是函数的参数,也称字面量)决定——函数执行时,才捕获相关的自由变量(获取自由变量的当前值),从而形成闭合的函数。
spark执行一个Stage时,会为待执行函数(function,也称为【算子】)建立闭包(捕获函数引用的所有数据集),形成该Stage所有task所需信息的二进制形式,然后把这个闭包发送到集群的每个Executor上。

示例:
注:在创建广播变量时,广播变量的值必须是本地的可序列化的值,不能是RDD,因为RDD是不存数据的。但可以将RDD的结果广播出去。

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark06_Bc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("broadcast")val sc = new SparkContext(sparConf)val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6))// 封装广播变量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)rdd1.map {case (w, c) => {// 访问广播变量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)sc.stop()}
}

输出:
(a,(1,4))
(b,(2,5))
(c,(3,6))

注意:

累加器只能在driver端定义,driver端读取,不能在Executor端读取。

广播变量只能在driver端定义,在Executor端读取,Executor不能修改。

这篇关于Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592028

相关文章

Python中Flask模板的使用与高级技巧详解

《Python中Flask模板的使用与高级技巧详解》在Web开发中,直接将HTML代码写在Python文件中会导致诸多问题,Flask内置了Jinja2模板引擎,完美解决了这些问题,下面我们就来看看F... 目录一、模板渲染基础1.1 为什么需要模板引擎1.2 第一个模板渲染示例1.3 模板渲染原理二、模板

Redis中6种缓存更新策略详解

《Redis中6种缓存更新策略详解》Redis作为一款高性能的内存数据库,已经成为缓存层的首选解决方案,然而,使用缓存时最大的挑战在于保证缓存数据与底层数据源的一致性,本文将介绍Redis中6种缓存更... 目录引言策略一:Cache-Aside(旁路缓存)策略工作原理代码示例优缺点分析适用场景策略二:Re

SpringBoot中四种AOP实战应用场景及代码实现

《SpringBoot中四种AOP实战应用场景及代码实现》面向切面编程(AOP)是Spring框架的核心功能之一,它通过预编译和运行期动态代理实现程序功能的统一维护,在SpringBoot应用中,AO... 目录引言场景一:日志记录与性能监控业务需求实现方案使用示例扩展:MDC实现请求跟踪场景二:权限控制与

Java注解之超越Javadoc的元数据利器详解

《Java注解之超越Javadoc的元数据利器详解》本文将深入探讨Java注解的定义、类型、内置注解、自定义注解、保留策略、实际应用场景及最佳实践,无论是初学者还是资深开发者,都能通过本文了解如何利用... 目录什么是注解?注解的类型内置注编程解自定义注解注解的保留策略实际用例最佳实践总结在 Java 编程

MySQL数据库约束深入详解

《MySQL数据库约束深入详解》:本文主要介绍MySQL数据库约束,在MySQL数据库中,约束是用来限制进入表中的数据类型的一种技术,通过使用约束,可以确保数据的准确性、完整性和可靠性,需要的朋友... 目录一、数据库约束的概念二、约束类型三、NOT NULL 非空约束四、DEFAULT 默认值约束五、UN

Python使用Matplotlib绘制3D曲面图详解

《Python使用Matplotlib绘制3D曲面图详解》:本文主要介绍Python使用Matplotlib绘制3D曲面图,在Python中,使用Matplotlib库绘制3D曲面图可以通过mpl... 目录准备工作绘制简单的 3D 曲面图绘制 3D 曲面图添加线框和透明度控制图形视角Matplotlib

MySQL中的分组和多表连接详解

《MySQL中的分组和多表连接详解》:本文主要介绍MySQL中的分组和多表连接的相关操作,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录mysql中的分组和多表连接一、MySQL的分组(group javascriptby )二、多表连接(表连接会产生大量的数据垃圾)MySQL中的

Java 实用工具类Spring 的 AnnotationUtils详解

《Java实用工具类Spring的AnnotationUtils详解》Spring框架提供了一个强大的注解工具类org.springframework.core.annotation.Annot... 目录前言一、AnnotationUtils 的常用方法二、常见应用场景三、与 JDK 原生注解 API 的

redis中使用lua脚本的原理与基本使用详解

《redis中使用lua脚本的原理与基本使用详解》在Redis中使用Lua脚本可以实现原子性操作、减少网络开销以及提高执行效率,下面小编就来和大家详细介绍一下在redis中使用lua脚本的原理... 目录Redis 执行 Lua 脚本的原理基本使用方法使用EVAL命令执行 Lua 脚本使用EVALSHA命令

SpringBoot3.4配置校验新特性的用法详解

《SpringBoot3.4配置校验新特性的用法详解》SpringBoot3.4对配置校验支持进行了全面升级,这篇文章为大家详细介绍了一下它们的具体使用,文中的示例代码讲解详细,感兴趣的小伙伴可以参考... 目录基本用法示例定义配置类配置 application.yml注入使用嵌套对象与集合元素深度校验开发