详解 Flink CDC 的介绍和入门案例

2024-06-14 15:28

本文主要是介绍详解 Flink CDC 的介绍和入门案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、Flink CDC 简介

1. CDC 介绍

​ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2. CDC 种类

基于查询的 CDC基于 Binlog 的 CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3. Flink CDC 介绍

​ Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors

二、Flink CDC 案例实操

1. DataStream 实现

1.1 导入依赖
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
1.2 编写程序代码
public class FlinkCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序//1.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//1.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//1.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//1.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//1.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//1.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "lgb");//2. 创建 FlinkCDC Source/*StartupOptions 有 5 种类型:1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog3. latest:从 binlog 的最近位置监控读取4. specificOffset:从 binlog 的指定位置读取5. timestamp:从 binlog 的指定时间戳读取*/DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder().hostname("hadoop102") //Mysql所在主机名.port(3306) //mysql端口号.username("root") //登录mysql用户名.password("123456") //登录mysql密码.databaseList("cdc_test") //监控的数据库列表,可变参数.tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表.deserializer(new StringDebeziumDeserializationSchema()) //反序列化器.startupOptions(StartupOptions.initial()) //指定读取策略.build();//3. 通过 FlinkCDC Source 创建 DataStreamDataStream<String> dataStream = env.addSource(mysqlSource);//4. 打印输出流dataStream.print();//5. 启动任务env.execute("FlinkCDC");}
}
1.3 测试
1.3.1 本地测试
  • 开启 MySQL Binlog 并重启 MySQL
  • 在 Mysql 中创建对应的数据库和数据表并插入一条数据
  • 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
  • 在数据表中进行增删改操作,查看程序控制台输出结果
1.3.2 集群测试
  • 将 FlinkCDC 程序进行打包并上传到集群

  • 启动 Hadoop、zookeeper 和 Flink 集群

  • 运行 FlinkCDC 程序

    bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    
  • 给当前的 Flink 程序创建 Savepoint

    bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
  • 停止 FlinkCDC 程序

  • 在Mysql数据表中进行增删改操作

  • 从 Savepoint 重启程序查看程序输出结果

    bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    

2. Flink SQL 实现

2.0.0 版本的 FlinkCDC 通过 FlinkSQL 实现需要 1.13+ 版本的 Flink 支持

public class FlinkSQLCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2. 创建 FlinkSQL 表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offsettableEnv.executeSql("create table user_info (" +"id String primary key, name String, sex String) with (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'initial'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'cdc_test'," +" 'table-name' = 'user_info'" +")");//4. 查询输出表中数据Table table = tableEnv.sqlQuery("select * from user_info");DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);dataStream.print();//5. 启动任务env.execute("FlinkSqlCDC");}
}

3. 自定义反序列化器

规范化数据输出格式,方便后续解析

/**自定义反序列化器:实现 DebeziumDeserializationSchema<T> 接口并实现 deserialize 和 getProducedType 方法 
*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {/*想要展示的数据格式:{"dbName":"","tableName":"","before":{"field1":"value1",...},"after":{"field1":"value1",...},"op":""}*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {JSONObject result = new JSONObject();//1.获取库名和表名String topic = sourceRecord.topic();String[] fields = topic.split("\\.");//2. 获取 before 数据Struct value = (Struct) sourceRecord.value();Struct before = value.getStruct("before");JSONObject beforeJSON = new JSONObject();if(before != null) {Schema schema = before.schema();List<Field> fields = schema.fields();for(Field field : fields) {beforeJSON.put(field.name(), before.get(field));}}//3. 获取 after 数据Struct after = value.getStruct("after");JSONObject afterJSON = new JSONObject();if(after != null) {Schema schema = after.schema();List<Field> fields = schema.fields();for(Field field : fields) {afterJSON.put(field.name(), after.get(field));}}//4. 获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);result.put("dbName", fields[1]);result.put("tableName", fields[2]);result.put("before", beforeJSON);result.put("after", afterJSON);result.put("op", operation);collcetor.collect(result.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}	
}

这篇关于详解 Flink CDC 的介绍和入门案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1060788

相关文章

Spring WebClient从入门到精通

《SpringWebClient从入门到精通》本文详解SpringWebClient非阻塞响应式特性及优势,涵盖核心API、实战应用与性能优化,对比RestTemplate,为微服务通信提供高效解决... 目录一、WebClient 概述1.1 为什么选择 WebClient?1.2 WebClient 与

idea的终端(Terminal)cmd的命令换成linux的命令详解

《idea的终端(Terminal)cmd的命令换成linux的命令详解》本文介绍IDEA配置Git的步骤:安装Git、修改终端设置并重启IDEA,强调顺序,作为个人经验分享,希望提供参考并支持脚本之... 目录一编程、设置前二、前置条件三、android设置四、设置后总结一、php设置前二、前置条件

python中列表应用和扩展性实用详解

《python中列表应用和扩展性实用详解》文章介绍了Python列表的核心特性:有序数据集合,用[]定义,元素类型可不同,支持迭代、循环、切片,可执行增删改查、排序、推导式及嵌套操作,是常用的数据处理... 目录1、列表定义2、格式3、列表是可迭代对象4、列表的常见操作总结1、列表定义是处理一组有序项目的

python使用try函数详解

《python使用try函数详解》Pythontry语句用于异常处理,支持捕获特定/多种异常、else/final子句确保资源释放,结合with语句自动清理,可自定义异常及嵌套结构,灵活应对错误场景... 目录try 函数的基本语法捕获特定异常捕获多个异常使用 else 子句使用 finally 子句捕获所

C++11范围for初始化列表auto decltype详解

《C++11范围for初始化列表autodecltype详解》C++11引入auto类型推导、decltype类型推断、统一列表初始化、范围for循环及智能指针,提升代码简洁性、类型安全与资源管理效... 目录C++11新特性1. 自动类型推导auto1.1 基本语法2. decltype3. 列表初始化3

SQL Server 中的 WITH (NOLOCK) 示例详解

《SQLServer中的WITH(NOLOCK)示例详解》SQLServer中的WITH(NOLOCK)是一种表提示,等同于READUNCOMMITTED隔离级别,允许查询在不获取共享锁的情... 目录SQL Server 中的 WITH (NOLOCK) 详解一、WITH (NOLOCK) 的本质二、工作

springboot自定义注解RateLimiter限流注解技术文档详解

《springboot自定义注解RateLimiter限流注解技术文档详解》文章介绍了限流技术的概念、作用及实现方式,通过SpringAOP拦截方法、缓存存储计数器,结合注解、枚举、异常类等核心组件,... 目录什么是限流系统架构核心组件详解1. 限流注解 (@RateLimiter)2. 限流类型枚举 (

Java Thread中join方法使用举例详解

《JavaThread中join方法使用举例详解》JavaThread中join()方法主要是让调用改方法的thread完成run方法里面的东西后,在执行join()方法后面的代码,这篇文章主要介绍... 目录前言1.join()方法的定义和作用2.join()方法的三个重载版本3.join()方法的工作原

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Spring AI使用tool Calling和MCP的示例详解

《SpringAI使用toolCalling和MCP的示例详解》SpringAI1.0.0.M6引入ToolCalling与MCP协议,提升AI与工具交互的扩展性与标准化,支持信息检索、行动执行等... 目录深入探索 Spring AI聊天接口示例Function CallingMCPSTDIOSSE结束语