Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark三大数据结构分别是:
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量

一.累加器(accumulator)

问题引入:
当使用foreach来对rdd求和会发现求和数据为0

val rdd = sc.makeRDD(List(1,2,3,4))var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum)

输出:sum = 0

在Spark中声明SparkContext的类称为Driver,所以变量sum在Driver中;而任务Task(即分区数据的运算)的执行是在Executor中进行,即sum = sum + num在Executor节点执行;

问题的关键点在于:Executor只是做了运算,但并没有将sum运算后的值返回Driver中,也就是说Driver中的sum变量至始至终都保持初始值为0;如下图所示:
在这里插入图片描述
此时便可以考虑使用累加器解决上述问题

1.系统累加器

val rdd = sc.makeRDD(List(1,2,3,4))
val sumAcc = sc.longAccumulator("sum")//sc.doubleAccumulator//sc.collectionAccumulatorrdd.foreach(num => {// 使用累加器sumAcc.add(num)})// 获取累加器的值println(sumAcc.value)

输出:10
图解:
在这里插入图片描述

使用累加器的一些问题:

  • 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
val rdd = sc.makeRDD(List(1,2,3,4))val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})println(sumAcc.value)

输出:0

  • 多加:累加器为全局共享变量,多次调用行动算子就会多次执行
val sumAcc = sc.longAccumulator("sum")val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})mapRDD.collect()mapRDD.collect()println(sumAcc.value)

输出:20

2.自定义累加器

  1. 继承 AccumulatorV2,并设定泛型
  2. 重写累加器的抽象方法

示例:自定义数据累加器WordCount

package org.xyl
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutableobject RDD_Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc").set("spark.testing.memory", "2147480000")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc")rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)})// 获取累加器累加的结果println(wcAcc.value)sc.stop()}/*自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型IN : 累加器输入的数据类型 StringOUT : 累加器返回的数据类型 mutable.Map[String, Long]2. 重写方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]()// 判断是否初始状态override def isZero: Boolean = {wcMap.isEmpty}override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}override def reset(): Unit = {wcMap.clear()}// 获取累加器需要计算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)}// Driver合并多个累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMapval map2 = other.valuemap2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}}// 累加器结果override def value: mutable.Map[String, Long] = {wcMap}}
}

输出:Map(spark -> 1, hello -> 2)

二.广播变量(broadcast variable)

问题引入:
当在Executor端用到了Driver变量,比如使用map()函数,在每个Executor中有多少个task就有多少个Driver端变量副本。
在这里插入图片描述广播变量可以让我们在每台计算机上保留一个只读变量,而不是为每个任务复制一份副本。
Spark会自动广播每个stage任务需要的通用数据。这些被广播的数据以序列化的形式缓存起来,然后在任务运行前进行反序列化。也就是说,在以下两种情况下显示的创建广播变量才有用:
1)当任务跨多个stage并且需要同样的数据时;
2)当以反序列化的形式来缓存数据时。
在这里插入图片描述

【闭包】:是一个函数,这个函数的执行结果由外部自由变量(不是函数的局部变量也不是函数的参数,也称字面量)决定——函数执行时,才捕获相关的自由变量(获取自由变量的当前值),从而形成闭合的函数。
spark执行一个Stage时,会为待执行函数(function,也称为【算子】)建立闭包(捕获函数引用的所有数据集),形成该Stage所有task所需信息的二进制形式,然后把这个闭包发送到集群的每个Executor上。

示例:
注:在创建广播变量时,广播变量的值必须是本地的可序列化的值,不能是RDD,因为RDD是不存数据的。但可以将RDD的结果广播出去。

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark06_Bc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("broadcast")val sc = new SparkContext(sparConf)val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6))// 封装广播变量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)rdd1.map {case (w, c) => {// 访问广播变量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)sc.stop()}
}

输出:
(a,(1,4))
(b,(2,5))
(c,(3,6))

注意:

累加器只能在driver端定义,driver端读取,不能在Executor端读取。

广播变量只能在driver端定义,在Executor端读取,Executor不能修改。

这篇关于Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592028

相关文章

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

MySQL8 密码强度评估与配置详解

《MySQL8密码强度评估与配置详解》MySQL8默认启用密码强度插件,实施MEDIUM策略(长度8、含数字/字母/特殊字符),支持动态调整与配置文件设置,推荐使用STRONG策略并定期更新密码以提... 目录一、mysql 8 密码强度评估机制1.核心插件:validate_password2.密码策略级

从入门到精通详解Python虚拟环境完全指南

《从入门到精通详解Python虚拟环境完全指南》Python虚拟环境是一个独立的Python运行环境,它允许你为不同的项目创建隔离的Python环境,下面小编就来和大家详细介绍一下吧... 目录什么是python虚拟环境一、使用venv创建和管理虚拟环境1.1 创建虚拟环境1.2 激活虚拟环境1.3 验证虚

详解python pycharm与cmd中制表符不一样

《详解pythonpycharm与cmd中制表符不一样》本文主要介绍了pythonpycharm与cmd中制表符不一样,这个问题通常是因为PyCharm和命令行(CMD)使用的制表符(tab)的宽... 这个问题通常是因为PyCharm和命令行(CMD)使用的制表符(tab)的宽度不同导致的。在PyChar

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

从基础到高级详解Python数值格式化输出的完全指南

《从基础到高级详解Python数值格式化输出的完全指南》在数据分析、金融计算和科学报告领域,数值格式化是提升可读性和专业性的关键技术,本文将深入解析Python中数值格式化输出的相关方法,感兴趣的小伙... 目录引言:数值格式化的核心价值一、基础格式化方法1.1 三种核心格式化方式对比1.2 基础格式化示例