Hive On Tez小文件合并的技术调研

2024-03-18 20:40
文章标签 技术 合并 hive 调研 tez

本文主要是介绍Hive On Tez小文件合并的技术调研,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Hive On Tez小文件合并的技术调研

背景

在升级到CDP7.1.5之后,默认的运算引擎变成了Tez,之前这篇有讲过:

https://lizhiyong.blog.csdn.net/article/details/126688391

具体参考Cloudera的官方文档:https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/hive-introduction/topics/hive-unsupported.html

并且只能用Tez,调度、血缘等重度依赖租来的阿里云DataPhin,那么最常用的离线跑批任务还是要使用HQL【也就是Hive On Tez】。HQL上手门槛极低,之前搞那种Oracle数据库开发的也可以一周内升任Sql Boy岗位,这是其一;PySpark任务或者用Java/Scala打Jar包的Spark任务由于平台的缺陷无法记录血缘,做溯源及下游影响分析时也是极不方便,只能怼人天靠人工,效率低下。HQL虽然低端并且性能不高,但是一时半会儿还不能被取缔。

原先在CDH5.16中,HQL任务是Hive On MapReduce。写在HQL最上方用于合并小文件的参数到了CDP就不能使用了:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

所以急需合并小文件,最好还是这种方式,就可以不用做太大的改动。

方式

直接set启用

既然之前的MapReduce任务有小文件合并的功能,那么找一找,还真就找到了更多的参数,在HiveConf这个Java类里。

在Apache Hive3.1.2的Java源码找到:

package org.apache.hadoop.hive.conf;public class HiveConf extends Configuration {
HIVEMERGEMAPFILES("hive.merge.mapfiles", true,"Merge small files at the end of a map-only job"),HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,"Merge small files at the end of a map-reduce job"),HIVEMERGETEZFILES("hive.merge.tezfiles", false, "Merge small files at the end of a Tez DAG"),HIVEMERGESPARKFILES("hive.merge.sparkfiles", false, "Merge small files at the end of a Spark DAG Transformation"),HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000),"Size of merged files at the end of the job"),HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000),"When the average output file size of a job is less than this number, Hive will start an additional \n" +"map-reduce job to merge the output files into bigger files. This is only done for map-only jobs \n" +"if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true."),HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,"When hive.merge.mapfiles, hive.merge.mapredfiles or hive.merge.tezfiles is enabled\n" +"while writing a table with ORC file format, enabling this config will do stripe-level\n" +"fast merge for small ORC files. Note that enabling this config will not honor the\n" +"padding tolerance config (hive.exec.orc.block.padding.tolerance)."),MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,"Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +"table there is at most 1 matching row in the source table per SQL Specification.")}

参考:https://lizhiyong.blog.csdn.net/article/details/126688391

这篇有扒源码,看到了Hive最终会把AST解析成Tez或者Spark的DAG,那么此处的hive.merge.tezfileshive.merge.sparkfiles按照注释的字面意思,也就很容易看明白,是要在DAG的结尾处来一次merge。这当然就是最简单的方式。由于某些配置默认是false也就是停用状态,所以必须手动set启用才能生效。

所以只需要在HQL任务头上+这些参数即可:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.tezfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

使用Apache Hive自己搭建了Hive On Spark的难兄难弟们就可以照猫画虎:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.sparkfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

不过Hive的Calcite和CBO、RBO比起商业化DataBrick的Catalyst优势不明显,这种方式性能并不理想,使用SparkSQL的应该才是大多数。

验证

已经在prod环境充分验证,加入参数后敲:

hadoop fs -count
hadoop fs -du -s -h
hadoop fs -du -h

可以看到小文件情况明显改善。

到这里,肤浅的SQL Boy们就可以止步,已经够用了。

调整reducer个数

有时候,出于种种原因【数据量基本可以预知】,我们希望可以像Spark那样写死文件的个数。这样后续的任务调优、资源配额就比较方便。

Spark默认200个Task:

spark.sql.shuffle.partitions=200

只需要最后写文件或者sql跑insert overwrite前来一句免Shuffle高性能的:

df.coalesce(1)

或者肤浅的Sql Boy们比较能接受的Shuffle低性能的:

df1.repartition(1)

即可。Tez当然也是有办法实现这种写死文件个数的效果,那就是限制reducer个数,利用利用distribute by打散到reducer,每个reducer会写一个文件,最终文件个数就是写死的reducer个数。

验证

create external table if not exists test_small_file1(id int,message string
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_small_file2(id int,message string
)
partitioned by(dt string
)
stored as parquet
location '这里写自己集群的即可'
;

作为结果表。

create external table if not exists test_data_source(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_data_source_100w(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;create external table if not exists test_data_source_1000w(id int,message string,dt
)
stored as parquet
location '这里写自己集群的即可'
;

作为取数的数据源表。

然后insert数据产生小文件:

insert into test_data_source values(1,'a1',20230310);
...	--这里自己填充
insert into test_data_source values(20,'a20',20230310);insert into test_data_source values(21,'a21',20230311);
...
insert into test_data_source values(40,'a40',20230311);insert into test_data_source values(100,'a100',20230312);
...
insert into test_data_source values(199,'a199',20230312);insert into test_data_source values(200,'a200',20230313);
...
insert into test_data_source values(299,'a299',20230313);

此时在Impala执行:

show files in db_name.test_data_source;

或者直接HDFS查看:

hadoop fs -du -h

都可以看到240个小文件。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source;

可以看到结果是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source;

可以看到是6个小文件。

这点数据就会产生小文件。

Mock出百万级别的数据量:

insert overwrite table test_data_source_100w
select * from test_date_source;
;
insert overwrite table test_data_source_100w
select * from test_data_source_100w;
;

多次执行,直到数据量超过100w。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source_100w;

可以看到结果还是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w;

可以看到是8个小文件。

重头戏来了:

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file1
select id,message from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

打散为10个小文件。

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

由于4个日期,所以打散成40个小文件。

之后Mock出千万级别的也类似。直接说结果:

最终test_small_file1表有5个小文件,test_small_file2有20个小文件。

但是使用参数后:

最终test_small_file1表有10个小文件,test_small_file2有40个小文件。

所以可以看出,distribute by cast(rand() * 100 as int)这个操作利用了Hash Partitioner,结果散步到了Reduce Task。最后文件的个数=Reduce Task个数。

分区表是每个Partition的结果再次Hash Partitioner打散,所以每个分区路径的parquet文件个数都是=Reduce Task个数。

由于hive.exec.reducers.bytes.per.reduce可以设置的很大,那么只需要修改mapreduce.job.reduces的值,就可以让Tez跑HQL任务时写固定个数的文件。

ACID表的ORC小文件

由于Sql Boy们比较顽固,总是想着把Hive当RDBMS来用,于是。。。说多了都是泪。。。

Hive的ACID表会产生大量的小文件,阈值配置的不合适时,触发合并的次数就很少,导致小文件越来越多。这有点像Hudi的MOR【merge on read】。当下游HQL任务从这些ACID表读数据时,就会由于Map Task过多,出现极其严重的性能问题,这个故事慢慢讲。于是Sql Boy们养成了手动合并小文件的好习惯:

alter table tb_name compact 'major';
alter table tb_name compact 'minor';

可以通过:

show compactions;

查看提交到Yarn的合并任务的记录,自己把unix的timestamp换算成人类能看懂的时间,就知道近期的合并情况。。。任务跑得慢,想起来了就手动合并一下。。。

总结

Hive On Tez主要就是这3种合并小文件的方式。

在这里插入图片描述
转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129511318

这篇关于Hive On Tez小文件合并的技术调研的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823585

相关文章

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

Python中高级文本模式匹配与查找技术指南

《Python中高级文本模式匹配与查找技术指南》文本处理是编程世界的永恒主题,而模式匹配则是文本处理的基石,本文将深度剖析PythonCookbook中的核心匹配技术,并结合实际工程案例展示其应用,希... 目录引言一、基础工具:字符串方法与序列匹配二、正则表达式:模式匹配的瑞士军刀2.1 re模块核心AP

MySQL进行分片合并的实现步骤

《MySQL进行分片合并的实现步骤》分片合并是指在分布式数据库系统中,将不同分片上的查询结果进行整合,以获得完整的查询结果,下面就来具体介绍一下,感兴趣的可以了解一下... 目录环境准备项目依赖数据源配置分片上下文分片查询和合并代码实现1. 查询单条记录2. 跨分片查询和合并测试结论分片合并(Shardin

基于Python实现进阶版PDF合并/拆分工具

《基于Python实现进阶版PDF合并/拆分工具》在数字化时代,PDF文件已成为日常工作和学习中不可或缺的一部分,本文将详细介绍一款简单易用的PDF工具,帮助用户轻松完成PDF文件的合并与拆分操作... 目录工具概述环境准备界面说明合并PDF文件拆分PDF文件高级技巧常见问题完整源代码总结在数字化时代,PD

解决hive启动时java.net.ConnectException:拒绝连接的问题

《解决hive启动时java.net.ConnectException:拒绝连接的问题》Hadoop集群连接被拒,需检查集群是否启动、关闭防火墙/SELinux、确认安全模式退出,若问题仍存,查看日志... 目录错误发生原因解决方式1.关闭防火墙2.关闭selinux3.启动集群4.检查集群是否正常启动5.

pandas数据的合并concat()和merge()方式

《pandas数据的合并concat()和merge()方式》Pandas中concat沿轴合并数据框(行或列),merge基于键连接(内/外/左/右),concat用于纵向或横向拼接,merge用于... 目录concat() 轴向连接合并(1) join='outer',axis=0(2)join='o

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

Python实现PDF按页分割的技术指南

《Python实现PDF按页分割的技术指南》PDF文件处理是日常工作中的常见需求,特别是当我们需要将大型PDF文档拆分为多个部分时,下面我们就来看看如何使用Python创建一个灵活的PDF分割工具吧... 目录需求分析技术方案工具选择安装依赖完整代码实现使用说明基本用法示例命令输出示例技术亮点实际应用场景扩

Python使用python-can实现合并BLF文件

《Python使用python-can实现合并BLF文件》python-can库是Python生态中专注于CAN总线通信与数据处理的强大工具,本文将使用python-can为BLF文件合并提供高效灵活... 目录一、python-can 库:CAN 数据处理的利器二、BLF 文件合并核心代码解析1. 基础合

Qt如何实现文本编辑器光标高亮技术

《Qt如何实现文本编辑器光标高亮技术》这篇文章主要为大家详细介绍了Qt如何实现文本编辑器光标高亮技术,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录实现代码函数作用概述代码详解 + 注释使用 QTextEdit 的高亮技术(重点)总结用到的关键技术点应用场景举例示例优化建议