详解 Flink CDC 的介绍和入门案例

2024-06-14 15:28

本文主要是介绍详解 Flink CDC 的介绍和入门案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、Flink CDC 简介

1. CDC 介绍

​ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2. CDC 种类

基于查询的 CDC基于 Binlog 的 CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3. Flink CDC 介绍

​ Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors

二、Flink CDC 案例实操

1. DataStream 实现

1.1 导入依赖
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
1.2 编写程序代码
public class FlinkCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序//1.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//1.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//1.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//1.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//1.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//1.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "lgb");//2. 创建 FlinkCDC Source/*StartupOptions 有 5 种类型:1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog3. latest:从 binlog 的最近位置监控读取4. specificOffset:从 binlog 的指定位置读取5. timestamp:从 binlog 的指定时间戳读取*/DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder().hostname("hadoop102") //Mysql所在主机名.port(3306) //mysql端口号.username("root") //登录mysql用户名.password("123456") //登录mysql密码.databaseList("cdc_test") //监控的数据库列表,可变参数.tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表.deserializer(new StringDebeziumDeserializationSchema()) //反序列化器.startupOptions(StartupOptions.initial()) //指定读取策略.build();//3. 通过 FlinkCDC Source 创建 DataStreamDataStream<String> dataStream = env.addSource(mysqlSource);//4. 打印输出流dataStream.print();//5. 启动任务env.execute("FlinkCDC");}
}
1.3 测试
1.3.1 本地测试
  • 开启 MySQL Binlog 并重启 MySQL
  • 在 Mysql 中创建对应的数据库和数据表并插入一条数据
  • 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
  • 在数据表中进行增删改操作,查看程序控制台输出结果
1.3.2 集群测试
  • 将 FlinkCDC 程序进行打包并上传到集群

  • 启动 Hadoop、zookeeper 和 Flink 集群

  • 运行 FlinkCDC 程序

    bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    
  • 给当前的 Flink 程序创建 Savepoint

    bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
  • 停止 FlinkCDC 程序

  • 在Mysql数据表中进行增删改操作

  • 从 Savepoint 重启程序查看程序输出结果

    bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    

2. Flink SQL 实现

2.0.0 版本的 FlinkCDC 通过 FlinkSQL 实现需要 1.13+ 版本的 Flink 支持

public class FlinkSQLCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2. 创建 FlinkSQL 表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offsettableEnv.executeSql("create table user_info (" +"id String primary key, name String, sex String) with (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'initial'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'cdc_test'," +" 'table-name' = 'user_info'" +")");//4. 查询输出表中数据Table table = tableEnv.sqlQuery("select * from user_info");DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);dataStream.print();//5. 启动任务env.execute("FlinkSqlCDC");}
}

3. 自定义反序列化器

规范化数据输出格式,方便后续解析

/**自定义反序列化器:实现 DebeziumDeserializationSchema<T> 接口并实现 deserialize 和 getProducedType 方法 
*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {/*想要展示的数据格式:{"dbName":"","tableName":"","before":{"field1":"value1",...},"after":{"field1":"value1",...},"op":""}*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {JSONObject result = new JSONObject();//1.获取库名和表名String topic = sourceRecord.topic();String[] fields = topic.split("\\.");//2. 获取 before 数据Struct value = (Struct) sourceRecord.value();Struct before = value.getStruct("before");JSONObject beforeJSON = new JSONObject();if(before != null) {Schema schema = before.schema();List<Field> fields = schema.fields();for(Field field : fields) {beforeJSON.put(field.name(), before.get(field));}}//3. 获取 after 数据Struct after = value.getStruct("after");JSONObject afterJSON = new JSONObject();if(after != null) {Schema schema = after.schema();List<Field> fields = schema.fields();for(Field field : fields) {afterJSON.put(field.name(), after.get(field));}}//4. 获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);result.put("dbName", fields[1]);result.put("tableName", fields[2]);result.put("before", beforeJSON);result.put("after", afterJSON);result.put("op", operation);collcetor.collect(result.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}	
}

这篇关于详解 Flink CDC 的介绍和入门案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1060788

相关文章

HTML5 搜索框Search Box详解

《HTML5搜索框SearchBox详解》HTML5的搜索框是一个强大的工具,能够有效提升用户体验,通过结合自动补全功能和适当的样式,可以创建出既美观又实用的搜索界面,这篇文章给大家介绍HTML5... html5 搜索框(Search Box)详解搜索框是一个用于输入查询内容的控件,通常用于网站或应用程

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

LiteFlow轻量级工作流引擎使用示例详解

《LiteFlow轻量级工作流引擎使用示例详解》:本文主要介绍LiteFlow是一个灵活、简洁且轻量的工作流引擎,适合用于中小型项目和微服务架构中的流程编排,本文给大家介绍LiteFlow轻量级工... 目录1. LiteFlow 主要特点2. 工作流定义方式3. LiteFlow 流程示例4. LiteF

CSS3中的字体及相关属性详解

《CSS3中的字体及相关属性详解》:本文主要介绍了CSS3中的字体及相关属性,详细内容请阅读本文,希望能对你有所帮助... 字体网页字体的三个来源:用户机器上安装的字体,放心使用。保存在第三方网站上的字体,例如Typekit和Google,可以link标签链接到你的页面上。保存在你自己Web服务器上的字

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

MyBatis ResultMap 的基本用法示例详解

《MyBatisResultMap的基本用法示例详解》在MyBatis中,resultMap用于定义数据库查询结果到Java对象属性的映射关系,本文给大家介绍MyBatisResultMap的基本... 目录MyBATis 中的 resultMap1. resultMap 的基本语法2. 简单的 resul

MybatisPlus service接口功能介绍

《MybatisPlusservice接口功能介绍》:本文主要介绍MybatisPlusservice接口功能介绍,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友... 目录Service接口基本用法进阶用法总结:Lambda方法Service接口基本用法MyBATisP

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

Mybatis Plus Join使用方法示例详解

《MybatisPlusJoin使用方法示例详解》:本文主要介绍MybatisPlusJoin使用方法示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,... 目录1、pom文件2、yaml配置文件3、分页插件4、示例代码:5、测试代码6、和PageHelper结合6