Hive的UDF开发之向量化表达式(VectorizedExpressions)

2024-02-22 02:04

本文主要是介绍Hive的UDF开发之向量化表达式(VectorizedExpressions),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 背景

笔者的大数据平台XSailboat的SailWorks模块包含离线分析功能。离线分析的后台实现,包含调度引擎、执行引擎、计算引擎和存储引擎。计算和存储引擎由Hive提供,调度引擎和执行引擎由我们自己实现。调度引擎根据DAG图和调度计划,安排执行顺序,监控执行过程。执行引擎接收调度引擎安排的任务,向Yarn申请容器,在容器中执行具体的任务。

我们的离线分析支持编写Hive的UDF函数,打包上传,并声明使用函数。
在这里插入图片描述
我们通常会通过继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF来自定义自己的UDF函数,再参考Hive实现的内置UDF函数时,经常会看到在它的类名上,有@VectorizedExpressions注解,翻译过来即“向量化表达式”。在此记录一下自己学习到的知识和理解。

官方文档《Vectorized Query Execution》
有以下应该至少知道的点:

  1. 向量化查询缺省是关闭的;
  2. 要能支持向量化查询,数据存储格式必需是ORC格式(我们主要是用CSV格式)。

通常所说的向量化计算主要是从以下几个方面提升效率:

  1. 利用CPU底册指令对向量的运算
  2. 利用多核/多线程的能力进行并发计算

而Hive的向量化执行,主要是代码逻辑聚合并充分利用上下文,减少判断次数,减少对象的访问处理和序列化次数,数据切块并行。

2. 实践

package com.cimstech.udf.date;import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;import com.cimstech.xfront.common.excep.WrapException;
import com.cimstech.xfront.common.text.XString;public class VectorUDFStringToTimstamp extends VectorExpression
{private static final long serialVersionUID = 1L;/*** 列序号*/int mColNum0 ;/*** 时间格式*/String mDateFmt ;transient SimpleDateFormat mSdf ;/*** 必需得有1个无参的构造函数.		<br />* hive会先通过无参构造函数创建一个实例,然后调用getDescriptor()方法,取得描述。* 通过描述知道有哪几列,分别是什么格式的,才知道怎么调用有参构造函数。*/public VectorUDFStringToTimstamp(){super() ;}/*** 有参构造函数的参数要和getDescriptor中取得的描述相对应。* Column类型的输入,在此用int类型列序号表示			<br />* 标量列直接是相应类型即可。						* @param aColNum0* @param aDateFmt* @param aOutputColumnNum*/public VectorUDFStringToTimstamp(int aColNum0 , String aDateFmt, int aOutputColumnNum){super(aOutputColumnNum) ;mColNum0 = aColNum0 ;mDateFmt = aDateFmt ;}@Overridepublic String vectorExpressionParameters(){return getColumnParamString(0 , mColNum0)+ " , val " + mDateFmt ;}private void setDatetime(TimestampColumnVector aTimestampColVector, byte[][] aVector, int aElementNum) throws HiveException{if(mSdf == null)mSdf = new SimpleDateFormat(mDateFmt) ;String dateStr = null ;try{dateStr = new String(aVector[aElementNum] , "UTF-8") ;aTimestampColVector.getScratchTimestamp().setTime(mSdf.parse(dateStr).getTime()) ;}catch (UnsupportedEncodingException e){WrapException.wrapThrow(e) ;return ;		// dead code}catch(ParseException e){throw new HiveException(XString.msgFmt("时间字符串[{}]无法按模式[{}]解析!" , dateStr , mDateFmt)) ;}aTimestampColVector.setFromScratchTimestamp(aElementNum);}@Overridepublic void evaluate(VectorizedRowBatch aBatch) throws HiveException{if (childExpressions != null){evaluateChildren(aBatch);}int n = aBatch.size;if (n == 0)return;BytesColumnVector inputColVector = (BytesColumnVector) aBatch.cols[mColNum0];TimestampColumnVector outputColVector = (TimestampColumnVector) aBatch.cols[outputColumnNum];boolean[] inputIsNull = inputColVector.isNull;boolean[] outputIsNull = outputColVector.isNull;byte[][] vector = inputColVector.vector;if (inputColVector.isRepeating){// 如果是重复的,那么只需要解析第1个就行if (inputColVector.noNulls || !inputIsNull[0]){outputIsNull[0] = false;setDatetime(outputColVector, vector, 0);}else{// 重复,且都是null,那么没有可解析的,如下设置即可outputIsNull[0] = true;outputColVector.noNulls = false;}outputColVector.isRepeating = true;return;}elseoutputColVector.isRepeating = false;if (inputColVector.noNulls) 	// 没有为null的{// selectedInUse为true,表示选中输入中的指定行进行处理。if (aBatch.selectedInUse){int[] sel = aBatch.selected;if (!outputColVector.noNulls)		// 全局被标为了有null值,那么各个为止都需要单独设置是否为null{for (int j = 0; j != n; j++){final int i = sel[j] ;outputIsNull[i] = false;		// 某一行,单独设置不为nullsetDatetime(outputColVector, vector, i);}}else{for (int j = 0; j != n; j++){final int i = sel[j];// 全局被标为了没有null值,那么无需一行行标注非nullsetDatetime(outputColVector, vector, i);}}}else{// 输入是全局没有null值的,输出被全局标为了有null值,那么把输出改过来,改为全局没有null值if (!outputColVector.noNulls)		{Arrays.fill(outputIsNull, false);		// 所有输出都非nulloutputColVector.noNulls = true;			// 改为全局没有null值}for (int i = 0; i != n; i++){setDatetime(outputColVector, vector, i);}}}else	// 输入数据是有null的{outputColVector.noNulls = false;if (aBatch.selectedInUse){int[] sel = aBatch.selected;for (int j = 0; j != n; j++){int i = sel[j] ;outputIsNull[i] = inputIsNull[i] ;if(!outputIsNull[i])setDatetime(outputColVector, vector, i) ;}}else{System.arraycopy(inputIsNull, 0, outputIsNull, 0, n);for (int i = 0; i != n; i++){if(!outputIsNull[i])setDatetime(outputColVector, vector, i) ;}}}}@Overridepublic Descriptor getDescriptor(){return (new VectorExpressionDescriptor.Builder())// 不是过滤,都认为是投影(Projection)。投影是数据库理论中的专业术语// 投影是根据输入,构造输出,填充输出列// 过滤就是设置aBatch.selected.setMode(VectorExpressionDescriptor.Mode.PROJECTION)		.setNumArguments(2).setArgumentTypes(VectorExpressionDescriptor.ArgumentType.STRING, VectorExpressionDescriptor.ArgumentType.STRING).setInputExpressionTypes(VectorExpressionDescriptor.InputExpressionType.COLUMN, VectorExpressionDescriptor.InputExpressionType.SCALAR)		// 标量,指定的字符串常量,就是标量.build();}}

这篇关于Hive的UDF开发之向量化表达式(VectorizedExpressions)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/733811

相关文章

Django开发时如何避免频繁发送短信验证码(python图文代码)

《Django开发时如何避免频繁发送短信验证码(python图文代码)》Django开发时,为防止频繁发送验证码,后端需用Redis限制请求频率,结合管道技术提升效率,通过生产者消费者模式解耦业务逻辑... 目录避免频繁发送 验证码1. www.chinasem.cn避免频繁发送 验证码逻辑分析2. 避免频繁

Spring Boot集成/输出/日志级别控制/持久化开发实践

《SpringBoot集成/输出/日志级别控制/持久化开发实践》SpringBoot默认集成Logback,支持灵活日志级别配置(INFO/DEBUG等),输出包含时间戳、级别、类名等信息,并可通过... 目录一、日志概述1.1、Spring Boot日志简介1.2、日志框架与默认配置1.3、日志的核心作用

C++11右值引用与Lambda表达式的使用

《C++11右值引用与Lambda表达式的使用》C++11引入右值引用,实现移动语义提升性能,支持资源转移与完美转发;同时引入Lambda表达式,简化匿名函数定义,通过捕获列表和参数列表灵活处理变量... 目录C++11新特性右值引用和移动语义左值 / 右值常见的左值和右值移动语义移动构造函数移动复制运算符

PyQt5 GUI 开发的基础知识

《PyQt5GUI开发的基础知识》Qt是一个跨平台的C++图形用户界面开发框架,支持GUI和非GUI程序开发,本文介绍了使用PyQt5进行界面开发的基础知识,包括创建简单窗口、常用控件、窗口属性设... 目录简介第一个PyQt程序最常用的三个功能模块控件QPushButton(按钮)控件QLable(纯文本

基于Python开发一个图像水印批量添加工具

《基于Python开发一个图像水印批量添加工具》在当今数字化内容爆炸式增长的时代,图像版权保护已成为创作者和企业的核心需求,本方案将详细介绍一个基于PythonPIL库的工业级图像水印解决方案,有需要... 目录一、系统架构设计1.1 整体处理流程1.2 类结构设计(扩展版本)二、核心算法深入解析2.1 自

SpringBoot开发中十大常见陷阱深度解析与避坑指南

《SpringBoot开发中十大常见陷阱深度解析与避坑指南》在SpringBoot的开发过程中,即使是经验丰富的开发者也难免会遇到各种棘手的问题,本文将针对SpringBoot开发中十大常见的“坑... 目录引言一、配置总出错?是不是同时用了.properties和.yml?二、换个位置配置就失效?搞清楚加

Python中对FFmpeg封装开发库FFmpy详解

《Python中对FFmpeg封装开发库FFmpy详解》:本文主要介绍Python中对FFmpeg封装开发库FFmpy,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、FFmpy简介与安装1.1 FFmpy概述1.2 安装方法二、FFmpy核心类与方法2.1 FF

基于Python开发Windows屏幕控制工具

《基于Python开发Windows屏幕控制工具》在数字化办公时代,屏幕管理已成为提升工作效率和保护眼睛健康的重要环节,本文将分享一个基于Python和PySide6开发的Windows屏幕控制工具,... 目录概述功能亮点界面展示实现步骤详解1. 环境准备2. 亮度控制模块3. 息屏功能实现4. 息屏时间

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

使用Python开发一个现代化屏幕取色器

《使用Python开发一个现代化屏幕取色器》在UI设计、网页开发等场景中,颜色拾取是高频需求,:本文主要介绍如何使用Python开发一个现代化屏幕取色器,有需要的小伙伴可以参考一下... 目录一、项目概述二、核心功能解析2.1 实时颜色追踪2.2 智能颜色显示三、效果展示四、实现步骤详解4.1 环境配置4.