Spark SQL 源码分析之 In-Memory Columnar Storage 之 in-memory query

2023-10-11 10:10

本文主要是介绍Spark SQL 源码分析之 In-Memory Columnar Storage 之 in-memory query,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的。

    那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式。

一、引子

本例使用hive console里查询cache后的src表。

select value from src

当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用。

即parse后,会形成InMemoryRelation结点,最后执行物理计划时,会调用InMemoryColumnarTableScan这个结点的方法。

如下:

 

 
  1. scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)

  2. 14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src

  3. 14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed

  4. exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution =

  5. == Parsed Logical Plan ==

  6. Project [value#5]

  7. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

  8.  
  9. == Analyzed Logical Plan ==

  10. Project [value#5]

  11. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

  12.  
  13. == Optimized Logical Plan ==

  14. Project [value#5]

  15. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

  16.  
  17. == Physical Plan ==

  18. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查询内存中表的入口

  19.  
  20. Code Generation: false

  21. == RDD ==

 

二、InMemoryColumnarTableScan

InMemoryColumnarTableScan是Catalyst里的一个叶子结点,包含了要查询的attributes,和InMemoryRelation(封装了我们缓存的In-Columnar Storage数据结构)。

执行叶子节点,出发execute方法对内存数据进行查询。

1、查询时,调用InMemoryRelation,对其封装的内存数据结构的每个分区进行操作。

2、获取要请求的attributes,如上,查询请求的是src表的value属性。

3、根据目的查询表达式,来获取在对应存储结构中,请求列的index索引。

4、通过ColumnAccessor来对每个buffer进行访问,获取对应查询数据,并封装为Row对象返回。

 

 
  1. private[sql] case class InMemoryColumnarTableScan(

  2.     attributes: Seq[Attribute],

  3.     relation: InMemoryRelation)

  4.   extends LeafNode {

  5.  
  6.  
  7.   override def output: Seq[Attribute] = attributes

  8.  
  9.  
  10.   override def execute() = {

  11.     relation.cachedColumnBuffers.mapPartitions { iterator =>

  12.       // Find the ordinals of the requested columns.  If none are requested, use the first.

  13.       val requestedColumns = if (attributes.isEmpty) {

  14.         Seq(0)

  15.       } else {

  16.         attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //根据表达式exprId找出对应列的ByteBuffer的索引

  17.       }

  18.  
  19.  
  20.       iterator

  21.         .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根据索引取得对应请求列的ByteBuffer,并封装为ColumnAccessor。

  22.         .flatMap { columnAccessors =>

  23.           val nextRow = new GenericMutableRow(columnAccessors.length) //Row的长度

  24.           new Iterator[Row] {

  25.             override def next() = {

  26.               var i = 0

  27.               while (i < nextRow.length) {

  28.                 columnAccessors(i).extractTo(nextRow, i) //根据对应index和长度,从byterbuffer里取得值,封装到row里

  29.                 i += 1

  30.               }

  31.               nextRow

  32.             }

  33.  
  34.  
  35.             override def hasNext = columnAccessors.head.hasNext

  36.           }

  37.         }

  38.     }

  39.   }

  40. }

 

查询请求的列,如下:

 
  1. scala> exe.optimizedPlan

  2. res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =

  3. Project [value#5]

  4. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

  5.  
  6.  
  7. scala> val relation = exe.optimizedPlan(1)

  8. relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =

  9. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

  10.  
  11.  
  12. scala> val request_relation = exe.executedPlan

  13. request_relation: org.apache.spark.sql.execution.SparkPlan =

  14. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))

  15.  
  16.  
  17. scala> request_relation.output //请求的列,我们请求的只有value列

  18. res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)

  19.  
  20. scala> relation.output //默认保存在relation中的所有列

  21. res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)

  22.  
  23.  
  24. scala> val attributes = request_relation.output

  25. attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)


 

整个流程很简洁,关键步骤是第三步。根据ExprId来查找到,请求列的索引

attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

 

 
  1. //根据exprId找出对应ID

  2. scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

  3. attr_index: Seq[Int] = ArrayBuffer(1) //找到请求的列value的索引是1, 我们查询就从Index为1的bytebuffer中,请求数据

  4.  
  5. scala> relation.output.foreach(e=>println(e.exprId))

  6. ExprId(4) //对应<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>

  7. ExprId(5)

  8.  
  9. scala> request_relation.output.foreach(e=>println(e.exprId))

  10. ExprId(5)

三、ColumnAccessor

ColumnAccessor对应每一种类型,类图如下:

最后返回一个新的迭代器:

 

 
  1. new Iterator[Row] {

  2. override def next() = {

  3. var i = 0

  4. while (i < nextRow.length) { //请求列的长度

  5. columnAccessors(i).extractTo(nextRow, i)//调用columnType.setField(row, ordinal, extractSingle(buffer))解析buffer

  6. i += 1

  7. }

  8. nextRow//返回解析后的row

  9. }

  10.  
  11. override def hasNext = columnAccessors.head.hasNext

  12. }

 

四、总结

 

    Spark SQL In-Memory Columnar Storage的查询相对来说还是比较简单的,其查询思想主要和存储的数据结构有关。

    即存储时,按每列放到一个bytebuffer,形成一个bytebuffer数组。

    查询时,根据请求列的exprId查找到上述数组的索引,然后使用ColumnAccessor对buffer中字段进行解析,最后封装为Row对象,返回。

这篇关于Spark SQL 源码分析之 In-Memory Columnar Storage 之 in-memory query的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/187311

相关文章

MySQL中比较运算符的具体使用

《MySQL中比较运算符的具体使用》本文介绍了SQL中常用的符号类型和非符号类型运算符,符号类型运算符包括等于(=)、安全等于(=)、不等于(/!=)、大小比较(,=,,=)等,感兴趣的可以了解一下... 目录符号类型运算符1. 等于运算符=2. 安全等于运算符<=>3. 不等于运算符<>或!=4. 小于运

虚拟机Centos7安装MySQL数据库实践

《虚拟机Centos7安装MySQL数据库实践》用户分享在虚拟机安装MySQL的全过程及常见问题解决方案,包括处理GPG密钥、修改密码策略、配置远程访问权限及防火墙设置,最终通过关闭防火墙和停止Net... 目录安装mysql数据库下载wget命令下载MySQL安装包安装MySQL安装MySQL服务安装完成

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java

MySQL逻辑删除与唯一索引冲突解决方案

《MySQL逻辑删除与唯一索引冲突解决方案》本文探讨MySQL逻辑删除与唯一索引冲突问题,提出四种解决方案:复合索引+时间戳、修改唯一字段、历史表、业务层校验,推荐方案1和方案3,适用于不同场景,感兴... 目录问题背景问题复现解决方案解决方案1.复合唯一索引 + 时间戳删除字段解决方案2:删除后修改唯一字

Zabbix在MySQL性能监控方面的运用及最佳实践记录

《Zabbix在MySQL性能监控方面的运用及最佳实践记录》Zabbix通过自定义脚本和内置模板监控MySQL核心指标(连接、查询、资源、复制),支持自动发现多实例及告警通知,结合可视化仪表盘,可有效... 目录一、核心监控指标及配置1. 关键监控指标示例2. 配置方法二、自动发现与多实例管理1. 实践步骤

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

解决1093 - You can‘t specify target table报错问题及原因分析

《解决1093-Youcan‘tspecifytargettable报错问题及原因分析》MySQL1093错误因UPDATE/DELETE语句的FROM子句直接引用目标表或嵌套子查询导致,... 目录报js错原因分析具体原因解决办法方法一:使用临时表方法二:使用JOIN方法三:使用EXISTS示例总结报错原

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

MySQL深分页进行性能优化的常见方法

《MySQL深分页进行性能优化的常见方法》在Web应用中,分页查询是数据库操作中的常见需求,然而,在面对大型数据集时,深分页(deeppagination)却成为了性能优化的一个挑战,在本文中,我们将... 目录引言:深分页,真的只是“翻页慢”那么简单吗?一、背景介绍二、深分页的性能问题三、业务场景分析四、