Spark算子:转化算子、执行算子;累加器、广播变量

2024-03-08 09:20

本文主要是介绍Spark算子:转化算子、执行算子;累加器、广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

 一、转换算子

1、map

2、fliter

3、flatMap

4、Sample

5、Group

6、ReduceBykey

7、Union

8、Join

9、mapValus

10、sortBy

11、distinct

二、操作算子

三、累加器

四、广播变量


transformations转换算子:延迟执行--针对RDD的操作

Action操作算子:触发执行,转换算子是懒执行,需要一个action算子触发执行

 一、转换算子

1、map

    val conf = new SparkConf()conf.setMaster("local")conf.setAppName("map")val sc = new SparkContext(conf)//用parallelize构建rdd,不用读数据去创建rdd,后面是分区数val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)//getNumPartitions获取当前rdd分区数println(s"rdd1:${rdd1.getNumPartitions}")/*mapPartitions: 将一个分区的数据传递给后面的函数,一次处理一个分区的数据,需要返回一个迭代器为什么是迭代器而不是集合,因为集合会将数据加载到内存中,如果一个分区数据量太大会导致内存溢出*/val rdd2: RDD[Int] = rdd1.mapPartitions((iter: Iterator[Int]) => {println("=" * 40)val iter1: Iterator[Int] = iter.map(i => i * 2)iter1})//分区分批输出rdd2.foreach(println(_))/*mapPartitionsWithIndex获取当前分区号和分区数据*/val rdd3: RDD[Int] = rdd1.mapPartitionsWithIndex((i: Int, iter: Iterator[Int]) => {println(s"当前的分区编号:${i}")val ints: Iterator[Int] = iter.map(i => i - 1)ints})rdd3.foreach(println(_))

2、fliter

        对数据进行过滤,函数返回treu保留数据,函数返回false过滤数据

3、flatMap

        将rdd中的数据一行变多行,函数返回值必须是一个序列

4、Sample

        抽样数据

val conf = new SparkConf()conf.setAppName("sample")conf.setMaster("local")
val sc = new SparkContext(conf)val studentRDD: RDD[String] = sc.textFile("data/students.txt")
//sample: 对数据进行抽样
//false表示数据写不写入文件中,0.1是抽样比例,抽样的数量存在误差
val sRdd: RDD[String] = studentRDD.sample(false, 0.1)sRdd.foreach(println(_))

5、Group

(1)GroupBY

        指定一个分组的字段进行分组,不需要一定是一个kv格式,返回的新的rdd的value里面包括所有的字段

(2)GroupBYkey

        rdd必须是一个kv格式,返回的新的rdd的迭代器中的数据只包含value, 后续在处理数据的时候方便一点

val rdd1: RDD[String] = sc.parallelize(List("1500100001,1000001,98", "1500100001,1000002,5", "1500100001, 1000003,137", "1500100001, 1000004,29", "1500100001, 1000005,85"))val rdd2: RDD[(String, Int)] =rdd1.map(str=>str.split(",")).map{case Array(id: String, _: String, sco: String) =>(id, sco.toInt)}rdd2.foreach(print(_))
//(1500100001,98)(1500100001,5)(1500100001,137)(1500100001,29)(1500100001,85)val rdd3: RDD[(String, Iterable[(String, Int)])] = rdd2.groupBy(str => str._1)rdd3.foreach(println(_))
//(1500100001,CompactBuffer((1500100001,98), (1500100001,5), (1500100001,137), (1500100001,29), (1500100001,85)))val rdd4: RDD[(String, Iterable[Int])] = rdd2.groupByKey()rdd4.foreach(println(_))
//(1500100001,CompactBuffer(98, 5, 137, 29, 85))

6、ReduceBykey

        在kv格式的数据中,对相同k的v做和运算,groupBYKey需要先分组,分组后v是一个集合,在对集合做求和运算

reduceBYKey是对相同的key的value进行聚合计算,加上一个聚合函数。

//x和y表示相同的key的两个value值,x+y表示对相同key的value做求和运算
val rdd5: RDD[(String, Int)] = rdd2.reduceByKey((x: Int, y: Int) => {val j: Int = x + yj})rdd5.foreach(println(_))
//(1500100001,354)

7、Union

        合并两个rdd, 不会对数据做去重, 两个rdd的类型要完全一致,在物理层面并没有合并,只是在逻辑层面合并了,合并的rdd是两个分区。

    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6))val rdd2: RDD[Int] = sc.parallelize(List(3, 4, 5, 6, 7, 8, 9))val unionRDD: RDD[Int] = rdd1.union(rdd2)println(s"unionRDD:${unionRDD.getNumPartitions}") //2个分区unionRDD.foreach(println)

8、Join

val idNameRDD: RDD[(String, String)] = sc.parallelize(List(("000", "晓伟"),("001", "张三"),("002", "李四"),("003", "王五")))val idAgeRDD: RDD[(String, Int)] = sc.parallelize(List(("001", 23),("002", 24),("003", 25),("004", 23)))
//innerJoin: 两个表都有才能关联上val innerJoinRDD: RDD[(String, (String, Int))] = idNameRDD.join(idAgeRDD)innerJoinRDD.foreach(println)
结果:
(003,(王五,25))
(002,(李四,24))
(001,(张三,23))/*** leftOuterJoin: 以左表为基础,如果右表没有这个key,补NOne* Option: 可选择的值,有值或者没有值
*/val leftJoinRDD: RDD[(String, (String, Option[Int]))] = idNameRDD.leftOuterJoin(idAgeRDD)leftJoinRDD.foreach(println)
结果:
(003,(王五,Some(25)))
(000,(晓伟,None))
(002,(李四,Some(24)))
(001,(张三,Some(23)))//fullOuterJoin: 以两个表为基础,有一边有数据就会出来结果,列一边补Noneval fullJoinRDD: RDD[(String, (Option[String], Option[Int]))] = idNameRDD.fullOuterJoin(idAgeRDD)fullJoinRDD.foreach(println)
结果:
(003,(Some(王五),Some(25)))
(000,(Some(晓伟),None))
(004,(None,Some(23)))
(002,(Some(李四),Some(24)))
(001,(Some(张三),Some(23)))

9、mapValus

        key不变,对value做处理

val idAgeRDD: RDD[(String, Int)] = sc.parallelize(List(("001", 23),("002", 24),("003", 25),("004", 23)))val rdd: RDD[(String, Int)] = idAgeRDD.mapValues(age => age + 1)rdd.foreach(println)
结果:
(001,24)
(002,25)
(003,26)
(004,24)

10、sortBy

        指定一个字段进行排序

rdd.sortBy(kv => kv._2, ascending = false)
//ascending=false 表示降序,true表示升序

11、distinct

        对数据去重,会产生shuffle

al rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 7, 87, 9, 4, 4, 3, 2, 4, 5, 6))val distinctRDD: RDD[Int] = rdd1.distinct()distinctRDD.foreach(print)//4163798752

二、操作算子

//count: 统计rdd的数据行数val count: Long = studentRDD.count()println(count)//sum: 对rdd中的数据求和,rdd中的数据类型必须是数字val sumAge: Double = studentRDD.map(line => line.split(",")(2).toInt)//对所有数据求和,只能是数字类型.sum()println(sumAge / count)//take: 取集合的前几个数, 返回一个数组,不能取太多, 会导致内存溢出val top: Array[String] = studentRDD.take(10)top.foreach(println)//collect: 将rdd转换成数组,如果rdd数据量比较大,会导致内存溢出val array: Array[String] = studentRDD.collect()array.foreach(println)/**
* foreach: 遍历rdd中的数据,也是一个action算子
* foreachPartition: 一次将一个分区的数据传递给后面的函数
*/studentRDD.foreach(println)studentRDD.foreachPartition((iter: Iterator[String]) => {iter.foreach(println)})//saveAsTextFile: 将数据保存在hdfs中HDFSUtil.deletePath("data/test")studentRDD.saveAsTextFile("data/test")

三、累加器

//算子的代码运行在Driver端var count = 0studentRDD.foreach(stu => {//算子内的代码运行在Executor端//在spark写代码的时候不能在算子内取修改算子外的一个普通变量,//就算修改了在算子外也不会生效count += 1println(count) //会打出依次输出数据的数量})println(count) //输出为0/*** 累加器*///1、定义累加器val countAcc: LongAccumulator = sc.longAccumulatorstudentRDD.foreach(stu => {//2、在算子内对累加器进行累加countAcc.add(1)})//3、在算子外获取累加的结果println(countAcc.value) //结果为数据的数量

四、广播变量

def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("bro")conf.setMaster("local")val sc = new SparkContext(conf)val studentRDD: RDD[String] = sc.textFile("data/students.txt")val scoreRDD: RDD[String] = sc.textFile("data/score.txt")val array: Array[String] = studentRDD.collect()val array2: Array[(String, String)] = array.map(stu => {val s: Array[String] = stu.split(",")(s(0), stu)})val map: Map[String, String] = array2.toMap  //普通变量val Rdd: RDD[(String, String)] = scoreRDD.map(stu => {val strings: Array[String] = stu.split(",")val str: String = map.getOrElse(strings(0), "默认值·") //在算子内使用普通变量(str, stu)})Rdd.foreach(println(_))}
}

 

val map: Map[String, String] = array2.toMap//将一个普通变量广播出去,通常是较大的变量val mapBro: Broadcast[Map[String, String]] = sc.broadcast(map)val Rdd: RDD[(String, String)] = scoreRDD.map(stu => {val strings: Array[String] = stu.split(",")//使用value方法获取广播变量中的值val values: Map[String, String] = mapBro.valueval str: String = values.getOrElse(strings(0), "默认值·")(str, stu)})Rdd.foreach(println(_))

这篇关于Spark算子:转化算子、执行算子;累加器、广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/786648

相关文章

Spring Boot中的路径变量示例详解

《SpringBoot中的路径变量示例详解》SpringBoot中PathVariable通过@PathVariable注解实现URL参数与方法参数绑定,支持多参数接收、类型转换、可选参数、默认值及... 目录一. 基本用法与参数映射1.路径定义2.参数绑定&nhttp://www.chinasem.cnbs

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推

c++ 类成员变量默认初始值的实现

《c++类成员变量默认初始值的实现》本文主要介绍了c++类成员变量默认初始值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录C++类成员变量初始化c++类的变量的初始化在C++中,如果使用类成员变量时未给定其初始值,那么它将被

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

Python变量与数据类型全解析(最新整理)

《Python变量与数据类型全解析(最新整理)》文章介绍Python变量作为数据载体,命名需遵循字母数字下划线规则,不可数字开头,大小写敏感,避免关键字,本文给大家介绍Python变量与数据类型全解析... 目录1、变量变量命名规范python数据类型1、基本数据类型数值类型(Number):布尔类型(bo

一文全面详解Python变量作用域

《一文全面详解Python变量作用域》变量作用域是Python中非常重要的概念,它决定了在哪里可以访问变量,下面我将用通俗易懂的方式,结合代码示例和图表,带你全面了解Python变量作用域,需要的朋友... 目录一、什么是变量作用域?二、python的四种作用域作用域查找顺序图示三、各作用域详解1. 局部作

Python使用Code2flow将代码转化为流程图的操作教程

《Python使用Code2flow将代码转化为流程图的操作教程》Code2flow是一款开源工具,能够将代码自动转换为流程图,该工具对于代码审查、调试和理解大型代码库非常有用,在这篇博客中,我们将深... 目录引言1nVflRA、为什么选择 Code2flow?2、安装 Code2flow3、基本功能演示

java变量内存中存储的使用方式

《java变量内存中存储的使用方式》:本文主要介绍java变量内存中存储的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍2、变量的定义3、 变量的类型4、 变量的作用域5、 内存中的存储方式总结1、介绍在 Java 中,变量是用于存储程序中数据

MySQL中SQL的执行顺序详解

《MySQL中SQL的执行顺序详解》:本文主要介绍MySQL中SQL的执行顺序,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql中SQL的执行顺序SQL执行顺序MySQL的执行顺序SELECT语句定义SELECT语句执行顺序总结MySQL中SQL的执行顺序

JavaScript时间戳与时间的转化常用方法

《JavaScript时间戳与时间的转化常用方法》在JavaScript中,时间戳(Timestamp)通常指Unix时间戳,即从1970年1月1日00:00:00UTC到某个时间点经过的毫秒数,下面... 目录1. 获取当前时间戳2. 时间戳 → 时间对象3. 时间戳php → 格式化字符串4. 时间字符