Debezium日常分享系列之:Debezium2.5稳定版本之Mysql连接器的数据变更事件

本文主要是介绍Debezium日常分享系列之:Debezium2.5稳定版本之Mysql连接器的数据变更事件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Debezium日常分享系列之:Debezium2.5稳定版本之Mysql连接器的数据变更事件

  • 一、数据变更事件
  • 二、更改事件键
  • 三、更改事件值
  • 四、创建事件
  • 五、更新事件
  • 六、主键更新
  • 七、删除事件
  • 八、墓碑事件
  • 九、截断事件
  • 十、Debezium技术总结

一、数据变更事件

Debezium MySQL 连接器为每个行级 INSERT、UPDATE 和 DELETE 操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 是围绕连续的事件消息流而设计的。然而,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的架构,或者,如果您使用架构注册表,则还包含消费者可用于从注册表获取架构的架构 ID。这使得每个事件都是独立的。

以下 JSON 框架显示了更改事件的基本四个部分。但是,您选择在应用程序中使用的 Kafka Connect 转换器的配置方式决定了这四个部分在更改事件中的表示。仅当您配置转换器来生成模式字段时,模式字段才会处于更改事件中。同样,仅当您配置转换器来生成事件键和事件负载时,事件键和事件负载才会出现在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:

{"schema": { 1...},"payload": { 2...},"schema": { 3...},"payload": { 4...},
}

表 7. 变更事件基本内容概述

序列字段描述
1schema第一个架构字段是事件键的一部分。它指定了一个 Kafka Connect 架构,该架构描述了事件键的有效负载部分中的内容。换句话说,第一个架构字段描述了已更改表的主键结构,或者如果表没有主键,则描述唯一键的结构。可以通过设置 message.key.columns 连接器配置属性来覆盖表的主键。在这种情况下,第一个模式字段描述由该属性标识的键的结构。
2payload第一个有效负载字段是事件键的一部分。它具有先前架构字段描述的结构,并且包含已更改的行的键。
3schema第二个架构字段是事件值的一部分。它指定 Kafka Connect 架构,该架构描述事件值的有效负载部分中的内容。换句话说,第二个架构描述了已更改的行的结构。通常,此模式包含嵌套模式。
4payload第二个有效负载字段是事件值的一部分。它具有先前架构字段描述的结构,并且包含已更改的行的实际数据。

默认情况下,连接器流将事件记录更改为名称与事件的原始表相同的主题。请参阅主题名称。

注意:

  • MySQL 连接器确保所有 Kafka Connect 架构名称都遵循 Avro 架构名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,则将其替换为下划线字符。
  • 如果逻辑服务器名称、数据库名称或表名称包含无效字符,并且唯一区分名称的字符无效并用下划线替换,则可能会导致意外冲突。

二、更改事件键

更改事件的键包含已更改表的键和已更改行的实际键的架构。连接器创建事件时,架构及其相应的负载都包含已更改表的主键(或唯一约束)中每一列的字段。

考虑以下客户表,后面是该表的更改事件键的示例。

CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,first_name VARCHAR(255) NOT NULL,last_name VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

捕获客户表更改的每个更改事件都具有相同的事件键架构。只要客户表具有先前的定义,捕获客户表更改的每个更改事件都具有以下键结构。在 JSON 中,它看起来像这样:

{"schema": { 1"type": "struct","name": "mysql-server-1.inventory.customers.Key", 2"optional": false, 3"fields": [ 4{"field": "id","type": "int32","optional": false}]},"payload": { 5"id": 1001}
}

表8. 变更事件键说明

序列字段描述
1schema密钥的架构部分指定 Kafka Connect 架构,该架构描述密钥的有效负载部分中的内容。
2mysql-server-1.inventory.customers.Key定义密钥有效负载结构的架构名称。此架构描述了已更改的表的主键结构。键架构名称的格式为连接器名称.数据库名称.表名称.Key。在这个例子中:mysql-server-1 是生成此事件的连接器的名称。inventory 是包含已更改的表的数据库。customer 是已更新的表。
3optional指示事件键是否必须在其负载字段中包含值。在此示例中,需要密钥有效负载中的值。当表没有主键时,键的有效负载字段中的值是可选的。
4fields指定负载中预期的每个字段,包括每个字段的名称、类型以及是否必需。
5payload包含生成此更改事件的行的键。在此示例中,键包含一个值为 1001 的 id 字段。

三、更改事件值

更改事件中的值比键稍微复杂一些。与键一样,值也具有模式部分和有效负载部分。模式部分包含描述有效负载部分的 Envelope 结构的模式,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都具有带有信封结构的值有效负载。

考虑用于显示更改事件键示例的相同示例表:

CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,first_name VARCHAR(255) NOT NULL,last_name VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

对此表的更改的更改事件的值部分描述为:

  • 创建事件
  • 更新事件
  • 主键更新
  • 删除事件
  • 墓碑事件
  • 截断事件

四、创建事件

以下示例显示连接器为在客户表中创建数据的操作生成的更改事件的值部分:

{"schema": { 1"type": "struct","fields": [{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory.customers.Value", 2"field": "before"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory.customers.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "boolean","optional": true,"default": false,"field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "table"},{"type": "int64","optional": false,"field": "server_id"},{"type": "string","optional": true,"field": "gtid"},{"type": "string","optional": false,"field": "file"},{"type": "int64","optional": false,"field": "pos"},{"type": "int32","optional": false,"field": "row"},{"type": "int64","optional": true,"field": "thread"},{"type": "string","optional": true,"field": "query"}],"optional": false,"name": "io.debezium.connector.mysql.Source", 3"field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"}],"optional": false,"name": "mysql-server-1.inventory.customers.Envelope" 4},"payload": { 5"op": "c", 6"ts_ms": 1465491411815, 7"before": null, 8"after": { 9"id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"},"source": { 10"version": "2.5.3.Final","connector": "mysql","name": "mysql-server-1","ts_ms": 0,"snapshot": false,"db": "inventory","table": "customers","server_id": 0,"gtid": null,"file": "mysql-bin.000003","pos": 154,"row": 0,"thread": 7,"query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"}}
}

表 9. 创建事件值字段的描述

序列字段描述
1schema值的架构,描述值有效负载的结构。连接器为特定表生成的每个更改事件中,更改事件的值架构都是相同的。
2name在架构部分中,每个名称字段指定值的有效负载中字段的架构。mysql-server-1.inventory.customers.Value 是负载的 before 和 after 字段的架构。此模式特定于客户表。before 和 after 字段的架构名称的格式为 LogicalName.tableName.Value,这可确保架构名称在数据库中是唯一的。这意味着,当使用 Avro 转换器时,每个逻辑源中每个表的最终 Avro 模式都有其自己的演变和历史。
3nameio.debezium.connector.mysql.Source 是有效负载源字段的架构。此架构特定于 MySQL 连接器。连接器将其用于它生成的所有事件。
4namemysql-server-1.inventory.customers.Envelope 是负载整体结构的架构,其中 mysql-server-1 是连接器名称,inventory 是数据库,customers 是表。
5payload该值是实际数据。这是更改事件提供的信息。事件的 JSON 表示形式可能比它们描述的行大得多。这是因为 JSON 表示必须包含消息的架构和有效负载部分。但是,通过使用 Avro 转换器,您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。
6op强制字符串,描述导致连接器生成事件的操作类型。在此示例中,c 表示该操作创建了一行。有效值为:c=create u=update d = delete r =read(仅适用于快照)
7ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。
8before一个可选字段,指定事件发生之前行的状态。当 op 字段是 c(表示创建)时(如本例所示),before 字段为空,因为此更改事件是针对新内容的。
9after一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行的 id、first_name、last_name 和 email 列的值。
10source描述事件源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的来源、事件发生的顺序以及事件是否属于同一事务的一部分。源元数据包括:debezium版本,连接器名称,记录事件的 binlog 名称,二进制日志位置,事件内的行,如果事件是快照的一部分,包含新行的数据库和表的名称,创建事件的 MySQL 线程的 ID(仅限非快照),MySQL 服务器 ID(如果可用),数据库中发生更改的时间戳。如果启用了 binlog_rows_query_log_events MySQL 配置选项并且启用了连接器配置 include.query 属性,则源字段还提供查询字段,其中包含导致更改事件的原始 SQL 语句。在 MariaDB 上,配置选项是 binlog_annotate_row_events。

五、更新事件

示例客户表中更新的更改事件的值与该表的创建事件具有相同的架构。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。以下是连接器为更新客户表而生成的事件中的更改事件值的示例:

{"schema": { ... },"payload": {"before": { (1)"id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"},"after": { (2)"id": 1004,"first_name": "Anne Marie","last_name": "Kretchmar","email": "annek@noanswer.org"},"source": { (3)"version": "2.5.3.Final","name": "mysql-server-1","connector": "mysql","name": "mysql-server-1","ts_ms": 1465581029100,"snapshot": false,"db": "inventory","table": "customers","server_id": 223344,"gtid": null,"file": "mysql-bin.000003","pos": 484,"row": 0,"thread": 7,"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"},"op": "u", (4)"ts_ms": 1465581029523  (5)}
}

表 10. 更新事件值字段说明

序列字段描述
1before一个可选字段,指定事件发生之前行的状态。在更新事件值中,before 字段包含每个表列的字段以及数据库提交之前该列中的值。在此示例中,first_name 值为 Anne。
2after一个可选字段,指定事件发生后行的状态。您可以比较前后结构以确定对此行的更新内容。在示例中,first_name 值现在是 Anne Marie。
3source描述事件源元数据的必填字段。源字段结构与创建事件中的字段相同,但有些值不同,例如样本更新事件来自 binlog 中的不同位置。源元数据包括:debezium版本;连接器名称;记录事件的 binlog 名称;二进制日志位置;事件内的行;如果事件是快照的一部分;包含更新行的数据库和表的名称;创建事件的 MySQL 线程的 ID(仅限非快照);MySQL 服务器 ID(如果可用);数据库中发生更改的时间戳。如果启用了 binlog_rows_query_log_events MySQL 配置选项并且启用了连接器配置 include.query 属性,则源字段还提供查询字段,其中包含导致更改事件的原始 SQL 语句。在 MariaDB 上,配置选项是 binlog_annotate_row_events。
4op描述操作类型的强制字符串。在更新事件值中,op 字段值为 u,表示该行因更新而更改。
5显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。

注意:

  • 更新行的主/唯一键的列会更改行的键的值。当键更改时,Debezium 输出三个事件:DELETE 事件和带有该行旧键的逻辑删除事件,然后是带有该行新键的事件。详细信息在下一节中。

六、主键更新

更改行的主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器将发出旧键的 DELETE 事件记录和新(更新的)键的 CREATE 事件记录,以代替 UPDATE 事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • DELETE 事件记录具有 __debezium.newkey 作为消息标头。此标头的值是更新行的新主键。
  • CREATE 事件记录将 __debezium.oldkey 作为消息头。此标头的值是更新的行所具有的先前(旧)主键。

七、删除事件

删除更改事件中的值与同一表的创建和更新事件具有相同的架构部分。示例客户表的删除事件中的有效负载部分如下所示:

{"schema": { ... },"payload": {"before": { (1)"id": 1004,"first_name": "Anne Marie","last_name": "Kretchmar","email": "annek@noanswer.org"},"after": null, (2)"source": { (3)"version": "2.5.3.Final","connector": "mysql","name": "mysql-server-1","ts_ms": 1465581902300,"snapshot": false,"db": "inventory","table": "customers","server_id": 223344,"gtid": null,"file": "mysql-bin.000003","pos": 805,"row": 0,"thread": 7,"query": "DELETE FROM customers WHERE id=1004"},"op": "d", (4)"ts_ms": 1465581902461 (5)}
}

表 11. 删除事件值字段说明

序列字段描述
1before可选字段,指定事件发生之前行的状态。在删除事件值中,before 字段包含通过数据库提交删除该行之前的值。
2after可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为空,表示该行不再存在。
3source描述事件源元数据的必填字段。在删除事件值中,源字段结构与同一表的创建和更新事件的源字段结构相同。许多源字段值也是相同的。在删除事件值中,ts_ms 和 pos 字段值以及其他值可能已更改。但删除事件值中的源字段提供相同的元数据:Debezium版本;连接器名称;记录事件的 binlog 名称;二进制日志位置;事件内的行;如果事件是快照的一部分;包含更新行的数据库和表的名称;创建事件的 MySQL 线程的 ID(仅限非快照);MySQL 服务器 ID(如果可用);数据库中发生更改的时间戳。如果启用了 binlog_rows_query_log_events MySQL 配置选项并且启用了连接器配置 include.query 属性,则源字段还提供查询字段,其中包含导致更改事件的原始 SQL 语句。在 MariaDB 上,配置选项是 binlog_annotate_row_events。
4op描述操作类型的强制字符串。 op字段值为d,表示该行被删除。
5ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。

删除更改事件记录为使用者提供处理删除该行所需的信息。包含旧值是因为某些消费者可能需要它们才能正确处理删除。

MySQL 连接器事件旨在与 Kafka 日志压缩配合使用。只要保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这使得 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并且可用于重新加载基于键的状态。

八、墓碑事件

当删除一行时,删除事件值仍然适用于日志压缩,因为 Kafka 可以删除具有相同键的所有早期消息。但是,为了让 Kafka 删除具有相同键的所有消息,消息值必须为 null。为了实现这一点,在 Debezium 的 MySQL 连接器发出删除事件后,连接器会发出一个特殊的逻辑删除事件,该事件具有相同的键但为空值。

九、截断事件

截断更改事件表示表已被截断。在这种情况下,消息键为 null,消息值如下所示:

{"schema": { ... },"payload": {"source": { (1)"version": "2.5.3.Final","name": "mysql-server-1","connector": "mysql","name": "mysql-server-1","ts_ms": 1465581029100,"snapshot": false,"db": "inventory","table": "customers","server_id": 223344,"gtid": null,"file": "mysql-bin.000003","pos": 484,"row": 0,"thread": 7,"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"},"op": "t", (2)"ts_ms": 1465581029523 (3)}
}

表 12. 截断事件值字段的描述

序列字段描述
1source描述事件源元数据的必填字段。在截断事件值中,源字段结构与同一表的创建、更新和删除事件相同,提供以下元数据:debezium版本;连接器类型和名称;记录事件的 Binlog 名称;二进制日志位置;事件内的行;如果事件是快照的一部分;数据库和表的名称;截断事件的 MySQL 线程的 ID(仅限非快照);MySQL 服务器 ID(如果可用);数据库中发生更改的时间戳
2op描述操作类型的强制字符串。 op字段值为t,表示该表被截断。
3ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。+ 在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。

如果单个 TRUNCATE 语句应用于多个表,则每个截断表都会发出一个截断更改事件记录。

请注意,由于截断事件表示对整个表所做的更改并且没有消息键,因此除非您正在处理具有单个分区的主题,否则与表相关的更改事件没有顺序保证(创建、更新等)并截断该表的事件。例如,当从不同分区读取这些事件时,消费者仅在该表的截断事件之后才可以接收更新事件。

十、Debezium技术总结

更多Debezium技术请参考:

  • Debezium技术专栏

这篇关于Debezium日常分享系列之:Debezium2.5稳定版本之Mysql连接器的数据变更事件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/833008

相关文章

一文详解MySQL索引(六张图彻底搞懂)

《一文详解MySQL索引(六张图彻底搞懂)》MySQL索引的建立对于MySQL的高效运行是很重要的,索引可以大大提高MySQL的检索速度,:本文主要介绍MySQL索引的相关资料,文中通过代码介绍的... 目录一、什么是索引?为什么需要索引?二、索引该用哪种数据结构?1. 哈希表2. 跳表3. 二叉排序树4.

MySQL批量替换数据库字符集的实用方法(附详细代码)

《MySQL批量替换数据库字符集的实用方法(附详细代码)》当需要修改数据库编码和字符集时,通常需要对其下属的所有表及表中所有字段进行修改,下面:本文主要介绍MySQL批量替换数据库字符集的实用方法... 目录前言为什么要批量修改字符集?整体脚本脚本逻辑解析1. 设置目标参数2. 生成修改表默认字符集的语句3

python库pydantic数据验证和设置管理库的用途

《python库pydantic数据验证和设置管理库的用途》pydantic是一个用于数据验证和设置管理的Python库,它主要利用Python类型注解来定义数据模型的结构和验证规则,本文给大家介绍p... 目录主要特点和用途:Field数值验证参数总结pydantic 是一个让你能够 confidentl

MySQL8.0临时表空间的使用及解读

《MySQL8.0临时表空间的使用及解读》MySQL8.0+引入会话级(temp_N.ibt)和全局(ibtmp1)InnoDB临时表空间,用于存储临时数据及事务日志,自动创建与回收,重启释放,管理高... 目录一、核心概念:为什么需要“临时表空间”?二、InnoDB 临时表空间的两种类型1. 会话级临时表

MySQL之复合查询使用及说明

《MySQL之复合查询使用及说明》文章讲解了SQL复合查询中emp、dept、salgrade三张表的使用,涵盖多表连接、自连接、子查询(单行/多行/多列)及合并查询(UNION/UNIONALL)等... 目录复合查询基本查询回顾多表查询笛卡尔积自连接子查询单行子查询多行子查询多列子查询在from子句中使

JAVA实现亿级千万级数据顺序导出的示例代码

《JAVA实现亿级千万级数据顺序导出的示例代码》本文主要介绍了JAVA实现亿级千万级数据顺序导出的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 前提:主要考虑控制内存占用空间,避免出现同时导出,导致主程序OOM问题。实现思路:A.启用线程池

MySQL使用EXISTS检查记录是否存在的详细过程

《MySQL使用EXISTS检查记录是否存在的详细过程》EXISTS是SQL中用于检查子查询是否返回至少一条记录的运算符,它通常用于测试是否存在满足特定条件的记录,从而在主查询中进行相应操作,本文给大... 目录基本语法示例数据库和表结构1. 使用 EXISTS 在 SELECT 语句中2. 使用 EXIS

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很