Hadoop+Spark大数据技术(微课版)曾国荪、曹洁版 第七章 Spark RDD编程实验

本文主要是介绍Hadoop+Spark大数据技术(微课版)曾国荪、曹洁版 第七章 Spark RDD编程实验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Spark中,`sc` 和 `reduce` 分别代表 SparkContext 和 reduce 函数。

1. SparkContext (`sc`):是 Spark 的主要入口点,它代表与 Spark 集群的连接。
在 Spark 应用程序中,需要首先创建一个 SparkContext 对象,负责与集群通信并管理资源。通过 SparkContext,可以创建 RDDs(Resilient Distributed Datasets)、Broadcast Variables、Accumulators 等。
2. Reduce 函数:在分布式计算中,reduce 函数是一个用于聚合数据的操作。
在 Spark 中,reduce 函数通常用于对 RDD(Resilient Distributed Dataset)中的元素执行聚合操作。
接受一个函数作为参数,函数定义了如何将两个元素聚合成一个新元素。
reduce 函数会在集群中的多个节点上并行执行,并在最后将结果聚合起来。
常见的用例包括对数字进行求和、查找最大值或最小值等。

使用文本文件创建RDD

注意:需要先在hadoop分布式文件系统中创建文件

1.先在本地文件系统创建data.txt文件

cd /usr/local/hadoop/input

gedit data.txt

2.启动hadoop分布式文件系统

./sbin/start-dfs.sh

3.上传本地文件data.txt到hadoop分布式文件系统

hdfs dfs -put /usr/local/hadoop/input/data.txt input

查看分布式文件系统中是否存在data.txt
hdfs dfs -ls input
e92c49aae00645cb8a704563a14caca6.png

textFile 函数用于读取文本文件中的内容,并将其转换为RDD。

rdd.map(line => line.length):这一行代码将每一行文本映射为其长度,即字符数。

.map()函数对RDD中的每个元素应用给定的函数,并返回结果组成的新RDD。

.reduce(_ + _):这一行代码对RDD中的所有元素进行累加,得到它们的总和。.reduce()函数将RDD中的元素两两合并,直到只剩下一个元素,这里使用的匿名函数 _ + _ 表示将两个元素相加。

val wordCount = rdd.map(line => line.length).reduce(_ + _):这行代码将文本文件中所有行的字符数求和,并将结果保存在wordCount变量中。

wordCount:最后,wordCount 变量中保存着文本文件中所有行的字符总数。

val rdd = sc.textFile("/user/hadoop/input/data.txt")
rdd: org.apache.spark.rdd.RDD[String] = /user/hadoop/input/data.txt MapPartitionsRDD[31] at textFile at <console>:25val wordCount = rdd.map(line => line.length).reduce(_ + _)
wordCountwordCount: Int = 274
res11: Int = 274

一些JSON数据

{
  "name": "中国",
  "province": [
    {
      "name": "河南",
      "cities": [
        {
          "city": ["郑州","洛阳"]
      ]
    }
  ]
}
```

{
  "code": 0,
  "msg": "",
  "count": 2,
  "data": [
    {
      "id": 101,
      "username": "zhangsan",
      "city": "xiamen"
    },
    {
      "id": 102,
      "username": "liming",
      "city": "zhengzhou"
    }
  ]
}
{"学号":"106","姓名":"李明","数据结构":"92"}

{"学号":"242","姓名":"李乐","数据结构":"93"}

{"学号":"107","姓名":"冯涛","数据结构":"99"}

完整程序练习

seq是一个包含元组的列表,每个元组都有一个字符串键和一个字符串列表值。

sc.parallelize方法将seq转换为一个RDD。

rddP.partitions.size是用来获取RDD的分区数量的。RDD的分区是数据的逻辑划分,决定了数据在集群中的分布方式。每个分区都可以在集群的不同节点上进行并行处理。

//使用程序中的数据创建RDDval arr = Array(1,2,3,4,5,6)val rdd = Array(1,2,3,4,5,6)arr: Array[Int] = Array(1, 2, 3, 4, 5, 6)
rdd: Array[Int] = Array(1, 2, 3, 4, 5, 6)val sum = rdd.reduce(_ + _)sum: Int = 21//parallelize() 创建RDDval seq = List( ("num", List("one","two","three") ), ("study",List("Scala","Python","Hadoop")) , ("color", List("blue","white","black")) )val rddP = sc.parallelize(seq)   rddp.partitions.sizeseq: List[(String, List[String])] = List((num,List(one, two, three)), (study,List(Scala, Python, Hadoop)), (color,List(blue, white, black)))
rddP: org.apache.spark.rdd.RDD[(String, List[String])] = ParallelCollectionRDD[7] at parallelize at <console>:31
res5: Int = 2//makeRDD() 创建RDDval rddM = sc.makeRDD(seq)rddM.partitions.sizerddM: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at makeRDD at <console>:30
res6: Int = 3//hdfs的文件创建RDDval rdd = sc.textFile("/user/hadoop/input/data.txt")rdd: org.apache.spark.rdd.RDD[String] = /user/hadoop/input/data.txt MapPartitionsRDD[31] at textFile at <console>:25val wordCount = rdd.map(line => line.length).reduce(_ + _)wordCountwordCount: Int = 274
res11: Int = 274//本地文件创建RDDval rdd = sc.textFile("file:/home/hadoop/data.txt")val wordCount = rdd.map(line => line.length).reduce(_ + _)wordCountrdd: org.apache.spark.rdd.RDD[String] = file:/home/hadoop/data.txt MapPartitionsRDD[34] at textFile at <console>:29
wordCount: Int = 274
res12: Int = 274val rddw1 = sc.textFile("file:/home/hadoop/input")rddw1.collect()rddw1: org.apache.spark.rdd.RDD[String] = file:/home/hadoop/input MapPartitionsRDD[37] at textFile at <console>:25
res13: Array[String] = Array(Hello Spark!, Hello Scala!)val rddw1 = sc.wholeTextFiles("file:/home/hadoop/input")rddw1.collect()rddw1: org.apache.spark.rdd.RDD[(String, String)] = file:/home/hadoop/input MapPartitionsRDD[39] at wholeTextFiles at <console>:27
res14: Array[(String, String)] =
Array((file:/home/hadoop/input/text1.txt,"Hello Spark!
"), (file:/home/hadoop/input/text2.txt,"Hello Scala!
"))val jsonStr = sc.textFile("file:/home/hadoop/student.json")jsonStr.collect()jsonStr: org.apache.spark.rdd.RDD[String] = file:/home/hadoop/student.json MapPartitionsRDD[48] at textFile at <console>:28
res18: Array[String] = Array({"学号":"106","姓名":"李明","数据结构":"92"}, {"学号":"242","姓名":"李乐","数据结构":"93"}, {"学号":"107","姓名":"冯涛","数据结构":"99"})import scala.util.parsing.json.JSONval jsonStr = sc.textFile("file:/home/hadoop/student.json")val result  = jsonStr.map(s => JSON.parseFull(s))result.foreach(println)Some(Map(学号 -> 107, 姓名 -> 冯涛, 数据结构 -> 99))
Some(Map(学号 -> 106, 姓名 -> 李明, 数据结构 -> 92))
Some(Map(学号 -> 242, 姓名 -> 李乐, 数据结构 -> 93))import scala.util.parsing.json.JSON
jsonStr: org.apache.spark.rdd.RDD[String] = file:/home/hadoop/student.json MapPartitionsRDD[50] at textFile at <console>:31
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[51] at map at <console>:32import java.io.StringReaderimport au.com.bytecode.opencsv.CSVReaderval gradeRDD = sc.textFile("file:/home/hadoop/sparkdata/grade.csv")val result = gradeRDD.map{line => val reader = new CSVReader(new StringReader(line));reader.readNext()}result.collect().foreach(x => println(x(0),x(1),x(2)))(101,LiNing,95)
(102,LiuTao,90)
(103,WangFei,96)import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
gradeRDD: org.apache.spark.rdd.RDD[String] = file:/home/hadoop/sparkdata/grade.csv MapPartitionsRDD[53] at textFile at <console>:31
result: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[54] at map at <console>:32

 

 

 

 

 

这篇关于Hadoop+Spark大数据技术(微课版)曾国荪、曹洁版 第七章 Spark RDD编程实验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/951054

相关文章

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

Python实现PDF按页分割的技术指南

《Python实现PDF按页分割的技术指南》PDF文件处理是日常工作中的常见需求,特别是当我们需要将大型PDF文档拆分为多个部分时,下面我们就来看看如何使用Python创建一个灵活的PDF分割工具吧... 目录需求分析技术方案工具选择安装依赖完整代码实现使用说明基本用法示例命令输出示例技术亮点实际应用场景扩

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I