Spark SQL Columnar模块源码分析

2023-10-11 10:10

本文主要是介绍Spark SQL Columnar模块源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概述

本文介绍Spark SQL增加的Columnar模块代码实现。
首先介绍Columnar内的代码结构和实现,然后介绍在SqlContext里的使用方式。

Columnar

InMemoryColumnarTableScan

实现

InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划。

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)extends LeafNode {

传入的child是一个SparkPlan(确认了的物理执行计划)和一个属性序列。

行转列并cache的过程如下:

  lazy val cachedColumnBuffers = {val output = child.output// 遍历每个RDD的partiti	onval cached = child.execute().mapPartitions { iterator =>// 把属性Seq转换成为ColumnBuilder数组val columnBuilders = output.map { attribute =>// 都是基本ColumnBuilder,默认ByteBuffer大小ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)}.toArrayvar row: Row = null// RDD每个Partition的Rows,每个Row的所有field信息存到ColumnBuilder里while (iterator.hasNext) {row = iterator.next()var i = 0while (i < row.length) {columnBuilders(i).appendFrom(row, i)i += 1}}Iterator.single(columnBuilders.map(_.build()))}.cache()cached.setName(child.toString)// Force the materialization of the cached RDD.cached.count()cached}

ColumnType类用于表示Column的类型,他的typeId变量用来区分数据类型,生成对应的ColumnBuilder(typeId, initialSize=0, columnName)。ColumnBuilder的生成如下:

  def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {val builder = (typeId match {case INT.typeId     => new IntColumnBuildercase LONG.typeId    => new LongColumnBuildercase FLOAT.typeId   => new FloatColumnBuildercase DOUBLE.typeId  => new DoubleColumnBuildercase BOOLEAN.typeId => new BooleanColumnBuildercase BYTE.typeId    => new ByteColumnBuildercase SHORT.typeId   => new ShortColumnBuildercase STRING.typeId  => new StringColumnBuildercase BINARY.typeId  => new BinaryColumnBuildercase GENERIC.typeId => new GenericColumnBuilder}).asInstanceOf[ColumnBuilder]builder.initialize(initialSize, columnName)builder}

他的继承结构如下,主要有三大体系:



这里涉及到的是Basic这个体系,继承结构如下:

BasicColumnBuilder里,initialSize = 0,指使用ByteBuffer的默认大小,即10*1024*104。然后在initialize()方法,会初始化ByteBuffer。

接下来,针对RDD每个partition,

      var row: Row = nullwhile (iterator.hasNext) {row = iterator.next()var i = 0while (i < row.length) {columnBuilders(i).appendFrom(row, i)i += 1}}

进行了appendFrom操作:

  override def appendFrom(row: Row, ordinal: Int) {val field = columnType.getField(row, ordinal)buffer = ensureFreeSpace(buffer, columnType.actualSize(field))columnType.append(field, buffer)}

用于把一个Row的每一个field,都存到一个ColumnBuilder里。在这里指BasicColumnBuilder这个类,维护了一个自己的ByteBuffer,把row里的各个field信息都存在了buffer里。


最后ColumnBuilders里的每个ColumnBuilder进行build(),即BasicColumnBuilder.build()方法,进行了一次ByteBuffer的rewind()方法。

这个方法的结果是一个RDD集合。由于在结束前调用了.count()方法,所以RDD的计算是被执行了的,返回的是新的RDD。

在Spark SQL里,外部调用cachedColumnBuffers方法只有在uncache table的时候,进行了unpersisit()操作。


下面看execute()方法:

  override def execute() = {
cachedColumnBuffers.mapPartitions { iterator =>// 在RDD partition里,iterator.next()返回的是一个ByteBuffer// 也就是说,cachedColumnBuffers返回的结果RDD,类型是ByteBufferval columnBuffers = iterator.next()assert(!iterator.hasNext)new Iterator[Row] {// 访问每一个ByteBuffer里的列信息val columnAccessors = columnBuffers.map(ColumnAccessor(_))val nextRow = new GenericMutableRow(columnAccessors.length)override def next() = {var i = 0// 把column里的信息再转到Row里while (i < nextRow.length) {columnAccessors(i).extractTo(nextRow, i)i += 1}nextRow}override def hasNext = columnAccessors.head.hasNext}}}

使用

在SqlContext里选择cache table的时候,会使用该类。

其实在cache的时候,首先去catalog里寻找这个table的信息和table的执行计划,然后会进行执行(执行到物理执行计划生成),然后把这个table再放回catalog里维护起来,这个时候的执行计划已经是最终要执行的物理执行计划了。但是此时Columner模块相关的转换等操作都是没有触发的。

真正的触发还是在execute()的时候,同其他SparkPlan的execute()方法触发场景是一样的。


ColumnBuilder 与 ColumnAccessor

一个包装Row的每个field成Column;一个访问column,然后可以转回Row


关于压缩

private[sql] abstract class NativeColumnBuilder[T <: NativeType](override val columnStats: NativeColumnStats[T],override val columnType: NativeColumnType[T])extends BasicColumnBuilder[T, T#JvmType](columnStats, columnType)with NullableColumnBuilderwith AllCompressionSchemeswith CompressibleColumnBuilder[T]private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)


从继承结构看,压缩的builder和Accessor都以trait的方式继承了ColumnBuilder,而子类比如IntColumnBuilder,不但继承了BaseColumnBuilder,同时也具备压缩处理能力。

具体压缩处理可以参考CompressibleColumnBuilder类里的实现。

是否压缩会做一次判断,压缩比在0.8以下才执行压缩。

在build()的时候实施压缩,并且按照以下结构存在bytebuffer内。

 *    .--------------------------- Column type ID (4 bytes)*    |   .----------------------- Null count N (4 bytes)*    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)*    |   |   |     .------------- Compression scheme ID (4 bytes)*    |   |   |     |   .--------- Compressed non-null elements*    V   V   V     V   V*    +---+---+-----+---+---------+*    |   |   | ... |   | ... ... |*    +---+---+-----+---+---------+*    \-----------/ \-----------/*        header         body


CompressionScheme子类是不同的压缩实现

都是scala实现的,未借助第三方库。不同的实现,指定了支持的column data类型。在build()的时候,会比较每种压缩,选择压缩率最小的(若仍大于0.8就不压缩了)。

这里的估算能力,在子类实现里,好像是由gatherCompressibilityStats方法实现的。


SqlContext

分析SqlContext内目前cache和uncache table的实现细节与Columnar的关系。


Cache Table

  /** Caches the specified table in-memory. */def cacheTable(tableName: String): Unit = {// 得到的是一个logicalPlanval currentTable = catalog.lookupRelation(None, tableName)// 物理执行计划生成之后交给InMemoryColumnarTableScanval asInMemoryRelation =InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)// SparkLogicalPlan接受的Plan必须是已经确定plan好的SparkPlancatalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))}

从上面那段代码可以看到,cache之前,需要先把本次cache的table的物理执行计划生成出来。上述的currentTable其实是一个logicalPlan,来自catalog的lookupRelation。

最后注册表的时候,涉及到的SparkLogicalPlan类是LogicalPlan的实现类(但是本身其实是一个SparkPlan),它接受的是SparkPlan,并且是已经确定Plan好了的逻辑执行计划,目前接受两类:ExistingRdd和InMemoryColumnarTableScan。

在cache这个过程里,InMemoryColumnarTableScan并没有执行,但是生成了以InMemoryColumnarTableScan为物理执行计划的SparkLogicalPlan,并存成table的plan。


Uncache Table

在这一步,除了删除catalog里的table信息之外,还调用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,并进行了unpersist()操作。cacheColumnBuffers方法具体见Columner内,主要做了把RDD每个partition里的ROW的每个Field存到了ColumnBuilder内。




全文完 :)

这篇关于Spark SQL Columnar模块源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/187314

相关文章

MySQL中比较运算符的具体使用

《MySQL中比较运算符的具体使用》本文介绍了SQL中常用的符号类型和非符号类型运算符,符号类型运算符包括等于(=)、安全等于(=)、不等于(/!=)、大小比较(,=,,=)等,感兴趣的可以了解一下... 目录符号类型运算符1. 等于运算符=2. 安全等于运算符<=>3. 不等于运算符<>或!=4. 小于运

虚拟机Centos7安装MySQL数据库实践

《虚拟机Centos7安装MySQL数据库实践》用户分享在虚拟机安装MySQL的全过程及常见问题解决方案,包括处理GPG密钥、修改密码策略、配置远程访问权限及防火墙设置,最终通过关闭防火墙和停止Net... 目录安装mysql数据库下载wget命令下载MySQL安装包安装MySQL安装MySQL服务安装完成

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java

MySQL逻辑删除与唯一索引冲突解决方案

《MySQL逻辑删除与唯一索引冲突解决方案》本文探讨MySQL逻辑删除与唯一索引冲突问题,提出四种解决方案:复合索引+时间戳、修改唯一字段、历史表、业务层校验,推荐方案1和方案3,适用于不同场景,感兴... 目录问题背景问题复现解决方案解决方案1.复合唯一索引 + 时间戳删除字段解决方案2:删除后修改唯一字

Zabbix在MySQL性能监控方面的运用及最佳实践记录

《Zabbix在MySQL性能监控方面的运用及最佳实践记录》Zabbix通过自定义脚本和内置模板监控MySQL核心指标(连接、查询、资源、复制),支持自动发现多实例及告警通知,结合可视化仪表盘,可有效... 目录一、核心监控指标及配置1. 关键监控指标示例2. 配置方法二、自动发现与多实例管理1. 实践步骤

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

解决1093 - You can‘t specify target table报错问题及原因分析

《解决1093-Youcan‘tspecifytargettable报错问题及原因分析》MySQL1093错误因UPDATE/DELETE语句的FROM子句直接引用目标表或嵌套子查询导致,... 目录报js错原因分析具体原因解决办法方法一:使用临时表方法二:使用JOIN方法三:使用EXISTS示例总结报错原

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

MySQL深分页进行性能优化的常见方法

《MySQL深分页进行性能优化的常见方法》在Web应用中,分页查询是数据库操作中的常见需求,然而,在面对大型数据集时,深分页(deeppagination)却成为了性能优化的一个挑战,在本文中,我们将... 目录引言:深分页,真的只是“翻页慢”那么简单吗?一、背景介绍二、深分页的性能问题三、业务场景分析四、