Spark SQL重点知识总结

2024-09-06 21:58

本文主要是介绍Spark SQL重点知识总结,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、Spark SQL的概念理解

Spark SQL是spark套件中一个模板,它将数据的计算任务通过SQL的形式转换成了RDD的计算,类似于Hive通过SQL的形式将数据的计算任务转换成了MapReduce。

Spark SQL的特点:
1、和Spark Core的无缝集成,可以在写整个RDD应用的时候,配置Spark SQL来完成逻辑实现。
2、统一的数据访问方式,Spark SQL提供标准化的SQL查询。
3、Hive的继承,Spark SQL通过内嵌的hive或者连接外部已经部署好的hive案例,实现了对hive语法的继承和操作。
4、标准化的连接方式,Spark SQL可以通过启动thrift Server来支持JDBC、ODBC的访问,将自己作为一个BI Server使用

Spark SQL数据抽象:
1、RDD(Spark1.0)->DataFrame(Spark1.3)->DataSet(Spark1.6)
2、Spark SQL提供了DataFrame和DataSet的数据抽象
3、DataFrame就是RDD+Schema,可以认为是一张二维表格,劣势在于编译器不进行表格中的字段的类型检查,在运行期进行检查
4、DataSet是Spark最新的数据抽象,Spark的发展会逐步将DataSet作为主要的数据抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的优化机制。除此之外提供了以样例类为Schema模型的强类型
5、DataFrame=DataSet[Row]
6、DataFrame和DataSet都有可控的内存管理机制,所有数据都保存在非堆上,都使用了catalyst进行SQL的优化。

Spark SQL客户端查询:
1、可以通过Spark-shell来操作Spark SQL,spark作为SparkSession的变量名,sc作为SparkContext的变量名
2、可以通过Spark提供的方法读取json文件,将json文件转换成DataFrame
3、可以通过DataFrame提供的API来操作DataFrame里面的数据。
4、可以通过将DataFrame注册成为一个临时表的方式,来通过Spark.sql方法运行标准的SQL语句来查询。

二、Spark SQL查询方式

DataFrame查询方式

1、DataFrame支持两种查询方式:一种是DSL风格,另外一种是SQL风格
(1)、DSL风格:
需要引入import spark.implicit._这个隐式转换,可以将DataFrame隐式转换成RDD
(2)、SQL风格:
a、需要将DataFrame注册成一张表格,如果通过CreateTempView这种方式来创建,那么该表格Session有效,如果通过CreateGlobalTempView来创建,那么该表格跨Session有效,但是SQL语句访问该表格的时候需要加上前缀global_temp
b、需要通过sparkSession.sql方法来运行你的SQL语句


DataSet查询方式

定义一个DataSet,先定义一个Case类


三、DataFrame、Dataset和RDD互操作

1、RDD->DataFrame:

  • 普通方式:例如rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF("name","age")

  • 通过反射来设置schema,例如:

 
#通过反射设置schema,数据集是spark自带的people.txt,路径在下面的代码中	
case class Person(name:String,age:Int)	
val peopleDF=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt").map(_.split(",")).map(para=>Person(para(0).trim,para(1).trim.toInt)).toDF	
peopleDF.show

640

 
#注册成一张临时表	
peopleDF.createOrReplaceTempView("persons")	
val teen=spark.sql("select name,age from persons where age between 13 and 29")	
teen.show

640

这时teen是一张表,每一行是一个row对象,如果需要访问Row对象中的每一个元素,可以通过下标 row(0);你也可以通过列名 row.getAs[String]("name")

640也可以使用getAs方法:

640

3、通过编程的方式来设置schema,适用于编译器不能确定列的情况

 
val peopleRDD=spark.sparkContext.textFile("file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt")	
val schemaString="name age"	
val filed=schemaString.split(" ").map(filename=> org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))	
val schema=org.apache.spark.sql.types.StructType(filed)	
peopleRDD.map(_.split(",")).map(para=>org.apache.spark.sql.Row(para(0).trim,para(1).trim))	
val peopleDF=spark.createDataFrame(res6,schema)	
peopleDF.show

640

640

640


2、DataFrame->RDD:

 
dataFrame.rdd

3、RDD->DataSet:

 
 
rdd.map(para=> Person(para(0).trim(),para(1).trim().toInt)).toDS

4、DataSet->DataSet:

 
dataSet.rdd

5、DataFrame -> DataSet:

 
dataFrame.to[Person]

6、DataSet -> DataFrame:

 
dataSet.toDF


四、用户自定义函数

1、用户自定义UDF函数

通过spark.udf功能用户可以自定义函数
自定义udf函数:
1、  通过spark.udf.register(name,func)来注册一个UDF函数,name是UDF调用时的标识符,fun是一个函数,用于处理字段。
2、  需要将一个DF或者DS注册为一个临时表
3、  通过spark.sql去运行一个SQL语句,在SQL语句中可以通过name(列名)方式来应用UDF函数

2、用户自定义聚合函数


弱类型用户自定义聚合函数

  • 新建一个Class 继承UserDefinedAggregateFunction  ,然后复写方法:

 
override def inputSchema: StructType = ???	
override def bufferSchema: StructType = ???	
override def dataType: DataType = ???	
override def deterministic: Boolean = true	
override def initialize(buffer: MutableAggregationBuffer): Unit = ???	
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???	
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???	
override def evaluate(buffer: Row): Any = ???
  • 你需要通过spark.udf.resigter去注册你的UDAF函数。

  • 需要通过spark.sql去运行你的SQL语句,可以通过 select UDAF(列名) 来应用你的用户自定义聚合函数。


强类型用户自定义聚合函数

1、新建一个class,继承Aggregator[Employee, Average, Double],其中Employee是在应用聚合函数的时候传入的对象,Average是聚合函数在运行的时候内部需要的数据结构,Double是聚合函数最终需要输出的类型。这些可以根据自己的业务需求去调整。复写相对应的方法:

 
override def zero: Average = ???	
override def reduce(b: Average, a: Employee): Average = ???	
override def merge(b1: Average, b2: Average): Average = ???	
override def finish(reduction: Average): Double = ???	
override def bufferEncoder: Encoder[Average] = ???	
override def outputEncoder: Encoder[Double] = ???

2、新建一个UDAF实例,通过DF或者DS的DSL风格语法去应用。

五、Spark SQL和Hive的继承

内置Hive

1、Spark内置有Hive,Spark2.1.1 内置的Hive是1.2.1。
2、需要将core-site.xml和hdfs-site.xml 拷贝到spark的conf目录下。如果Spark路径下发现metastore_db,需要删除【仅第一次启动的时候】。
3、在你第一次启动创建metastore的时候,你需要指定spark.sql.warehouse.dir这个参数,
比如:
bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse
4、注意,如果你在load数据的时候,需要将数据放到HDFS上。

外部Hive(这里主要使用这个方法)

1、需要将hive-site.xml 拷贝到spark的conf目录下。
2、如果hive的metestore使用的是mysql数据库,那么需要将mysql的jdbc驱动包放到spark的jars目录下。

3、可以通过spark-sql或者spark-shell来进行sql的查询。完成和hive的连接。

640

这就是hive里面的表

640


六、Spark SQL的数据源

输入

对于Spark SQL的输入需要使用sparkSession.read方法

1、通用模式   sparkSession.read.format("json").load("path")   支持类型:parquet、json、text、csv、orc、jdbc
2、专业模式   sparkSession.read.json、 csv  直接指定类型。

输出

对于Spark SQL的输出需要使用  sparkSession.write方法

1、通用模式   dataFrame.write.format("json").save("path")  支持类型:parquet、json、text、csv、orc

2、专业模式   dataFrame.write.csv("path")  直接指定类型

3、如果你使用通用模式,spark默认parquet是默认格式、sparkSession.read.load 加载的默认是parquet格式dataFrame.write.save也是默认保存成parquet格式。

4、如果需要保存成一个text文件,那么需要dataFrame里面只有一列(只需要一列即可)。

七、Spark SQL实战

1、数据说明(有需要的可以下方留言)

这里有三个数据集,合起来大概有几十万条数据,是关于货品交易的数据集。

640?wx_fmt=other

2、任务

这里有三个需求:
1、计算所有订单中每年的销售单数、销售总额
2、计算所有订单每年最大金额订单的销售额
3、计算所有订单中每年最畅销货品

3、步骤


1、加载数据:

tbStock.txt

 
#代码	
case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable	
val tbStockRdd=spark.sparkContext.textFile("file:///root/dataset/tbStock.txt")	
val tbStockDS=tbStockRdd.map(_.split(",")).map(attr=>tbStock(attr(0),attr(1),attr(2))).toDS	
tbStockDS.show()

640

640

640

640

tbStockDetail.txt

 
case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable	
val tbStockDetailRdd=spark.sparkContext.textFile("file:///root/dataset/tbStockDetail.txt")	
val tbStockDetailDS=tbStockDetailRdd.map(_.split(",")).map(attr=>tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS	
tbStockDetailDS.show()


640

640

640

640

tbDate.txt

 
case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable	
val tbDateRdd=spark.sparkContext.textFile("file:///root/dataset/tbDate.txt")	
val tbDateDS=tbDateRdd.map(_.split(",")).map(attr=>tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS	
tbDateDS.show()

640640640640

2、注册表

 
 
tbStockDS.createOrReplaceTempView("tbStock")	
tbDateDS.createOrReplaceTempView("tbDate")	
tbStockDetailDS.createOrReplaceTempView("tbStockDetail")

640

3、解析表

1、计算所有订单中每年的销售单数、销售总额

 
select c.theyear,count(distinct a.ordernumber),sum(b.amount)	
from tbStock a	
join tbStockDetail b on a.ordernumber=b.ordernumber	
join tbDate c on a.dateid=c.dateid	
group by c.theyear	
order by c.theyear

640

2、计算所有订单每年最大金额订单的销售额

a、先统计每年每个订单的销售额

 
select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount	
from tbStock a	
join tbStockDetail b on a.ordernumber=b.ordernumber	
group by a.dateid,a.ordernumber

640

b、计算最大金额订单的销售额

 
select d.theyear,c.SumOfAmount as SumOfAmount 	
from	
(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount 	
from tbStock a	
join tbStockDetail b on a.ordernumber=b.ordernumber  	
group by a.dateid,a.ordernumber) c  	
join tbDate d on c.dateid=d.dateid  	
group by d.theyear	
order by theyear desc

640

3、计算所有订单中每年最畅销货品

a、求出每年每个货品的销售额

 
select c.theyear,b.itemid,sum(b.amount) as SumOfAmount 	
from tbStock a 	
join tbStockDetail b on a.ordernumber=b.ordernumber 	
join tbDate c on a.dateid=c.dateid 	
group by c.theyear,b.itemid

640

b、在a的基础上,统计每年单个货品的最大金额

 
select d.theyear,max(d.SumOfAmount) as MaxOfAmount 	
from	
(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount 	
from tbStock a 	
join tbStockDetail b on a.ordernumber=b.ordernumber 	
join tbDate c on a.dateid=c.dateid 	
group by c.theyear,b.itemid) d 	
group by theyear

640

c、用最大销售额和统计好的每个货品的销售额join,以及用年join,集合得到最畅销货品那一行信息

 
select distinct e.theyear,e.itemid,f.maxofamount 	
from	
(select c.theyear,b.itemid,sum(b.amount) as sumofamount 	
from tbStock a 	
join tbStockDetail b on a.ordernumber=b.ordernumber 	
join tbDate c on a.dateid=c.dateid 	
group by c.theyear,b.itemid) e 	
join	
(select d.theyear,max(d.sumofamount) as maxofamount 	
from	
(select c.theyear,b.itemid,sum(b.amount) as sumofamount 	
from tbStock a 	
join tbStockDetail b on a.ordernumber=b.ordernumber 	
join tbDate c on a.dateid=c.dateid 	
group by c.theyear,b.itemid) d 	
group by d.theyear) f on e.theyear=f.theyear 	
and e.sumofamount=f.maxofamount order by e.theyear

640

640?wx_fmt=gif

640?wx_fmt=jpeg

这篇关于Spark SQL重点知识总结的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1143230

相关文章

MySQL多实例管理如何在一台主机上运行多个mysql

《MySQL多实例管理如何在一台主机上运行多个mysql》文章详解了在Linux主机上通过二进制方式安装MySQL多实例的步骤,涵盖端口配置、数据目录准备、初始化与启动流程,以及排错方法,适用于构建读... 目录一、什么是mysql多实例二、二进制方式安装MySQL1.获取二进制代码包2.安装基础依赖3.清

详解MySQL中JSON数据类型用法及与传统JSON字符串对比

《详解MySQL中JSON数据类型用法及与传统JSON字符串对比》MySQL从5.7版本开始引入了JSON数据类型,专门用于存储JSON格式的数据,本文将为大家简单介绍一下MySQL中JSON数据类型... 目录前言基本用法jsON数据类型 vs 传统JSON字符串1. 存储方式2. 查询方式对比3. 索引

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

Spring Boot 与微服务入门实战详细总结

《SpringBoot与微服务入门实战详细总结》本文讲解SpringBoot框架的核心特性如快速构建、自动配置、零XML与微服务架构的定义、演进及优缺点,涵盖开发环境准备和HelloWorld实战... 目录一、Spring Boot 核心概述二、微服务架构详解1. 微服务的定义与演进2. 微服务的优缺点三

SpringBoot集成MyBatis实现SQL拦截器的实战指南

《SpringBoot集成MyBatis实现SQL拦截器的实战指南》这篇文章主要为大家详细介绍了SpringBoot集成MyBatis实现SQL拦截器的相关知识,文中的示例代码讲解详细,有需要的小伙伴... 目录一、为什么需要SQL拦截器?二、MyBATis拦截器基础2.1 核心接口:Interceptor

MySQL 8 中的一个强大功能 JSON_TABLE示例详解

《MySQL8中的一个强大功能JSON_TABLE示例详解》JSON_TABLE是MySQL8中引入的一个强大功能,它允许用户将JSON数据转换为关系表格式,从而可以更方便地在SQL查询中处理J... 目录基本语法示例示例查询解释应用场景不适用场景1. ‌jsON 数据结构过于复杂或动态变化‌2. ‌性能要

MySQL字符串常用函数详解

《MySQL字符串常用函数详解》本文给大家介绍MySQL字符串常用函数,本文结合实例代码给大家介绍的非常详细,对大家学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql字符串常用函数一、获取二、大小写转换三、拼接四、截取五、比较、反转、替换六、去空白、填充MySQL字符串常用函数一、

MySQL中比较运算符的具体使用

《MySQL中比较运算符的具体使用》本文介绍了SQL中常用的符号类型和非符号类型运算符,符号类型运算符包括等于(=)、安全等于(=)、不等于(/!=)、大小比较(,=,,=)等,感兴趣的可以了解一下... 目录符号类型运算符1. 等于运算符=2. 安全等于运算符<=>3. 不等于运算符<>或!=4. 小于运

虚拟机Centos7安装MySQL数据库实践

《虚拟机Centos7安装MySQL数据库实践》用户分享在虚拟机安装MySQL的全过程及常见问题解决方案,包括处理GPG密钥、修改密码策略、配置远程访问权限及防火墙设置,最终通过关闭防火墙和停止Net... 目录安装mysql数据库下载wget命令下载MySQL安装包安装MySQL安装MySQL服务安装完成

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java