Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark三大数据结构分别是:
➢ RDD : 弹性分布式数据集
➢ 累加器:分布式共享只写变量
➢ 广播变量:分布式共享只读变量

一.累加器(accumulator)

问题引入:
当使用foreach来对rdd求和会发现求和数据为0

val rdd = sc.makeRDD(List(1,2,3,4))var sum = 0rdd.foreach(num => {sum += num})println("sum = " + sum)

输出:sum = 0

在Spark中声明SparkContext的类称为Driver,所以变量sum在Driver中;而任务Task(即分区数据的运算)的执行是在Executor中进行,即sum = sum + num在Executor节点执行;

问题的关键点在于:Executor只是做了运算,但并没有将sum运算后的值返回Driver中,也就是说Driver中的sum变量至始至终都保持初始值为0;如下图所示:
在这里插入图片描述
此时便可以考虑使用累加器解决上述问题

1.系统累加器

val rdd = sc.makeRDD(List(1,2,3,4))
val sumAcc = sc.longAccumulator("sum")//sc.doubleAccumulator//sc.collectionAccumulatorrdd.foreach(num => {// 使用累加器sumAcc.add(num)})// 获取累加器的值println(sumAcc.value)

输出:10
图解:
在这里插入图片描述

使用累加器的一些问题:

  • 少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
val rdd = sc.makeRDD(List(1,2,3,4))val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})println(sumAcc.value)

输出:0

  • 多加:累加器为全局共享变量,多次调用行动算子就会多次执行
val sumAcc = sc.longAccumulator("sum")val mapRDD = rdd.map(num => {// 使用累加器sumAcc.add(num)num})mapRDD.collect()mapRDD.collect()println(sumAcc.value)

输出:20

2.自定义累加器

  1. 继承 AccumulatorV2,并设定泛型
  2. 重写累加器的抽象方法

示例:自定义数据累加器WordCount

package org.xyl
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutableobject RDD_Acc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("Acc").set("spark.testing.memory", "2147480000")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List("hello", "spark", "hello"))// 累加器 : WordCount// 创建累加器对象val wcAcc = new MyAccumulator()// 向Spark进行注册sc.register(wcAcc, "wordCountAcc")rdd.foreach(word => {// 数据的累加(使用累加器)wcAcc.add(word)})// 获取累加器累加的结果println(wcAcc.value)sc.stop()}/*自定义数据累加器:WordCount1. 继承AccumulatorV2, 定义泛型IN : 累加器输入的数据类型 StringOUT : 累加器返回的数据类型 mutable.Map[String, Long]2. 重写方法(6)*/class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]()// 判断是否初始状态override def isZero: Boolean = {wcMap.isEmpty}override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}override def reset(): Unit = {wcMap.clear()}// 获取累加器需要计算的值override def add(word: String): Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt)}// Driver合并多个累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {val map1 = this.wcMapval map2 = other.valuemap2.foreach{case ( word, count ) => {val newCount = map1.getOrElse(word, 0L) + countmap1.update(word, newCount)}}}// 累加器结果override def value: mutable.Map[String, Long] = {wcMap}}
}

输出:Map(spark -> 1, hello -> 2)

二.广播变量(broadcast variable)

问题引入:
当在Executor端用到了Driver变量,比如使用map()函数,在每个Executor中有多少个task就有多少个Driver端变量副本。
在这里插入图片描述广播变量可以让我们在每台计算机上保留一个只读变量,而不是为每个任务复制一份副本。
Spark会自动广播每个stage任务需要的通用数据。这些被广播的数据以序列化的形式缓存起来,然后在任务运行前进行反序列化。也就是说,在以下两种情况下显示的创建广播变量才有用:
1)当任务跨多个stage并且需要同样的数据时;
2)当以反序列化的形式来缓存数据时。
在这里插入图片描述

【闭包】:是一个函数,这个函数的执行结果由外部自由变量(不是函数的局部变量也不是函数的参数,也称字面量)决定——函数执行时,才捕获相关的自由变量(获取自由变量的当前值),从而形成闭合的函数。
spark执行一个Stage时,会为待执行函数(function,也称为【算子】)建立闭包(捕获函数引用的所有数据集),形成该Stage所有task所需信息的二进制形式,然后把这个闭包发送到集群的每个Executor上。

示例:
注:在创建广播变量时,广播变量的值必须是本地的可序列化的值,不能是RDD,因为RDD是不存数据的。但可以将RDD的结果广播出去。

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Spark06_Bc {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("broadcast")val sc = new SparkContext(sparConf)val rdd1 = sc.makeRDD(List(("a", 1),("b", 2),("c", 3)))val map = mutable.Map(("a", 4),("b", 5),("c", 6))// 封装广播变量val bc: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)rdd1.map {case (w, c) => {// 访问广播变量val l: Int = bc.value.getOrElse(w, 0)(w, (c, l))}}.collect().foreach(println)sc.stop()}
}

输出:
(a,(1,4))
(b,(2,5))
(c,(3,6))

注意:

累加器只能在driver端定义,driver端读取,不能在Executor端读取。

广播变量只能在driver端定义,在Executor端读取,Executor不能修改。

这篇关于Spark学习笔记(详解,附代码实列和图解)----------累加器和广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592028

相关文章

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MySQL中的锁机制详解之全局锁,表级锁,行级锁

《MySQL中的锁机制详解之全局锁,表级锁,行级锁》MySQL锁机制通过全局、表级、行级锁控制并发,保障数据一致性与隔离性,全局锁适用于全库备份,表级锁适合读多写少场景,行级锁(InnoDB)实现高并... 目录一、锁机制基础:从并发问题到锁分类1.1 并发访问的三大问题1.2 锁的核心作用1.3 锁粒度分

MySQL数据库中ENUM的用法是什么详解

《MySQL数据库中ENUM的用法是什么详解》ENUM是一个字符串对象,用于指定一组预定义的值,并可在创建表时使用,下面:本文主要介绍MySQL数据库中ENUM的用法是什么的相关资料,文中通过代码... 目录mysql 中 ENUM 的用法一、ENUM 的定义与语法二、ENUM 的特点三、ENUM 的用法1

MySQL count()聚合函数详解

《MySQLcount()聚合函数详解》MySQL中的COUNT()函数,它是SQL中最常用的聚合函数之一,用于计算表中符合特定条件的行数,本文给大家介绍MySQLcount()聚合函数,感兴趣的朋... 目录核心功能语法形式重要特性与行为如何选择使用哪种形式?总结深入剖析一下 mysql 中的 COUNT

一文详解Git中分支本地和远程删除的方法

《一文详解Git中分支本地和远程删除的方法》在使用Git进行版本控制的过程中,我们会创建多个分支来进行不同功能的开发,这就容易涉及到如何正确地删除本地分支和远程分支,下面我们就来看看相关的实现方法吧... 目录技术背景实现步骤删除本地分支删除远程www.chinasem.cn分支同步删除信息到其他机器示例步骤

Java中调用数据库存储过程的示例代码

《Java中调用数据库存储过程的示例代码》本文介绍Java通过JDBC调用数据库存储过程的方法,涵盖参数类型、执行步骤及数据库差异,需注意异常处理与资源管理,以优化性能并实现复杂业务逻辑,感兴趣的朋友... 目录一、存储过程概述二、Java调用存储过程的基本javascript步骤三、Java调用存储过程示

Visual Studio 2022 编译C++20代码的图文步骤

《VisualStudio2022编译C++20代码的图文步骤》在VisualStudio中启用C++20import功能,需设置语言标准为ISOC++20,开启扫描源查找模块依赖及实验性标... 默认创建Visual Studio桌面控制台项目代码包含C++20的import方法。右键项目的属性:

Go语言数据库编程GORM 的基本使用详解

《Go语言数据库编程GORM的基本使用详解》GORM是Go语言流行的ORM框架,封装database/sql,支持自动迁移、关联、事务等,提供CRUD、条件查询、钩子函数、日志等功能,简化数据库操作... 目录一、安装与初始化1. 安装 GORM 及数据库驱动2. 建立数据库连接二、定义模型结构体三、自动迁