Spark 入门之十一:Spark数据处理常用的那几招

2024-02-14 11:18

本文主要是介绍Spark 入门之十一:Spark数据处理常用的那几招,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

最近看完了《Spark 大数据处理》一数,收益非浅,又结合平时工作中用到的一些开发实践,用Python实现了Spark编程过程中经常用到且比较基础的编程模型,拿出来与大家分享,如有不足还请补充。
《Spark 大数据处理》 一书中也有相关的例子,但是是用Scala实现的,个人觉得还是Python API的语法还是更加简洁清晰,所以选择了用Python来实现,语言都是浮云,主要还是看思路,由于篇幅限制,只列出部分核心代码。 

  • 第一招:wordcount
    • 介绍:对于大数据编程领域的经典例子,是肯定要第一招学会的,其实wordcount并不只是个wordcount,它还有很用应用的场景,例如统计过去一段时间网站中各个商品的浏览量,最近一段时间相同查询的数量等
    • 代码逻辑
      实现该功能主要包括以下四步
      • 将文件内容转换成RDD
      • 对文件的行按照特定字符分割
      • 将每个提取到的数据转换成(element,1)的格式
      • 把所有的(element,1)的数据使用reduceBy进行element的叠加统计
    • 实现代码
      完整功能的核心代码加上打印只需要两行
      data_file=sc.textFile("hdfs://10.5.24.137:9990/temp/2016052512/tf_00000000")
      result=data_file.map(lambda x:x.split("\t")[0]).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
      result.foreach(print)

  • 第二招:TopK
    • 介绍
      TopK也是进行大数据编程的经典案例。改编程模型主要用在“最近一段时间内登录系统最多的IP,文章/商品浏览排行TopN”等应用场景
    • 代码逻辑
      代码逻辑比wordcount多了一个TopN的特性,考虑到数据量可能很大,如果将数据从各个节点传到单一节点进行排序的话,在网络上损耗的时间可能会比较大,这里采用了先在各个分区上选出TopN,再对每个分区的TopN结果再进行一次TopN的处理,从而减少的数据的传输。在单个分区上选择TopN的代码中使用了heap作为底层的数据结构
    • 实现代码
      data_file=sc.textFile("hdfs://10.5.24.137:9990/temp/2016052512/tf_00000000")
      #先算出全部的wordcount
      result=data_file.map(lambda x:x.split("\t")[0]).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
      topn=10
      #使用heapq.nlarges方法算出每个分区的topn,排序的key使用e[1]
      par_topk=result.mapPartitions(lambda elements:heapq.nlargest(topn,elements,key=lambda e:e[1]))
      #汇总每个分区的topn,并对所有分区的topn结果再次取topn,排序的key使用e[1]
      final_result=heapq.nlargest(topn,par_topk.collect(),key=lambda e:e[1])
      print final_result

  • 第三招:中位数
    • 介绍
      首先说一下啥叫中位数:
      百度的定义为:中位数(又称中值,英语:Median),统计学中的专有名词,代表一个样本、种群或概率分布中的一个数值,其可将数值集合划分为相等的上下两部分。对于有限的数集,可以通过把所有观察值高低排序后找出正中间的一个作为中位数。如果观察值有偶数个,通常取最中间的两个数值的平均数作为中位数。
      说白了,就是一个有序数据中,处于中间位置的那个数

      对比前面两招,这一招似乎不是那么的出名,其实这一招的应用场景也很广,比如对于海量的数据需要统计某一列数据的中位数,从而了解数据的分布情况。

    • 代码逻辑
      1,将整体的数据分为K个桶,统计每个桶内的数据量,然后统计整个数据量
      2,根据桶的数量和总的数据量,可以判断数据落在哪个桶里,以及中位数的偏移量
      3,取出这个中位数

      对于数据量不大的情况,使用上面的逻辑是没有任何好处的,只有在海量数据的情况下,才能体现出上面算法的威力

    • 实现代码
      #对排序数组进行分组,分组的数量和数据量相关
      group_element=sorted_array.map(lambda e:(e/10,e)).sortByKey()
      #统计每个分组的元素个数
      group_element_count=sorted_array.map(lambda e:(e/10,1)).reduceByKey(lambda x,y:x+y).sortByKey()
      group_element_count_map=group_element_count.collectAsMap()
      #算出总的元素个数
      element_count=group_element_count.map(lambda (k,v):v).sum()temp=0
      index=0
      mid=0
      temp2=0
      if element_count%2!=0:mid=element_count/2+1
      else:mid=element_count/2
      pcount=group_element_count.count()
      for i in range(pcount):temp+=group_element_count_map[i]temp2=temp-group_element_count_map[i]if temp>=mid:#得到中位的indexindex=ibreak
      offset=mid-temp2
      result=group_element.filter(lambda (k,v):k==index).takeOrdered(offset)


  • 第四招:Count Once
    • 介绍
      如果原始数组中所有的元素理论上都应该出现偶数次,该程序可以方便的找到出现奇数次的数,此招主要的应用场景为海量数据块损坏检测,例如每个数据块都有一个副本,有一个数据块损坏,要从海量的元数据信息中找出损坏的那个块
    • 代码逻辑
      1,对RDD中每个分区的数据进行异或操作
      2,对步骤1的结果再次进行异或操作
      3,当一个数字进行偶数次异或时,结果等于0,否则等于该数本身,由此得到出现奇数次的那个数
    • 实现代码
      base_array=range(10000000)*4
      base_array.append(1883)
      odd_rdd=sc.parallelize(base_array)
      #异或函数
      odd_func=lambda x,y:x^y
      #对每个分区进行处理的函数
      def odd(chain):result=reduce(odd_func,chain)yield (1,result)
      par_rdd=odd_rdd.mapPartitions(odd).cache()
      par_result=par_rdd.collect()
      #对每个分区的结果再次进行异或操作,最后的结果就是奇数次出现的那个数
      final_result=par_rdd.reduceByKey(lambda x,y:x^y).collect()[0][1]


  • 第五招:数据倾斜
    • 介绍
      此招要解决的问题也是实际数据分析过程中经常出现的问题,在执行一个RDD的操作时,有没有发现有个别的task会拖慢整个job的情况?有的话就要注意了,这种问题产生的原因一般都是由于数据分区不均匀导致,具体不均匀的原因可能是分区的key本身在业务上可能就存在倾斜,或是数据本身就有倾斜
    • 代码逻辑
      1,先对rdd进行采样,假设只有一个key倾斜,获取倾斜率最大的key
      2,根据步骤1得到的max_count_key,将原rdd进行拆分,一部分只包括max_count_key,另外一部分不包括max_count_key,然后分别与normal_rdd进行join,最后将结果union
    • 实现代码
      #存在数据倾斜的列表
      skew_list=[('a',random.randint(1,1000)) for i in range(1000000)]
      skew_list.append(('b',10))
      skew_list.append(('c',8))
      #正常的数据列表
      normal_list=[('a',9),('b',3),('c',8)]skew_rdd=sc.parallelize(skew_list)
      normal_rdd=sc.parallelize(normal_list)#进行倾斜处理
      #1,先对skew_rdd进行采样,假设只有一个key倾斜,获取倾斜率最大的key
      skew_sample=skew_rdd.sample(False,0.3,9).groupByKey()
      skew_sample.cache()
      skew_sample_count_map=skew_sample.map(lambda (k,v):(len(v),k))
      skew_sample_count_map.cache()
      max_count=skew_sample_count_map.reduce(lambda x,y:max(x[0],y[0]))[0]
      max_count_key=skew_sample_count_map.filter(lambda x:x[0]==max_count).collect()[0][1]#2,根据步骤1得到的max_count_key,将skew_rdd进行拆分,一部分只包括max_count_key,另外一部分不包括max_count_key,然后分别与normal_rdd进行join,最后将结果unionmax_key_rdd=skew_rdd.filter(lambda x:x[0]==max_count_key)
      other_key_rdd=skew_rdd.filter(lambda x:x[0]!=max_count_key)
      result1=max_key_rdd.join(normal_rdd) 
      result2=other_key_rdd.join(normal_rdd) 
      print result1.union(result2).count()



这篇关于Spark 入门之十一:Spark数据处理常用的那几招的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/708344

相关文章

Spring Boot中WebSocket常用使用方法详解

《SpringBoot中WebSocket常用使用方法详解》本文从WebSocket的基础概念出发,详细介绍了SpringBoot集成WebSocket的步骤,并重点讲解了常用的使用方法,包括简单消... 目录一、WebSocket基础概念1.1 什么是WebSocket1.2 WebSocket与HTTP

golang中reflect包的常用方法

《golang中reflect包的常用方法》Go反射reflect包提供类型和值方法,用于获取类型信息、访问字段、调用方法等,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值... 目录reflect包方法总结类型 (Type) 方法值 (Value) 方法reflect包方法总结

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

C# 比较两个list 之间元素差异的常用方法

《C#比较两个list之间元素差异的常用方法》:本文主要介绍C#比较两个list之间元素差异,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. 使用Except方法2. 使用Except的逆操作3. 使用LINQ的Join,GroupJoin

从入门到精通C++11 <chrono> 库特性

《从入门到精通C++11<chrono>库特性》chrono库是C++11中一个非常强大和实用的库,它为时间处理提供了丰富的功能和类型安全的接口,通过本文的介绍,我们了解了chrono库的基本概念... 目录一、引言1.1 为什么需要<chrono>库1.2<chrono>库的基本概念二、时间段(Durat

python常用的正则表达式及作用

《python常用的正则表达式及作用》正则表达式是处理字符串的强大工具,Python通过re模块提供正则表达式支持,本文给大家介绍python常用的正则表达式及作用详解,感兴趣的朋友跟随小编一起看看吧... 目录python常用正则表达式及作用基本匹配模式常用正则表达式示例常用量词边界匹配分组和捕获常用re

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

从入门到精通MySQL 数据库索引(实战案例)

《从入门到精通MySQL数据库索引(实战案例)》索引是数据库的目录,提升查询速度,主要类型包括BTree、Hash、全文、空间索引,需根据场景选择,建议用于高频查询、关联字段、排序等,避免重复率高或... 目录一、索引是什么?能干嘛?核心作用:二、索引的 4 种主要类型(附通俗例子)1. BTree 索引(

Redis 配置文件使用建议redis.conf 从入门到实战

《Redis配置文件使用建议redis.conf从入门到实战》Redis配置方式包括配置文件、命令行参数、运行时CONFIG命令,支持动态修改参数及持久化,常用项涉及端口、绑定、内存策略等,版本8... 目录一、Redis.conf 是什么?二、命令行方式传参(适用于测试)三、运行时动态修改配置(不重启服务

MySQL DQL从入门到精通

《MySQLDQL从入门到精通》通过DQL,我们可以从数据库中检索出所需的数据,进行各种复杂的数据分析和处理,本文将深入探讨MySQLDQL的各个方面,帮助你全面掌握这一重要技能,感兴趣的朋友跟随小... 目录一、DQL 基础:SELECT 语句入门二、数据过滤:WHERE 子句的使用三、结果排序:ORDE