Flink Hive Catalog操作案例

2023-10-27 16:12
文章标签 操作 案例 flink hive catalog

本文主要是介绍Flink Hive Catalog操作案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。

基础环境

Hive-3.1.3
Flink-1.17.1

基本操作与准备

1、上传依赖jar包到flink/lib目录下

cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
cp mysql-connector-j-8.1.0.jar

2、更换planner依赖(Hive集成的推荐设置)

mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/

3、启动Hive MetaStore

nohup hive --service metastore 2>&1 &

4、启动flink集群和sql-client

yarn-session.sh -d -nm flink-cluster
sql-client.sh embedded -s yarn-session

5、在flink sql-client中创建hive catalog

CREATE CATALOG hive WITH ('type' = 'hive','default-database' = 'sty','hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
);

非分区表读写

1、Hive中建表并插入数据

create table behavior(
username string,
behavior string
);
insert into behavior values('lisi','buy'),('zhangsan','read');

2、使用hive catalog

use catalog hive;

2、flink sql-client中执行数据插入与数据查询(和常规sql一致)

insert into behavior values('sisi','buy'),('tracy','read');
select *from behavior;

在这里插入图片描述

分区表读写

这里和非分区表有所不同,主要体现在建表层面,参考博客:https://www.jianshu.com/p/295066a24092

写入到hive分区表
streamEnv需要开启checkpoint,保证flink写入hive分区表的写入一致性
hive表ddl中需要指定以下TBLPROPERTIES:
sink.partition-commit.trigger:分区提交触发器,单选,可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交,当watermark + delay大于等于分区上的时间时就会提交该分区元数据;process-time的话根据当前系统处理时间来判断分区是否需要提交,当系统处理时间大于等于分区上的时间就会提交该分区元数据
partition.time-extractor.timestamp-pattern:使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式,需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd,则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒,可以选择多个表字段组合。当表字段无法抽取出符合的格式时,则使用自定义提取器partition.time-extractor.class。
sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间,使用partition-time触发器时可以使用,默认为0s
sink.partition-commit.policy.kind:分区提交方式,多选,可选值为metastore、success-file、custom,metastore表示写入元数据库,success-file表示往hdfs分区目录写入一个标志文件,custom表示使用自定义提交方式,通常使用metastore,success-file组合
partition.time-extractor.kind:当要使用自定义分区时间提取器时需要配置此项,值配置为custom
partition.time-extractor.class:当要使用自定义分区时间提取器时需要配置此项,值配置为自定义提取器的类路径。在集群中运行时,需要把该类打成jar包放到flink lib目录下。
某个分区触发提交后,后续再有此分区的数据进来,仍然会写入hive该分区。
作者:spongebobZ
链接:https://www.jianshu.com/p/295066a24092
来源:简书

1、hive创建分区表并插入数据

create table userinfo(
name string,
age int
)
partitioned by (dt string)
stored as orc
tblproperties('sink.partition-commit.trigger' = 'partition-time','sink.partition-commit.policy.kind'='metastore,success-file','partition.time-extractor.timestamp-pattern' ='yyyy-MM-dd HH:mm:ss','sink.partition-commit.delay' = '10'
);insert into table userInfo partition(dt='2023-10-26') values('zhangsan',23);
insert into table userInfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);

注意:若建表时未在tblproperties中配置恰当的sink.partition-commit.policy.kind,flink sql-client插入数据时将遇到如下报错:

Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind

2、flink sql-client插入与查询数据

insert into  userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
select *from userinfo;

在这里插入图片描述

这篇关于Flink Hive Catalog操作案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/286800

相关文章

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

Python操作PDF文档的主流库使用指南

《Python操作PDF文档的主流库使用指南》PDF因其跨平台、格式固定的特性成为文档交换的标准,然而,由于其复杂的内部结构,程序化操作PDF一直是个挑战,本文主要为大家整理了Python操作PD... 目录一、 基础操作1.PyPDF2 (及其继任者 pypdf)2.PyMuPDF / fitz3.Fre

Python对接支付宝支付之使用AliPay实现的详细操作指南

《Python对接支付宝支付之使用AliPay实现的详细操作指南》支付宝没有提供PythonSDK,但是强大的github就有提供python-alipay-sdk,封装里很多复杂操作,使用这个我们就... 目录一、引言二、准备工作2.1 支付宝开放平台入驻与应用创建2.2 密钥生成与配置2.3 安装ali

MySQL 强制使用特定索引的操作

《MySQL强制使用特定索引的操作》MySQL可通过FORCEINDEX、USEINDEX等语法强制查询使用特定索引,但优化器可能不采纳,需结合EXPLAIN分析执行计划,避免性能下降,注意版本差异... 目录1. 使用FORCE INDEX语法2. 使用USE INDEX语法3. 使用IGNORE IND

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Python使用openpyxl读取Excel的操作详解

《Python使用openpyxl读取Excel的操作详解》本文介绍了使用Python的openpyxl库进行Excel文件的创建、读写、数据操作、工作簿与工作表管理,包括创建工作簿、加载工作簿、操作... 目录1 概述1.1 图示1.2 安装第三方库2 工作簿 workbook2.1 创建:Workboo

Ubuntu 24.04启用root图形登录的操作流程

《Ubuntu24.04启用root图形登录的操作流程》Ubuntu默认禁用root账户的图形与SSH登录,这是为了安全,但在某些场景你可能需要直接用root登录GNOME桌面,本文以Ubuntu2... 目录一、前言二、准备工作三、设置 root 密码四、启用图形界面 root 登录1. 修改 GDM 配

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

JSONArray在Java中的应用操作实例

《JSONArray在Java中的应用操作实例》JSONArray是org.json库用于处理JSON数组的类,可将Java对象(Map/List)转换为JSON格式,提供增删改查等操作,适用于前后端... 目录1. jsONArray定义与功能1.1 JSONArray概念阐释1.1.1 什么是JSONA

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数