基于trino实现Sort Merge Join

2023-10-20 10:59
文章标签 实现 join merge sort trino

本文主要是介绍基于trino实现Sort Merge Join,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

当前,社区Trino已对Equi join场景支持了Broadcast Hash Join和Shuffle Hash Join两种Join实现。Broadcast Hash Join适用于大表与小表之间的Join,当Build Side的数据量比较小时,使用Broadcast方式将小表发送给Probe Side,避免了Hash Exchange操作,因而性能较佳。而Shuffle Hash Join适用于大表与大表之间的Join,两个表都需要进行Hash Exchange操作,同时Probe Side需要将Build Side对应的Partition数据全部加载到内存中才能进行计算,因而在表较大时,需要增加Partition数来避免内存OOM问题;但如果存在Partition数据倾斜,解决内存OOM问题就会更加困难。

除了以上两种Join实现,实际上还有一种Sort Merge Join实现方式(目前Trino社区还未实现),该方式是目前主流批处理引擎Spark用于大表Join默认的实现方式,相对于Shuffle Hash Join对内存的要求更低,因而更加稳定可靠。本文将通过详细介绍Trino实现Sort Merge Join的方法 。
目前我已将功能实现代码提交到社区(https://github.com/trinodb/trino/pull/17423),大家可以结合代码理解相关细节。

Join实现调研

Shuffle Hash Join

The classic hash join algorithm for an inner join of two relations 
proceeds as follows:1. First, prepare a hash table using the contents of one relation, 
ideally whichever one is smaller after applying local predicates. 
This relation is called the build side of the join. The hash table entries 
are mappings from the value of the (composite) join attribute to the 
remaining attributes of that row (whichever ones are needed).2. Once the hash table is built, scan the other relation (the probe side). 
For each row of the probe relation, find the relevant rows from 
the build relation by looking in the hash table.

以上是对于hash join算法的描述,该算法主要针对早期关系型数据库。在大数据海量数据处理的背景下,shuffle hash join和broadcast hash join应运而生。

针对shuffle hash join,trino内部的实现可以分为三个阶段:

  • Exchange Phase。两边进行equiJoinClause字段的exchange shuffle,保证相同的key落到同一个partition分区。
  • Build Phase。对build side构建hash table。
  • Lookup Phase。在完成hash table的构建之后,就开始Lookup阶段,probe side的每一条记录都去hash table中查找匹配的记录。

以下SQL案例可以更好地说明trino hash join的实现过程。

trino:tpcds_100_orc> select a.order_key,a.orderdate,b.orderstatus from test_t1 a join test_t2 b on a.order_key = b.order_key and a.orderdate = b.orderdate;order_key | orderdate  | orderstatus
-----------+------------+-------------20 | 2001-09-22 | success15 | 2001-08-20 | success

由于hash join算法具有内存消耗较高的特点,为了减少查询过程中的内存峰值,trino在HashBuilderOperator的实现中引入了spill功能。然而,在Lookup Phase阶段中,由于需要一个完整的hash table来进行匹配查询,所以其内存峰值大小必然大于或等于构建侧(build side)的最大数据分区大小。此外,数据倾斜可能还是会导致hash join算法出现OOM的风险。

Sort Merge Join

image.png

以上是wikipedia对sort merge join算法的inner join场景的伪代码描述。

现在,我们来看一下Spark中不同类型Join的实现及其特性,从sort merge join算法的特性来看,它满足大部分Join场景(除了non-equi Join)。
image.png

Spark中sort merge join的实现主要包括三个阶段:

  • Shuffle 阶段。Join两个数据集根据equiJoinClause字段进行exchange shuffle,确保相同的key位于同一个分区,并且两边的分区数相同。
  • Sort 阶段。在exchange之后,需要增加Sort算子,根据equiJoinClause字段进行排序,并且确保相同的排序策略(例如都选择ASC_NULLS_LAST)。
  • Merge 阶段。使用SortMergeJoin算子进行merge。实际上,可能需要考虑不同Join类型带来的算法差异,同时还需要支持处理单Key数据倾斜导致right_subset过大的情况,以及支持spill。

Trino如何实现SMJ

trino实现sort merge join总体上参考了spark的设计,下面就来讲trino如何实现。

核心流程

  1. 定义新的PlanNode - SortMergeJoinNode。相对于JoinNode,SortMergeJoinNode增加了needSortLeft和needSortRight属性,用于标识左右两边子PlanNode是否需要进行排序,以实现sort消除优化。
  2. 增加优化规则 - TransformHashJoinToSortMergeJoin。该规则能够在满足一定条件的情况下将JoinNode转换成SortMergeJoinNode。
  3. worker节点使用LocalExecutionPlanner::Visitor::visitSortMergeJoin方法来解析plan fragment中的SortMergeJoinNode,将其转换成两个左右side的SortMergeJoinOperator的Operator Pipeline。
  4. worker会根据SortMergeJoinNode的needSortLeft和needSortRight属性来决定是否对左右的child进行排序。对于需要排序的,会各自增加前置OrderByOperator,以确保排序顺序正确。

最终得到的执行算子Pipeline结构如下图,可以与前文所述的Hash Join生成的算子Pipeline进行对比,以便更好地了解其差异之处。:

SortMergeJoinNode定义如下

public SortMergeJoinNode(@JsonProperty("id") PlanNodeId id,@JsonProperty("type") JoinNode.Type type,@JsonProperty("left") PlanNode left,@JsonProperty("right") PlanNode right,@JsonProperty("criteria") List<JoinNode.EquiJoinClause> criteria,@JsonProperty("leftOutputSymbols") List<Symbol> leftOutputSymbols,@JsonProperty("rightOutputSymbols") List<Symbol> rightOutputSymbols,@JsonProperty("filter") Optional<Expression> filter,  //暂不支持@JsonProperty("distributionType") Optional<JoinNode.DistributionType> distributionType,@JsonProperty("dynamicFilters") Map<DynamicFilterId, Symbol> dynamicFilters, //暂不支持@JsonProperty("needSortLeft") Boolean needSortLeft,@JsonProperty("needSortRight") Boolean needSortRight)

TransformHashJoinToSortMergeJoin优化规则
触发条件:

  1. 存在EquiJoinClause的hash join。
  2. 左右两边DistributionType为PARTITIONED,即存在exchange。

执行核心逻辑:

  1. 以JoinNode为基础,替换成SortMergeJoinNode。
  2. 如果JoinNode带有filter,需新增前置FilterNode以解决此问题。
@Overridepublic Result apply(JoinNode node, Captures captures, Context context){if (node.getFilter().isPresent()) {PlanNode smj = new SortMergeJoinNode(context.getIdAllocator().getNextId(), node.getType(),node.getLeft(), node.getRight(), node.getCriteria(),node.getLeft().getOutputSymbols(), node.getRight().getOutputSymbols(),Optional.empty(), node.getDistributionType(), node.getDynamicFilters(), true, true);PlanNode filter = new FilterNode(context.getIdAllocator().getNextId(), smj, node.getFilter().get());List<Symbol> output = new ArrayList<>();output.addAll(node.getLeftOutputSymbols());output.addAll(node.getRightOutputSymbols());PlanNode project = new ProjectNode(context.getIdAllocator().getNextId(), filter,Assignments.identity(output));return Result.ofPlanNode(project);}else {PlanNode smj = new SortMergeJoinNode(context.getIdAllocator().getNextId(), node.getType(),node.getLeft(), node.getRight(), node.getCriteria(),node.getLeftOutputSymbols(), node.getRightOutputSymbols(),node.getFilter(), node.getDistributionType(), node.getDynamicFilters(), true, true);return Result.ofPlanNode(smj);}
}

数据倾斜如何处理

首先,我们来了解一下Sort Merge Join Scaner匹配join key的流程:

  1. 当左侧和右侧匹配上某一个join key后,右侧会将join key相同的所有记录加载到SpillableMatchedPages集合中。
  2. 左侧的当前记录与SpillableMatchedPages中的记录产生join操作,生成join结果。
  3. 左侧的位点移动到下一条记录,如果其join key与之前相同,则SpillableMatchedPages会被重复利用进行join操作。
  4. 因此SpillableMatchedPages主要用于缓存右侧join key相等的记录,便于可以重复消耗。

然而,由于数据倾斜的存在,右侧可能会有过多的重复join key,导致SpillableMatchedPages内存消耗过大,引起OOM问题。我们可以通过增加spill模式来解决这个问题,通过控制numRowsInMemoryBufferThreshold阈值来控制是否触发spill模式。当SpillableMatchedPages记录数超过numRowsInMemoryBufferThreshold时,就会切换到spill模式。这样既可以避免OOM问题,又能保证高效的join操作。

    public void insertRow(Row row){...positionCount++;if (positionCount >= numRowsInMemoryBufferThreshold && spiller == null) {spillToDisk();}}

通过sort消除提升性能

我们知道,在sort merge join中,左右两表的数据需要进行排序才能进行连接。但是,如果左右两表的数据本身已经有序,就可以避免不必要的排序,从而提升性能。此外,在某些join类型的sort merge join之后,数据已具有相关的order排序属性,因此可以利用这个属性来分析是否可以避免不必要的排序。

该优化主要应用于多个表存在多个sort merge join的情况,未来还可以扩展支持具有order属性的bucket表。

对于不同的Join类型,其最终数据呈现的order排序属性也各不相同。

Join类型order排序属性
inner join即按左边join key排序,又按右边join key排序
left join按左边join key排序
right join按右边join key排序
full join无排序

下面sort消除的各个案例:

-- sort消除生效。 第二个join的lefe side不需要sort
select * from  t1 join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1
select * from  t1 left join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1-- sort消除不生效。 第二个join的lefe side是按t2的c1排序,无法匹配order需求。
select * from  t1 right join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1-- sort消除不生效。 full join无法保证order顺序。
select * from  t1 full join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1-- sort消除不生效。 第二个join的join key与第一个join不一致。
select * from  t1 join t2 b on t1.c1 = t2.c1 join t3 c on t1.c2 = t3.c2    

下面sort消除的优化规则的核心代码:

@Override
public PlanWithProperties visitSortMergeJoin(SortMergeJoinNode node, Void context)
{PlanWithProperties left = node.getLeft().accept(this, context);PlanWithProperties right = node.getRight().accept(this, context);OrderingScheme leftOrderingRequire = node.getLeftOrdering();OrderingScheme rightOrderingRequire = node.getRightOrdering();List<OrderingScheme> ordering = new ArrayList<>();switch (node.getType()) {case INNER:ordering.add(leftOrderingRequire);ordering.add(rightOrderingRequire);break;case LEFT:ordering.add(leftOrderingRequire);break;case RIGHT:ordering.add(rightOrderingRequire);break;case FULL:break;}return new PlanWithProperties(node.replaceChildren(ImmutableList.of(left.getNode(), right.getNode()),!left.isOrderingSatisfiedBy(leftOrderingRequire), !right.isOrderingSatisfiedBy(rightOrderingRequire)),ordering);
}

测试

接下来我们做一下功能测试和性能测试。

测试环境

机型虚拟机
CPU8Core
内存32G
网络5Gbps
硬盘本地盘1 × 2TB, 单盘(磁盘阵列)最大吞吐1500 MBps
机器数量3台

功能测试

基本功能测试

测试场景: 在小内存限制下,同时应用了shuffle partition的较小设置,测试两种Join运行的成功率。
测试用例:

  • 1000G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。
  • query40
  • trino worker实例:1core4G,数量为1个。
task.concurrency=1
fault-tolerant-execution-partition-count=1 // exchange shuffle分区数设置为1
# 开启spill 
spill-enabled=true
spiller-spill-path=/mnt/data1/spill-path
selectw_state,i_item_id,sum(case when (cast(d_date as date) < cast ('1998-04-08' as date))then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_before,sum(case when (cast(d_date as date) >= cast ('1998-04-08' as date))then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_afterfromcatalog_sales left outer join catalog_returns on(cs_order_number = cr_order_numberand cs_item_sk = cr_item_sk),warehouse,item,date_dimwherei_current_price between 0.99 and 1.49and i_item_sk          = cs_item_skand cs_warehouse_sk    = w_warehouse_skand cs_sold_date_sk    = d_date_skand d_date between (cast ('1998-04-08' as date) - INTERVAL '30' day)and (cast ('1998-04-08' as date) + INTERVAL '30' day)group byw_state,i_item_idorder by w_state,i_item_id
limit 100;

测试结果:

  • 经过开启sort-merge-join配置后,q40查询任务成功执行。
  • 未开启sort-merge-join配置时,Hash join所需内存不足,导致查询任务失败。
场景结果
sort merge joinsuccess (耗时54.57m)
shuffle hash joinfailed

对比分析:
经详细对比分析,我们可以发现,针对catalog_sales和catalog_returns表的join过程,在开启sort merge join情况下,stage 2成功处理了约15亿行数据,且peak mem仅需898MB。然而,若采用hash join策略,当stage 2处理5000千万行数据时,peak mem高达2GB,导致任务由于内存不足而一直阻塞。从性能和内存消耗的角度考虑,采用sort merge join方法显然更为优越。
image.png
image.png

数据倾斜场景测试

测试场景: 在内存受限的条件下,针对部分partition可能存在数据倾斜的情形,验证两种Join的运行是否均能成功。
测试用例:

  • 1000G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。
  • trino worker实例:1core4G,数量为1个。
set session fault_tolerant_execution_partition_count=10;
use iceberg.tpcds_100_iceberg;
create table store_sales_new_full
select 
coalesce(inc_t.ss_sold_date_sk,full_t.ss_sold_date_sk) as ss_sold_date_sk,
coalesce(inc_t.ss_sold_time_sk,full_t.ss_sold_time_sk) as ss_sold_time_sk,
coalesce(inc_t.ss_item_sk,full_t.ss_item_sk) as ss_item_sk,
coalesce(inc_t.ss_customer_sk,full_t.ss_customer_sk) as ss_customer_sk,
coalesce(inc_t.ss_cdemo_sk,full_t.ss_cdemo_sk) as ss_cdemo_sk,
coalesce(inc_t.ss_hdemo_sk,full_t.ss_hdemo_sk) as ss_hdemo_sk,
coalesce(inc_t.ss_addr_sk,full_t.ss_addr_sk) as ss_addr_sk,
coalesce(inc_t.ss_store_sk,full_t.ss_store_sk) as ss_store_sk,
coalesce(inc_t.ss_promo_sk,full_t.ss_promo_sk) as ss_promo_sk,
coalesce(inc_t.ss_ticket_number,full_t.ss_ticket_number) as ss_ticket_number,
coalesce(inc_t.ss_quantity,full_t.ss_quantity) as ss_quantity,
coalesce(inc_t.ss_wholesale_cost,full_t.ss_wholesale_cost) as ss_wholesale_cost,
coalesce(inc_t.ss_list_price,full_t.ss_list_price) as ss_list_price,
coalesce(inc_t.ss_sales_price,full_t.ss_sales_price) as ss_sales_price,
coalesce(inc_t.ss_ext_discount_amt,full_t.ss_ext_discount_amt) as ss_ext_discount_amt,
coalesce(inc_t.ss_ext_sales_price,full_t.ss_ext_sales_price) as ss_ext_sales_price,
coalesce(inc_t.ss_ext_wholesale_cost,full_t.ss_ext_wholesale_cost) as ss_ext_wholesale_cost,
coalesce(inc_t.ss_ext_list_price,full_t.ss_ext_list_price) as ss_ext_list_price,
coalesce(inc_t.ss_ext_tax,full_t.ss_ext_tax) as ss_ext_tax,
coalesce(inc_t.ss_coupon_amt,full_t.ss_coupon_amt) as ss_coupon_amt,
coalesce(inc_t.ss_net_paid,full_t.ss_net_paid) as ss_net_paid,
coalesce(inc_t.ss_net_paid_inc_tax,full_t.ss_net_paid_inc_tax) as ss_net_paid_inc_tax,
coalesce(inc_t.ss_net_profit,full_t.ss_net_profit) as ss_net_profit
from
(select * from store_sales where ss_store_sk is not null ) full_t
full join 
(select * from store_sales where ss_store_sk is null) inc_t
on full_t.ss_store_sk = inc_t.ss_store_sk ;

测试结果:

  • 在开启sort-merge-join配置情况下,任务执行成功。
  • 在未开启sort-merge-join,task 2.0.0存在比较大的数据倾斜而导致任务失败。
场景结果
sort merge joinsuccess (耗时31.57m)
shuffle hash joinfailed

image.png
image.png

sort消除优化场景测试

测试场景: 在多张数据表进行join的情况下,若触发sort merge join时,是否可能进行sort消除。
测试用例:

  • 1000G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。
  • query10

测试结果:

  • 最后一个join的左右side都触发了sort消除。(sortLeft=false, sortRight=false表示两边都没有执行排序)

image.png

性能测试

测试场景: 采用了trino批处理模式,并使用了100G的tpcds数据集进行了性能测试,旨在比较不同的join算法对性能的影响
测试用例:

  • 100G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。

测试结果:

  • 在开启sort-merge-join后,性能出现10%左右的退化。(注:并非所有query都会触发hash join,因此有些query会全部使用broadcast join)
场景总耗时(单位:秒)
sort merge join3370
shuffle hash join3073

总结

本文主要介绍了Trino如何实现Sort Merge Join算法,并与传统的Hash Join算法进行了对比。通过分析两种算法的特性,我们发现Sort Merge Join相对于Hash Join具有更低的内存要求和更高的稳定性,在大数据场景下具有更好的表现。因此,在实际的应用中,可以根据实际的业务场景来选择合适的Join算法。同时,我们通过功能测试和性能测试验证了Trino的Sort Merge Join算法在实际应用中的表现非常优秀,能够满足大数据批处理场景下高效稳定的处理需求。

这篇关于基于trino实现Sort Merge Join的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/246662

相关文章

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取

基于Python实现一个Windows Tree命令工具

《基于Python实现一个WindowsTree命令工具》今天想要在Windows平台的CMD命令终端窗口中使用像Linux下的tree命令,打印一下目录结构层级树,然而还真有tree命令,但是发现... 目录引言实现代码使用说明可用选项示例用法功能特点添加到环境变量方法一:创建批处理文件并添加到PATH1

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

Nexus安装和启动的实现教程

《Nexus安装和启动的实现教程》:本文主要介绍Nexus安装和启动的实现教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、Nexus下载二、Nexus安装和启动三、关闭Nexus总结一、Nexus下载官方下载链接:DownloadWindows系统根

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

MySQL 横向衍生表(Lateral Derived Tables)的实现

《MySQL横向衍生表(LateralDerivedTables)的实现》横向衍生表适用于在需要通过子查询获取中间结果集的场景,相对于普通衍生表,横向衍生表可以引用在其之前出现过的表名,本文就来... 目录一、横向衍生表用法示例1.1 用法示例1.2 使用建议前面我们介绍过mysql中的衍生表(From子句

Mybatis的分页实现方式

《Mybatis的分页实现方式》MyBatis的分页实现方式主要有以下几种,每种方式适用于不同的场景,且在性能、灵活性和代码侵入性上有所差异,对Mybatis的分页实现方式感兴趣的朋友一起看看吧... 目录​1. 原生 SQL 分页(物理分页)​​2. RowBounds 分页(逻辑分页)​​3. Page

Mybatis Plus Join使用方法示例详解

《MybatisPlusJoin使用方法示例详解》:本文主要介绍MybatisPlusJoin使用方法示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,... 目录1、pom文件2、yaml配置文件3、分页插件4、示例代码:5、测试代码6、和PageHelper结合6