Flink写出数据到 MySql 控制事务,保证Exactly_Once

2024-05-10 13:32

本文主要是介绍Flink写出数据到 MySql 控制事务,保证Exactly_Once,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一、MySql Sink
    • 二、控制事务代码
        • 1、主线代码
        • 2、Druid 数据库连接池类

一、MySql Sink

要想使用TwoPhaseCommitSinkFunction,存储系统必须支持事务

Mysql Sink继承TwoPhaseCommitSinkFunction抽象类,分两个阶段提交Sink,保证Exactly_Once:

  • ①做checkpoint
  • ② 提交事务

二、控制事务代码

1、主线代码

参数详解:

  • ① 输入的类型
  • ② connection数据库连接对象
  • ③ 什么都不指定,为了泛型,写void
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>,MySqlTwoPhaseCommitSink.ConnectionState, Void> {// 定义可用的构造函数public MySqlTwoPhaseCommitSink() {super(new KryoSerializer<>(MySqlTwoPhaseCommitSink.ConnectionState.class, new ExecutionConfig()),VoidSerializer.INSTANCE);}@Overrideprotected ConnectionState beginTransaction() throws Exception {System.out.println("=====> beginTransaction... ");//使用连接池,不使用单个连接//Class.forName("com.mysql.jdbc.Driver");//Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200// .101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");Connection connection = DruidConnectionPool.getConnection();connection.setAutoCommit(false);//设定不自动提交return new ConnectionState(connection);}@Overrideprotected void invoke(ConnectionState transaction, Tuple2<String, Integer> value, Context context) throws Exception {Connection connection = transaction.connection;PreparedStatement pstm = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON" +" DUPLICATE KEY UPDATE counts = ?");pstm.setString(1, value.f0);pstm.setInt(2, value.f1);pstm.setInt(3, value.f1);pstm.executeUpdate();pstm.close();}// 先不做处理@Overrideprotected void preCommit(ConnectionState transaction) throws Exception {System.out.println("=====> preCommit... " + transaction);}//提交事务@Overrideprotected void commit(ConnectionState transaction) {System.out.println("=====> commit... ");Connection connection = transaction.connection;try {connection.commit();connection.close();} catch (SQLException e) {throw new RuntimeException("提交事物异常");}}//回滚事务@Overrideprotected void abort(ConnectionState transaction) {System.out.println("=====> abort... ");Connection connection = transaction.connection;try {connection.rollback();connection.close();} catch (SQLException e) {throw new RuntimeException("回滚事物异常");}}//定义建立数据库连接的方法public static class ConnectionState {private final transient Connection connection;public ConnectionState(Connection connection) {this.connection = connection;}}
}
2、Druid 数据库连接池类

为什么要用连接池?

  • 因为每个数据库连接都要控制事务
import com.alibaba.druid.pool.DruidDataSourceFactory;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;public class DruidConnectionPool {private transient static DataSource dataSource = null;private transient static Properties props = new Properties();// 静态代码块static {props.put("driverClassName", "com.mysql.jdbc.Driver");props.put("url", "jdbc:mysql://localhost:3306/day01?characterEncoding=utf8");props.put("username", "root");props.put("password", "123456");try {dataSource = DruidDataSourceFactory.createDataSource(props);} catch (Exception e) {e.printStackTrace();}}private DruidConnectionPool() {}public static Connection getConnection() throws SQLException {return dataSource.getConnection();}
}

这篇关于Flink写出数据到 MySql 控制事务,保证Exactly_Once的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/976573

相关文章

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java

MySQL逻辑删除与唯一索引冲突解决方案

《MySQL逻辑删除与唯一索引冲突解决方案》本文探讨MySQL逻辑删除与唯一索引冲突问题,提出四种解决方案:复合索引+时间戳、修改唯一字段、历史表、业务层校验,推荐方案1和方案3,适用于不同场景,感兴... 目录问题背景问题复现解决方案解决方案1.复合唯一索引 + 时间戳删除字段解决方案2:删除后修改唯一字

Zabbix在MySQL性能监控方面的运用及最佳实践记录

《Zabbix在MySQL性能监控方面的运用及最佳实践记录》Zabbix通过自定义脚本和内置模板监控MySQL核心指标(连接、查询、资源、复制),支持自动发现多实例及告警通知,结合可视化仪表盘,可有效... 目录一、核心监控指标及配置1. 关键监控指标示例2. 配置方法二、自动发现与多实例管理1. 实践步骤

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

MySQL深分页进行性能优化的常见方法

《MySQL深分页进行性能优化的常见方法》在Web应用中,分页查询是数据库操作中的常见需求,然而,在面对大型数据集时,深分页(deeppagination)却成为了性能优化的一个挑战,在本文中,我们将... 目录引言:深分页,真的只是“翻页慢”那么简单吗?一、背景介绍二、深分页的性能问题三、业务场景分析四、

MySQL 迁移至 Doris 最佳实践方案(最新整理)

《MySQL迁移至Doris最佳实践方案(最新整理)》本文将深入剖析三种经过实践验证的MySQL迁移至Doris的最佳方案,涵盖全量迁移、增量同步、混合迁移以及基于CDC(ChangeData... 目录一、China编程JDBC Catalog 联邦查询方案(适合跨库实时查询)1. 方案概述2. 环境要求3.

SQL server数据库如何下载和安装

《SQLserver数据库如何下载和安装》本文指导如何下载安装SQLServer2022评估版及SSMS工具,涵盖安装配置、连接字符串设置、C#连接数据库方法和安全注意事项,如混合验证、参数化查... 目录第一步:打开官网下载对应文件第二步:程序安装配置第三部:安装工具SQL Server Manageme