Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look

本文主要是介绍Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、以本地模式实战map和filter

2、以集群模式实战textFile和cache

3、对Job输出结果进行升和降序

4、union

5、groupByKey

6、join

7、reduce

8、lookup

 

 

1、以本地模式实战map和filter

以local的方式,运行spark-shell。

spark@SparkSingleNode:~$ cd /usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ pwd
/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell

 

 从集合中创建RDD,spark中主要提供了两种函数:parallelize和makeRDD,

 

scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val mappedRDD = rdd.map(2*_)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

scala> mappedRDD.collect

得到

res0: Array[Int] = Array(2, 4, 6, 8, 10)

scala>

 

 

 

scala> val filteredRDD = mappedRDD.filter(_ > 4)
16/09/26 20:32:29 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on localhost:40688 in memory (size: 1218.0 B, free: 534.5 MB)
16/09/26 20:32:30 INFO spark.ContextCleaner: Cleaned accumulator 1
filteredRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:25

scala> filteredRDD.collect

 

 

注意,一般,生产环境和正宗的写法是。

scala> val filteredRDDAgain = sc.parallelize(List(1,2,3,4,5)).map(2 * _).filter(_ > 4).collect

 

 

 

 

 

 

2、以集群模式实战textFile和cache

 启动hadoop集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ jps
8457 Jps
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

 

启动spark集群

 

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

 

 spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 

 

 

读取该文件

scala> val rdd = sc.textFile("/README.md")

 使用count统计一下该文件的行数

scala> rdd.count

 

took 7.018386 s

res0: Long = 98

花了时间7.018386 s

 

通过观察RDD.scala源代码即可知道cache和persist的区别:

def persist(newLevel: StorageLevel): this.type = {
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

可知:
1)RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY;
2)可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别;
3)cache或者persist并不是action;
附:cache和persist都可以用unpersist来取消

 

进行缓存

scala> rdd.cache
res1: rdd.type = MapPartitionsRDD[1] at textFile at <console>:21

执行count,使得缓存生效

scala> rdd.count

 

took 2.055063 s
res2: Long = 98

花了时间 2.055063 s

 

再执行,count

took 0.583177 s
res3: Long = 98

花了时间 0.583177 s

 

总结,我们直接基于cache缓存后的数据,计算所消耗时间大大减少。

 

 正在进行中的spark-shell

 

 

 

 接着,对上面的RDD,进行wordcount操作

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:23

scala> wordcount.collect

 

 通过saveAsTextFile把数据保存起来

 

res4: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (DataFrames...
scala> wordcount.saveAsTextFile("/result")

只是,仅仅对每行,做了wordcount而已。

 

 

3、对Job输出结果进行升和降序

升序

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted")

 

同理,去下载,不多赘述。

变了

 

 

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortBy(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted")
<console>:23: error: type mismatch;
found : Boolean(true)
required: ((Int, String)) => ?
val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortBy(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted")
^

scala>

 

 

 降序

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("/resultDescSorted")

 

下载,同理

 此刻,成功对Job输出结果进行了排序。

 

4、union

union的使用

scala> val rdd1 = sc.parallelize(List(('a',1),('b',1)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(('c',1),('d',1)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:21

scala> rdd1 union rdd2
res6: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[28] at union at <console>:26

scala> val result = rdd1 union rdd2
result: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[29] at union at <console>:25

 

 

使用collect操作,查看一下执行结果

scala> result.collect

res7: Array[(Char, Int)] = Array((a,1), (b,1), (c,1), (d,1))

 

5、groupByKey

 

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).groupByKey
wordcount: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[32] at groupByKey at <console>:23

scala> wordcount.collect

res8: Array[(String, Iterable[Int])] = Array((package,CompactBuffer(1)), (this,CompactBuffer(1)), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),CompactBuffer(1)), (Because,CompactBuffer(1)), (Python,CompactBuffer(1, 1)), (cluster.,CompactBuffer(1)), (its,CompactBuffer(1)), ([run,CompactBuffer(1)), (general,CompactBuffer(1, 1)), (YARN,,CompactBuffer(1)), (have,CompactBuffer(1)), (pre-built,CompactBuffer(1)), (locally.,CompactBuffer(1)), (locally,CompactBuffer(1, 1)), (changed,CompactBuffer(1)), (sc.parallelize(1,CompactBuffer(1)), (only,CompactBuffer(1)), (several,CompactBuffer(1)), (learning,,CompactBuffer(1)), (basic,CompactBuffer(1)), (first,CompactBuffer(1)), (This,CompactBuffer(1, 1)), (documentation,CompactBuffer(1, 1, 1)), (Confi...
scala>

 

6、join

 概念知识,参考

http://www.cnblogs.com/goforward/p/4748128.html  

scala> val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:21

scala> rdd1 join rdd2
res9: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[37] at join at <console>:26

scala> val result = rdd1 join rdd2
result: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[40] at join at <console>:25

scala> result.collect

 

res10: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))

scala>

 

可见,join操作,完全是一个笛卡尔积的操作。

 

 

 

7、reduce

reduce本身啊,在RDD操作里,属于一个action类型的操作,会导致job作业的提交和执行。

 

scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at parallelize at <console>:21

scala> rdd.reduce(_+_)

res11: Int = 15

 

8、lookup

scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:21

scala> rdd2.lookup('a')    //返回一个seq, (5, 6) 是把a对应的所有元素的value提出来组成一个seq

 

res12: Seq[Int] = WrappedArray(5, 6)

 

这篇关于Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/778694

相关文章

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

Java中Map.Entry()含义及方法使用代码

《Java中Map.Entry()含义及方法使用代码》:本文主要介绍Java中Map.Entry()含义及方法使用的相关资料,Map.Entry是Java中Map的静态内部接口,用于表示键值对,其... 目录前言 Map.Entry作用核心方法常见使用场景1. 遍历 Map 的所有键值对2. 直接修改 Ma

Mybatis Plus Join使用方法示例详解

《MybatisPlusJoin使用方法示例详解》:本文主要介绍MybatisPlusJoin使用方法示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,... 目录1、pom文件2、yaml配置文件3、分页插件4、示例代码:5、测试代码6、和PageHelper结合6

Linux使用scp进行远程目录文件复制的详细步骤和示例

《Linux使用scp进行远程目录文件复制的详细步骤和示例》在Linux系统中,scp(安全复制协议)是一个使用SSH(安全外壳协议)进行文件和目录安全传输的命令,它允许在远程主机之间复制文件和目录,... 目录1. 什么是scp?2. 语法3. 示例示例 1: 复制本地目录到远程主机示例 2: 复制远程主

Java中JSON格式反序列化为Map且保证存取顺序一致的问题

《Java中JSON格式反序列化为Map且保证存取顺序一致的问题》:本文主要介绍Java中JSON格式反序列化为Map且保证存取顺序一致的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未... 目录背景问题解决方法总结背景做项目涉及两个微服务之间传数据时,需要提供方将Map类型的数据序列化为co

java String.join()方法实例详解

《javaString.join()方法实例详解》String.join()是Java提供的一个实用方法,用于将多个字符串按照指定的分隔符连接成一个字符串,这一方法是Java8中引入的,极大地简化了... 目录bVARxMJava String.join() 方法详解1. 方法定义2. 基本用法2.1 拼接

Python中OpenCV与Matplotlib的图像操作入门指南

《Python中OpenCV与Matplotlib的图像操作入门指南》:本文主要介绍Python中OpenCV与Matplotlib的图像操作指南,本文通过实例代码给大家介绍的非常详细,对大家的学... 目录一、环境准备二、图像的基本操作1. 图像读取、显示与保存 使用OpenCV操作2. 像素级操作3.

windows系统上如何进行maven安装和配置方式

《windows系统上如何进行maven安装和配置方式》:本文主要介绍windows系统上如何进行maven安装和配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不... 目录1. Maven 简介2. maven的下载与安装2.1 下载 Maven2.2 Maven安装2.

C/C++的OpenCV 进行图像梯度提取的几种实现

《C/C++的OpenCV进行图像梯度提取的几种实现》本文主要介绍了C/C++的OpenCV进行图像梯度提取的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录预www.chinasem.cn备知识1. 图像加载与预处理2. Sobel 算子计算 X 和 Y

Java Stream.reduce()方法操作实际案例讲解

《JavaStream.reduce()方法操作实际案例讲解》reduce是JavaStreamAPI中的一个核心操作,用于将流中的元素组合起来产生单个结果,:本文主要介绍JavaStream.... 目录一、reduce的基本概念1. 什么是reduce操作2. reduce方法的三种形式二、reduce