实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作

本文主要是介绍实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

系列文章目录

实践数据湖iceberg 第一课 入门
实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式
实践数据湖iceberg 第三课 在sqlclient中,以sql方式从kafka读数据到iceberg
实践数据湖iceberg 第四课 在sqlclient中,以sql方式从kafka读数据到iceberg(升级版本到flink1.12.7)
实践数据湖iceberg 第五课 hive catalog特点
实践数据湖iceberg 第六课 从kafka写入到iceberg失败问题 解决
实践数据湖iceberg 第七课 实时写入到iceberg
实践数据湖iceberg 第八课 hive与iceberg集成
实践数据湖iceberg 第九课 合并小文件
实践数据湖iceberg 第十课 快照删除
实践数据湖iceberg 第十一课 测试分区表完整流程(造数、建表、合并、删快照)
实践数据湖iceberg 第十二课 catalog是什么
实践数据湖iceberg 第十三课 metadata比数据文件大很多倍的问题
实践数据湖iceberg 第十四课 元数据合并(解决元数据随时间增加而元数据膨胀的问题)
实践数据湖iceberg 第十五课 spark安装与集成iceberg(jersey包冲突)
实践数据湖iceberg 第十六课 通过spark3打开iceberg的认知之门
实践数据湖iceberg 第十七课 hadoop2.7,spark3 on yarn运行iceberg配置
实践数据湖iceberg 第十八课 多种客户端与iceberg交互启动命令(常用命令)
实践数据湖iceberg 第十九课 flink count iceberg,无结果问题
实践数据湖iceberg 第二十课 flink + iceberg CDC场景(版本问题,测试失败)
实践数据湖iceberg 第二十一课 flink1.13.5 + iceberg0.131 CDC(测试成功INSERT,变更操作失败)
实践数据湖iceberg 第二十二课 flink1.13.5 + iceberg0.131 CDC(CRUD测试成功)
实践数据湖iceberg 第二十三课 flink-sql从checkpoint重启
实践数据湖iceberg 第二十四课 iceberg元数据详细解析
实践数据湖iceberg 第二十五课 后台运行flink sql 增删改的效果
实践数据湖iceberg 第二十六课 checkpoint设置方法
实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作
实践数据湖iceberg 第二十八课 把公有仓库上不存在的包部署到本地仓库
实践数据湖iceberg 第二十九课 如何优雅高效获取flink的jobId
实践数据湖iceberg 第三十课 mysql->iceberg,不同客户端有时区问题
实践数据湖iceberg 更多的内容目录

文章目录

  • 系列文章目录
  • 前言
  • 一、初始化
    • 1.1 代码
    • 1.2 启动命令
    • 1.3.引入库
    • 1.4 sink的iceberg表查询:
    • 1.5 页面查看,开启了checkpoint
  • 二、停止作业
    • 2.1 cancel作业
    • 2.2 写入2条数据
  • 三、 从checkpoint恢复
  • 总结


前言

程序化部署,测试flink cdc重启恢复
测试思路:1.程序停止时,进行checkpoint记录,记录checkpoint的位置 2.程序停止时,写入数据, 记录写入的数据, 测试重启后,能否从故障点开始恢复。
结论:能


一、初始化

1.1 代码

代码思路: 1. 定义source表, 2.定义sink表 3. 写入sink from source

public static void main(String[] args) throws Exception {FromTableToIcebergSqlTemple temple = new FromMysqlToIcebergSql();String fromSql = temple.createFromTableSql();String createToTableSql = temple.createIcebergTableSql();String createIcebergCatalog = temple.createIcebergCatalogSql();System.setProperty("HADOOP_USER_NAME", "root");//TODO 1.准备环境//1.1流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(10000L);env.setParallelism(1);//1.2 表执行环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//1.3 建source表tableEnv.executeSql(fromSql);//1.4 建iceberg表tableEnv.executeSql(createIcebergCatalog);tableEnv.executeSql("use catalog "+ temple.icebergCatalogName());tableEnv.executeSql("use "+ temple.icebergDbName());tableEnv.executeSql(createToTableSql);//1.5 执行sqltableEnv.executeSql("use catalog default_catalog");tableEnv.executeSql("use default_database");tableEnv.executeSql(temple.sourceToIcebergSinkSql());//TODO 6.执行任务env.execute();}

1.2 启动命令

108060 YarnCoarseGrainedExecutorBackend
[root@hadoop101 apps]# [root@hadoop101 apps]# flink run -c com.jintemg.cdc.FlinkSqlCdcRunner -C file:///opt/software/flink1.13-iceberg0131/iceberg-flink-runtime-1.13-0.13.1.jar -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-hive-2.3.6_2.12-1.13.5.jar -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-mysql-cdc-2.1.1.jar flink-iceberg-learning-1.0-SNAPSHOT.jar

1.3.引入库

清空表,写入3条数据

INSERT INTO `stock_basic` VALUES ('0', '000001.SZ', '000001', '平安银行', '深圳', '银行', '19910403', null);
INSERT INTO `stock_basic` VALUES ('1', '000002.SZ', '000002', '万科A', '深圳', '全国地产', '19910129', null);
INSERT INTO `stock_basic` VALUES ('2', '000004.SZ', '000004', '国华网安', '深圳', '软件服务', '19910114', '李映彤');

1.4 sink的iceberg表查询:

Time taken: 0.4 seconds, Fetched 3 row(s)
spark-sql (default)> select * from stock_basic_iceberg_sink;
22/04/07 16:06:45 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
i       ts_code symbol  name    area    industry        list_date       actural_controller
0       000001.SZ       000001  平安银行        深圳    银行    19910403        NULL
1       000002.SZ       000002  万科A   深圳    全国地产        19910129        NULL
2       000004.SZ       000004  国华网安        深圳    软件服务        19910114        李映彤
Time taken: 0.519 seconds, Fetched 3 row(s)

1.5 页面查看,开启了checkpoint

在这里插入图片描述

checkpint地址
Path: hdfs:/flink/checkpoints/aa8a8b5313bee126401e30e8e03491de/chk-223

二、停止作业

2.1 cancel作业

2.2 写入2条数据

写入2条数据

INSERT INTO `stock_basic` VALUES ('3', '000005.SZ', '000005', 'ST星源', '深圳', '环境保护', '19901210', '郑列列,丁芃');
INSERT INTO `stock_basic` VALUES ('4', '000006.SZ', '000006', '深振业A', '深圳', '区域地产', '19920427', '深圳市人民政府国有资产监督管理委员会');

三、 从checkpoint恢复

整体思路:检查是否从id=3开始同步,把3,4同步过来,0,1,2没有同步.

从checkpoint恢复命令:

[root@hadoop101 apps]#  flink run -s hdfs:///flink/checkpoints/aa8a8b5313bee126401e30e8e03491de/chk-224  -c com.jintemg.cdc.FlinkSqlCdcRunner  -C file:///opt/software/flink1.13-iceberg0131/iceberg-flink-runtime-1.13-0.13.1.jar -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-hive-2.3.6_2.12-1.13.5.jar  -C file:///opt/software/flink1.13-iceberg0131/flink-sql-connector-mysql-cdc-2.1.1.jar  flink-iceberg-learning-1.0-SNAPSHOT.jar

结果: 到iceberg查,发现从上次中断消费开始继续

spark-sql (default)> select * from stock_basic_iceberg_sink;
22/04/07 16:58:55 WARN conf.HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
i       ts_code symbol  name    area    industry        list_date       actural_controller
0       000001.SZ       000001  平安银行        深圳    银行    19910403        NULL
1       000002.SZ       000002  万科A   深圳    全国地产        19910129        NULL
2       000004.SZ       000004  国华网安        深圳    软件服务        19910114        李映彤
3       000005.SZ       000005  ST星源  深圳    环境保护        19901210        郑列列,丁芃
4       000006.SZ       000006  深振业A 深圳    区域地产        19920427        深圳市人民政府国有资产监督管理委员会

总结

发现从cdc是能从checkpoint恢复,程序正常运行。
现在有下一个问题:如何在程序中获取 本任务的checkpoint位置? 本任务的jobId?

这篇关于实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/642889

相关文章

JDK21对虚拟线程的几种用法实践指南

《JDK21对虚拟线程的几种用法实践指南》虚拟线程是Java中的一种轻量级线程,由JVM管理,特别适合于I/O密集型任务,:本文主要介绍JDK21对虚拟线程的几种用法,文中通过代码介绍的非常详细,... 目录一、参考官方文档二、什么是虚拟线程三、几种用法1、Thread.ofVirtual().start(

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

springboot依靠security实现digest认证的实践

《springboot依靠security实现digest认证的实践》HTTP摘要认证通过加密参数(如nonce、response)验证身份,避免明文传输,但存在密码存储风险,相比基本认证更安全,却因... 目录概述参数Demopom.XML依赖Digest1Application.JavaMyPasswo

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

C#利用Free Spire.XLS for .NET复制Excel工作表

《C#利用FreeSpire.XLSfor.NET复制Excel工作表》在日常的.NET开发中,我们经常需要操作Excel文件,本文将详细介绍C#如何使用FreeSpire.XLSfor.NET... 目录1. 环境准备2. 核心功能3. android示例代码3.1 在同一工作簿内复制工作表3.2 在不同

Java 结构化并发Structured Concurrency实践举例

《Java结构化并发StructuredConcurrency实践举例》Java21结构化并发通过作用域和任务句柄统一管理并发生命周期,解决线程泄漏与任务追踪问题,提升代码安全性和可观测性,其核心... 目录一、结构化并发的核心概念与设计目标二、结构化并发的核心组件(一)作用域(Scopes)(二)任务句柄

Java中的Schema校验技术与实践示例详解

《Java中的Schema校验技术与实践示例详解》本主题详细介绍了在Java环境下进行XMLSchema和JSONSchema校验的方法,包括使用JAXP、JAXB以及专门的JSON校验库等技术,本文... 目录1. XML和jsON的Schema校验概念1.1 XML和JSON校验的必要性1.2 Sche

SpringBoot集成WebService(wsdl)实践

《SpringBoot集成WebService(wsdl)实践》文章介绍了SpringBoot项目中通过缓存IWebService接口实现类的泛型入参类型,减少反射调用提升性能的实现方案,包含依赖配置... 目录pom.XML创建入口ApplicationContextUtils.JavaJacksonUt

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程