Flink写出数据到Hbase的Sink

2024-05-10 13:32
文章标签 数据 flink sink hbase 写出

本文主要是介绍Flink写出数据到Hbase的Sink,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一、MyHbaseSink
        • 1、继承RichSinkFunction<输入的数据类型>类
        • 2、实现open方法,创建连接对象
        • 3、实现invoke方法,批次写入数据到Hbase
        • 4、实现close方法,关闭连接
    • 二、HBaseUtil工具类

一、MyHbaseSink

1、继承RichSinkFunction<输入的数据类型>类
public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {private transient Integer maxSize = 1000;private transient Long delayTime = 5000L;public MyHbaseSink() {}public MyHbaseSink(Integer maxSize, Long delayTime) {this.maxSize = maxSize;this.delayTime = delayTime;}private transient Connection connection;private transient Long lastInvokeTime;private transient List<Put> puts = new ArrayList<>(maxSize);
2、实现open方法,创建连接对象
    // 创建连接@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 获取全局配置文件,并转为ParameterToolParameterTool params =(ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//创建一个Hbase的连接connection = HBaseUtil.getConnection(params.getRequired("hbase.zookeeper.quorum"),params.getInt("hbase.zookeeper.property.clientPort", 2181));// 获取系统当前时间lastInvokeTime = System.currentTimeMillis();}
3、实现invoke方法,批次写入数据到Hbase
    @Overridepublic void invoke(Tuple2<String, Double> value, Context context) throws Exception {String rk = value.f0;//创建put对象,并赋rk值Put put = new Put(rk.getBytes());// 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25put.addColumn("f1".getBytes(), "order".getBytes(), value.f1.toString().getBytes());puts.add(put);// 添加put对象到list集合//使用ProcessingTimelong currentTime = System.currentTimeMillis();//开始批次提交数据if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {//获取一个Hbase表Table table = connection.getTable(TableName.valueOf("database:table"));table.put(puts);//批次提交puts.clear();lastInvokeTime = currentTime;table.close();}}
4、实现close方法,关闭连接
    @Overridepublic void close() throws Exception {connection.close();}

二、HBaseUtil工具类

  • Hbase的工具类,用来创建Hbase的Connection
public class HBaseUtil {/*** @param zkQuorum zookeeper地址,多个要用逗号分隔* @param port     zookeeper端口号* @return connection*/public static Connection getConnection(String zkQuorum, int port) throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", zkQuorum);conf.set("hbase.zookeeper.property.clientPort", port + "");Connection connection = ConnectionFactory.createConnection(conf);return connection;}
}

这篇关于Flink写出数据到Hbase的Sink的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/976574

相关文章

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模