基于trino实现Sort Merge Join

2023-10-20 10:59
文章标签 实现 join merge sort trino

本文主要是介绍基于trino实现Sort Merge Join,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

当前,社区Trino已对Equi join场景支持了Broadcast Hash Join和Shuffle Hash Join两种Join实现。Broadcast Hash Join适用于大表与小表之间的Join,当Build Side的数据量比较小时,使用Broadcast方式将小表发送给Probe Side,避免了Hash Exchange操作,因而性能较佳。而Shuffle Hash Join适用于大表与大表之间的Join,两个表都需要进行Hash Exchange操作,同时Probe Side需要将Build Side对应的Partition数据全部加载到内存中才能进行计算,因而在表较大时,需要增加Partition数来避免内存OOM问题;但如果存在Partition数据倾斜,解决内存OOM问题就会更加困难。

除了以上两种Join实现,实际上还有一种Sort Merge Join实现方式(目前Trino社区还未实现),该方式是目前主流批处理引擎Spark用于大表Join默认的实现方式,相对于Shuffle Hash Join对内存的要求更低,因而更加稳定可靠。本文将通过详细介绍Trino实现Sort Merge Join的方法 。
目前我已将功能实现代码提交到社区(https://github.com/trinodb/trino/pull/17423),大家可以结合代码理解相关细节。

Join实现调研

Shuffle Hash Join

The classic hash join algorithm for an inner join of two relations 
proceeds as follows:1. First, prepare a hash table using the contents of one relation, 
ideally whichever one is smaller after applying local predicates. 
This relation is called the build side of the join. The hash table entries 
are mappings from the value of the (composite) join attribute to the 
remaining attributes of that row (whichever ones are needed).2. Once the hash table is built, scan the other relation (the probe side). 
For each row of the probe relation, find the relevant rows from 
the build relation by looking in the hash table.

以上是对于hash join算法的描述,该算法主要针对早期关系型数据库。在大数据海量数据处理的背景下,shuffle hash join和broadcast hash join应运而生。

针对shuffle hash join,trino内部的实现可以分为三个阶段:

  • Exchange Phase。两边进行equiJoinClause字段的exchange shuffle,保证相同的key落到同一个partition分区。
  • Build Phase。对build side构建hash table。
  • Lookup Phase。在完成hash table的构建之后,就开始Lookup阶段,probe side的每一条记录都去hash table中查找匹配的记录。

以下SQL案例可以更好地说明trino hash join的实现过程。

trino:tpcds_100_orc> select a.order_key,a.orderdate,b.orderstatus from test_t1 a join test_t2 b on a.order_key = b.order_key and a.orderdate = b.orderdate;order_key | orderdate  | orderstatus
-----------+------------+-------------20 | 2001-09-22 | success15 | 2001-08-20 | success

由于hash join算法具有内存消耗较高的特点,为了减少查询过程中的内存峰值,trino在HashBuilderOperator的实现中引入了spill功能。然而,在Lookup Phase阶段中,由于需要一个完整的hash table来进行匹配查询,所以其内存峰值大小必然大于或等于构建侧(build side)的最大数据分区大小。此外,数据倾斜可能还是会导致hash join算法出现OOM的风险。

Sort Merge Join

image.png

以上是wikipedia对sort merge join算法的inner join场景的伪代码描述。

现在,我们来看一下Spark中不同类型Join的实现及其特性,从sort merge join算法的特性来看,它满足大部分Join场景(除了non-equi Join)。
image.png

Spark中sort merge join的实现主要包括三个阶段:

  • Shuffle 阶段。Join两个数据集根据equiJoinClause字段进行exchange shuffle,确保相同的key位于同一个分区,并且两边的分区数相同。
  • Sort 阶段。在exchange之后,需要增加Sort算子,根据equiJoinClause字段进行排序,并且确保相同的排序策略(例如都选择ASC_NULLS_LAST)。
  • Merge 阶段。使用SortMergeJoin算子进行merge。实际上,可能需要考虑不同Join类型带来的算法差异,同时还需要支持处理单Key数据倾斜导致right_subset过大的情况,以及支持spill。

Trino如何实现SMJ

trino实现sort merge join总体上参考了spark的设计,下面就来讲trino如何实现。

核心流程

  1. 定义新的PlanNode - SortMergeJoinNode。相对于JoinNode,SortMergeJoinNode增加了needSortLeft和needSortRight属性,用于标识左右两边子PlanNode是否需要进行排序,以实现sort消除优化。
  2. 增加优化规则 - TransformHashJoinToSortMergeJoin。该规则能够在满足一定条件的情况下将JoinNode转换成SortMergeJoinNode。
  3. worker节点使用LocalExecutionPlanner::Visitor::visitSortMergeJoin方法来解析plan fragment中的SortMergeJoinNode,将其转换成两个左右side的SortMergeJoinOperator的Operator Pipeline。
  4. worker会根据SortMergeJoinNode的needSortLeft和needSortRight属性来决定是否对左右的child进行排序。对于需要排序的,会各自增加前置OrderByOperator,以确保排序顺序正确。

最终得到的执行算子Pipeline结构如下图,可以与前文所述的Hash Join生成的算子Pipeline进行对比,以便更好地了解其差异之处。:

SortMergeJoinNode定义如下

public SortMergeJoinNode(@JsonProperty("id") PlanNodeId id,@JsonProperty("type") JoinNode.Type type,@JsonProperty("left") PlanNode left,@JsonProperty("right") PlanNode right,@JsonProperty("criteria") List<JoinNode.EquiJoinClause> criteria,@JsonProperty("leftOutputSymbols") List<Symbol> leftOutputSymbols,@JsonProperty("rightOutputSymbols") List<Symbol> rightOutputSymbols,@JsonProperty("filter") Optional<Expression> filter,  //暂不支持@JsonProperty("distributionType") Optional<JoinNode.DistributionType> distributionType,@JsonProperty("dynamicFilters") Map<DynamicFilterId, Symbol> dynamicFilters, //暂不支持@JsonProperty("needSortLeft") Boolean needSortLeft,@JsonProperty("needSortRight") Boolean needSortRight)

TransformHashJoinToSortMergeJoin优化规则
触发条件:

  1. 存在EquiJoinClause的hash join。
  2. 左右两边DistributionType为PARTITIONED,即存在exchange。

执行核心逻辑:

  1. 以JoinNode为基础,替换成SortMergeJoinNode。
  2. 如果JoinNode带有filter,需新增前置FilterNode以解决此问题。
@Overridepublic Result apply(JoinNode node, Captures captures, Context context){if (node.getFilter().isPresent()) {PlanNode smj = new SortMergeJoinNode(context.getIdAllocator().getNextId(), node.getType(),node.getLeft(), node.getRight(), node.getCriteria(),node.getLeft().getOutputSymbols(), node.getRight().getOutputSymbols(),Optional.empty(), node.getDistributionType(), node.getDynamicFilters(), true, true);PlanNode filter = new FilterNode(context.getIdAllocator().getNextId(), smj, node.getFilter().get());List<Symbol> output = new ArrayList<>();output.addAll(node.getLeftOutputSymbols());output.addAll(node.getRightOutputSymbols());PlanNode project = new ProjectNode(context.getIdAllocator().getNextId(), filter,Assignments.identity(output));return Result.ofPlanNode(project);}else {PlanNode smj = new SortMergeJoinNode(context.getIdAllocator().getNextId(), node.getType(),node.getLeft(), node.getRight(), node.getCriteria(),node.getLeftOutputSymbols(), node.getRightOutputSymbols(),node.getFilter(), node.getDistributionType(), node.getDynamicFilters(), true, true);return Result.ofPlanNode(smj);}
}

数据倾斜如何处理

首先,我们来了解一下Sort Merge Join Scaner匹配join key的流程:

  1. 当左侧和右侧匹配上某一个join key后,右侧会将join key相同的所有记录加载到SpillableMatchedPages集合中。
  2. 左侧的当前记录与SpillableMatchedPages中的记录产生join操作,生成join结果。
  3. 左侧的位点移动到下一条记录,如果其join key与之前相同,则SpillableMatchedPages会被重复利用进行join操作。
  4. 因此SpillableMatchedPages主要用于缓存右侧join key相等的记录,便于可以重复消耗。

然而,由于数据倾斜的存在,右侧可能会有过多的重复join key,导致SpillableMatchedPages内存消耗过大,引起OOM问题。我们可以通过增加spill模式来解决这个问题,通过控制numRowsInMemoryBufferThreshold阈值来控制是否触发spill模式。当SpillableMatchedPages记录数超过numRowsInMemoryBufferThreshold时,就会切换到spill模式。这样既可以避免OOM问题,又能保证高效的join操作。

    public void insertRow(Row row){...positionCount++;if (positionCount >= numRowsInMemoryBufferThreshold && spiller == null) {spillToDisk();}}

通过sort消除提升性能

我们知道,在sort merge join中,左右两表的数据需要进行排序才能进行连接。但是,如果左右两表的数据本身已经有序,就可以避免不必要的排序,从而提升性能。此外,在某些join类型的sort merge join之后,数据已具有相关的order排序属性,因此可以利用这个属性来分析是否可以避免不必要的排序。

该优化主要应用于多个表存在多个sort merge join的情况,未来还可以扩展支持具有order属性的bucket表。

对于不同的Join类型,其最终数据呈现的order排序属性也各不相同。

Join类型order排序属性
inner join即按左边join key排序,又按右边join key排序
left join按左边join key排序
right join按右边join key排序
full join无排序

下面sort消除的各个案例:

-- sort消除生效。 第二个join的lefe side不需要sort
select * from  t1 join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1
select * from  t1 left join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1-- sort消除不生效。 第二个join的lefe side是按t2的c1排序,无法匹配order需求。
select * from  t1 right join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1-- sort消除不生效。 full join无法保证order顺序。
select * from  t1 full join t2 b on t1.c1 = t2.c1 join t3 c on t1.c1 = t3.c1-- sort消除不生效。 第二个join的join key与第一个join不一致。
select * from  t1 join t2 b on t1.c1 = t2.c1 join t3 c on t1.c2 = t3.c2    

下面sort消除的优化规则的核心代码:

@Override
public PlanWithProperties visitSortMergeJoin(SortMergeJoinNode node, Void context)
{PlanWithProperties left = node.getLeft().accept(this, context);PlanWithProperties right = node.getRight().accept(this, context);OrderingScheme leftOrderingRequire = node.getLeftOrdering();OrderingScheme rightOrderingRequire = node.getRightOrdering();List<OrderingScheme> ordering = new ArrayList<>();switch (node.getType()) {case INNER:ordering.add(leftOrderingRequire);ordering.add(rightOrderingRequire);break;case LEFT:ordering.add(leftOrderingRequire);break;case RIGHT:ordering.add(rightOrderingRequire);break;case FULL:break;}return new PlanWithProperties(node.replaceChildren(ImmutableList.of(left.getNode(), right.getNode()),!left.isOrderingSatisfiedBy(leftOrderingRequire), !right.isOrderingSatisfiedBy(rightOrderingRequire)),ordering);
}

测试

接下来我们做一下功能测试和性能测试。

测试环境

机型虚拟机
CPU8Core
内存32G
网络5Gbps
硬盘本地盘1 × 2TB, 单盘(磁盘阵列)最大吞吐1500 MBps
机器数量3台

功能测试

基本功能测试

测试场景: 在小内存限制下,同时应用了shuffle partition的较小设置,测试两种Join运行的成功率。
测试用例:

  • 1000G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。
  • query40
  • trino worker实例:1core4G,数量为1个。
task.concurrency=1
fault-tolerant-execution-partition-count=1 // exchange shuffle分区数设置为1
# 开启spill 
spill-enabled=true
spiller-spill-path=/mnt/data1/spill-path
selectw_state,i_item_id,sum(case when (cast(d_date as date) < cast ('1998-04-08' as date))then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_before,sum(case when (cast(d_date as date) >= cast ('1998-04-08' as date))then cs_sales_price - coalesce(cr_refunded_cash,0) else 0 end) as sales_afterfromcatalog_sales left outer join catalog_returns on(cs_order_number = cr_order_numberand cs_item_sk = cr_item_sk),warehouse,item,date_dimwherei_current_price between 0.99 and 1.49and i_item_sk          = cs_item_skand cs_warehouse_sk    = w_warehouse_skand cs_sold_date_sk    = d_date_skand d_date between (cast ('1998-04-08' as date) - INTERVAL '30' day)and (cast ('1998-04-08' as date) + INTERVAL '30' day)group byw_state,i_item_idorder by w_state,i_item_id
limit 100;

测试结果:

  • 经过开启sort-merge-join配置后,q40查询任务成功执行。
  • 未开启sort-merge-join配置时,Hash join所需内存不足,导致查询任务失败。
场景结果
sort merge joinsuccess (耗时54.57m)
shuffle hash joinfailed

对比分析:
经详细对比分析,我们可以发现,针对catalog_sales和catalog_returns表的join过程,在开启sort merge join情况下,stage 2成功处理了约15亿行数据,且peak mem仅需898MB。然而,若采用hash join策略,当stage 2处理5000千万行数据时,peak mem高达2GB,导致任务由于内存不足而一直阻塞。从性能和内存消耗的角度考虑,采用sort merge join方法显然更为优越。
image.png
image.png

数据倾斜场景测试

测试场景: 在内存受限的条件下,针对部分partition可能存在数据倾斜的情形,验证两种Join的运行是否均能成功。
测试用例:

  • 1000G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。
  • trino worker实例:1core4G,数量为1个。
set session fault_tolerant_execution_partition_count=10;
use iceberg.tpcds_100_iceberg;
create table store_sales_new_full
select 
coalesce(inc_t.ss_sold_date_sk,full_t.ss_sold_date_sk) as ss_sold_date_sk,
coalesce(inc_t.ss_sold_time_sk,full_t.ss_sold_time_sk) as ss_sold_time_sk,
coalesce(inc_t.ss_item_sk,full_t.ss_item_sk) as ss_item_sk,
coalesce(inc_t.ss_customer_sk,full_t.ss_customer_sk) as ss_customer_sk,
coalesce(inc_t.ss_cdemo_sk,full_t.ss_cdemo_sk) as ss_cdemo_sk,
coalesce(inc_t.ss_hdemo_sk,full_t.ss_hdemo_sk) as ss_hdemo_sk,
coalesce(inc_t.ss_addr_sk,full_t.ss_addr_sk) as ss_addr_sk,
coalesce(inc_t.ss_store_sk,full_t.ss_store_sk) as ss_store_sk,
coalesce(inc_t.ss_promo_sk,full_t.ss_promo_sk) as ss_promo_sk,
coalesce(inc_t.ss_ticket_number,full_t.ss_ticket_number) as ss_ticket_number,
coalesce(inc_t.ss_quantity,full_t.ss_quantity) as ss_quantity,
coalesce(inc_t.ss_wholesale_cost,full_t.ss_wholesale_cost) as ss_wholesale_cost,
coalesce(inc_t.ss_list_price,full_t.ss_list_price) as ss_list_price,
coalesce(inc_t.ss_sales_price,full_t.ss_sales_price) as ss_sales_price,
coalesce(inc_t.ss_ext_discount_amt,full_t.ss_ext_discount_amt) as ss_ext_discount_amt,
coalesce(inc_t.ss_ext_sales_price,full_t.ss_ext_sales_price) as ss_ext_sales_price,
coalesce(inc_t.ss_ext_wholesale_cost,full_t.ss_ext_wholesale_cost) as ss_ext_wholesale_cost,
coalesce(inc_t.ss_ext_list_price,full_t.ss_ext_list_price) as ss_ext_list_price,
coalesce(inc_t.ss_ext_tax,full_t.ss_ext_tax) as ss_ext_tax,
coalesce(inc_t.ss_coupon_amt,full_t.ss_coupon_amt) as ss_coupon_amt,
coalesce(inc_t.ss_net_paid,full_t.ss_net_paid) as ss_net_paid,
coalesce(inc_t.ss_net_paid_inc_tax,full_t.ss_net_paid_inc_tax) as ss_net_paid_inc_tax,
coalesce(inc_t.ss_net_profit,full_t.ss_net_profit) as ss_net_profit
from
(select * from store_sales where ss_store_sk is not null ) full_t
full join 
(select * from store_sales where ss_store_sk is null) inc_t
on full_t.ss_store_sk = inc_t.ss_store_sk ;

测试结果:

  • 在开启sort-merge-join配置情况下,任务执行成功。
  • 在未开启sort-merge-join,task 2.0.0存在比较大的数据倾斜而导致任务失败。
场景结果
sort merge joinsuccess (耗时31.57m)
shuffle hash joinfailed

image.png
image.png

sort消除优化场景测试

测试场景: 在多张数据表进行join的情况下,若触发sort merge join时,是否可能进行sort消除。
测试用例:

  • 1000G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。
  • query10

测试结果:

  • 最后一个join的左右side都触发了sort消除。(sortLeft=false, sortRight=false表示两边都没有执行排序)

image.png

性能测试

测试场景: 采用了trino批处理模式,并使用了100G的tpcds数据集进行了性能测试,旨在比较不同的join算法对性能的影响
测试用例:

  • 100G数据集tpcds,iceberg表。
  • 批处理模式。retry-policy=TASK。

测试结果:

  • 在开启sort-merge-join后,性能出现10%左右的退化。(注:并非所有query都会触发hash join,因此有些query会全部使用broadcast join)
场景总耗时(单位:秒)
sort merge join3370
shuffle hash join3073

总结

本文主要介绍了Trino如何实现Sort Merge Join算法,并与传统的Hash Join算法进行了对比。通过分析两种算法的特性,我们发现Sort Merge Join相对于Hash Join具有更低的内存要求和更高的稳定性,在大数据场景下具有更好的表现。因此,在实际的应用中,可以根据实际的业务场景来选择合适的Join算法。同时,我们通过功能测试和性能测试验证了Trino的Sort Merge Join算法在实际应用中的表现非常优秀,能够满足大数据批处理场景下高效稳定的处理需求。

这篇关于基于trino实现Sort Merge Join的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/246662

相关文章

Redis客户端连接机制的实现方案

《Redis客户端连接机制的实现方案》本文主要介绍了Redis客户端连接机制的实现方案,包括事件驱动模型、非阻塞I/O处理、连接池应用及配置优化,具有一定的参考价值,感兴趣的可以了解一下... 目录1. Redis连接模型概述2. 连接建立过程详解2.1 连php接初始化流程2.2 关键配置参数3. 最大连

Python实现网格交易策略的过程

《Python实现网格交易策略的过程》本文讲解Python网格交易策略,利用ccxt获取加密货币数据及backtrader回测,通过设定网格节点,低买高卖获利,适合震荡行情,下面跟我一起看看我们的第一... 网格交易是一种经典的量化交易策略,其核心思想是在价格上下预设多个“网格”,当价格触发特定网格时执行买

python设置环境变量路径实现过程

《python设置环境变量路径实现过程》本文介绍设置Python路径的多种方法:临时设置(Windows用`set`,Linux/macOS用`export`)、永久设置(系统属性或shell配置文件... 目录设置python路径的方法临时设置环境变量(适用于当前会话)永久设置环境变量(Windows系统

Python对接支付宝支付之使用AliPay实现的详细操作指南

《Python对接支付宝支付之使用AliPay实现的详细操作指南》支付宝没有提供PythonSDK,但是强大的github就有提供python-alipay-sdk,封装里很多复杂操作,使用这个我们就... 目录一、引言二、准备工作2.1 支付宝开放平台入驻与应用创建2.2 密钥生成与配置2.3 安装ali

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、

PyCharm中配置PyQt的实现步骤

《PyCharm中配置PyQt的实现步骤》PyCharm是JetBrains推出的一款强大的PythonIDE,结合PyQt可以进行pythion高效开发桌面GUI应用程序,本文就来介绍一下PyCha... 目录1. 安装China编程PyQt1.PyQt 核心组件2. 基础 PyQt 应用程序结构3. 使用 Q

Java Thread中join方法使用举例详解

《JavaThread中join方法使用举例详解》JavaThread中join()方法主要是让调用改方法的thread完成run方法里面的东西后,在执行join()方法后面的代码,这篇文章主要介绍... 目录前言1.join()方法的定义和作用2.join()方法的三个重载版本3.join()方法的工作原

Python实现批量提取BLF文件时间戳

《Python实现批量提取BLF文件时间戳》BLF(BinaryLoggingFormat)作为Vector公司推出的CAN总线数据记录格式,被广泛用于存储车辆通信数据,本文将使用Python轻松提取... 目录一、为什么需要批量处理 BLF 文件二、核心代码解析:从文件遍历到数据导出1. 环境准备与依赖库

linux下shell脚本启动jar包实现过程

《linux下shell脚本启动jar包实现过程》确保APP_NAME和LOG_FILE位于目录内,首次启动前需手动创建log文件夹,否则报错,此为个人经验,供参考,欢迎支持脚本之家... 目录linux下shell脚本启动jar包样例1样例2总结linux下shell脚本启动jar包样例1#!/bin

go动态限制并发数量的实现示例

《go动态限制并发数量的实现示例》本文主要介绍了Go并发控制方法,通过带缓冲通道和第三方库实现并发数量限制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录带有缓冲大小的通道使用第三方库其他控制并发的方法因为go从语言层面支持并发,所以面试百分百会问到