Debezium日常分享系列之:Debezium and TimescaleDB

2024-01-16 18:28

本文主要是介绍Debezium日常分享系列之:Debezium and TimescaleDB,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Debezium日常分享系列之:Debezium and TimescaleDB

  • 一、TimescaleDB
  • 二、完整案例
  • 三、Hypertables
  • 四、Continuous aggregates
  • 五、Compression
  • 六、结论

一、TimescaleDB

TimescaleDB 是一个开源数据库,旨在使 SQL 对于时间序列数据具有可扩展性。它是作为 PostgreSQL 数据库的扩展实现的。这一事实促使我们重新使用标准 Debezium PostgreSQL 连接器,并将 TimescaleDB 支持实现为单个消息转换 (SMT)。

TimescaleDB 提供了三个基本构建块/概念:

  • Hypertables
  • Continuous aggregates
  • Compression

描述实例定义的元数据(目录)和原始数据通常存储在 _timescaledb_internal_schema 中。TimescaleDb SMT 连接到数据库并读取和处理元数据。然后,从数据库读取的原始消息会使用存储在 Kafka Connect 标头中的元数据进行丰富,从而创建物理数据和 TimescaleDB 逻辑结构之间的关系。

二、完整案例

Debezium 示例存储库包含基于 Docker Compose 的部署,该部署提供了完整的环境来演示 TimescaleDB 集成。

第一步,开始部署

$ docker-compose -f docker-compose-timescaledb.yaml up --build

该命令将启动 Debezium(Zookeeper、Kafka、Kafka Connect)和源 TimescaleDB 数据库。

启动的数据库已准备好以下数据库对象:

  • 将温度和湿度测量值表示为时间序列数据的超稳定条件;使用 DDL
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL); SELECT create_hypertable('conditions', 'time')
  • 测量数据的单一记录
INSERT INTO conditions VALUES(NOW(), 'Prague', 22.8, 53.3)

PostgreSQL 出版物用于将时间序列数据发布到复制槽中,因为演示使用 pgoutput 解码插件

CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')

下一步需要注册 Debezium PostgreSQL 连接器以捕获数据库中的更改

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-timescaledb.yaml

注册请求文件与常规文件不同,增加了这些行

{"name": "inventory-connector","config": {
..."schema.include.list": "_timescaledb_internal","transforms": "timescaledb","transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb","transforms.timescaledb.database.hostname": "timescaledb","transforms.timescaledb.database.port": "5432","transforms.timescaledb.database.user": "postgres","transforms.timescaledb.database.password": "postgres","transforms.timescaledb.database.dbname": "postgres"}
}

三、Hypertables

连接器将捕获内部 TimescaleDB 架构以及包含原始数据的物理表,并且将应用 TimescaleDb SMT 来丰富消息并根据逻辑名称将它们路由到正确命名的主题。 SMT 配置选项包含连接到数据库所需的信息。在这种情况下,条件超表将物理存储在 _timescaledb_internal._hyper_1_1_chunk 中,并且当由 SMT 处理时,它将重新路由到根据固定配置的前缀 timescaledb 和逻辑名称 public.conditions 命名的 timescaledb.public.conditions 主题符合超表名称。

让我们在表中添加更多测量值

 docker-compose -f docker-compose-timescaledb.yaml exec timescaledb env PGOPTIONS="--search_path=public" bash -c 'psql -U $POSTGRES_USER postgres'
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 30, 50);
postgres=# INSERT INTO conditions VALUES (now(), 'Brno', 35, 55);
postgres=# INSERT INTO conditions VALUES (now(), 'Prague', 40, 60);

并读取捕获的主题消息(在命令中启用打印密钥和标题)

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions

这些消息包含两个标头 debezium_timescaledb_chunk_table:_hyper_1_1_chunk、debezium_timescaledb_chunk_schema:_timescaledb_internal,它们描述了逻辑超表名称与从中捕获它们的物理源表之间的映射。

四、Continuous aggregates

连续聚合对存储在超表中的数据提供自动统计计算。聚合被定义为物化视图,由其自己的超表支持,而超表又由一组物理表支持。重新计算聚合后(手动或自动),新值将存储在超表中,可以从中捕获和流式传输这些值。连接器捕获物理表中的新值,SMT 通过将物理目标重新映射回聚合逻辑名称来再次解决路由问题。还添加了带有原始超表和物理表名称的 Kafka Connect 标头。

让我们创建一个名为conditions_summary的连续聚合,用于计算每个位置和时间间隔的平均、最低和最高温度

postgres=# CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) ASSELECTlocation,time_bucket(INTERVAL '1 hour', time) AS bucket,AVG(temperature),MAX(temperature),MIN(temperature)FROM conditionsGROUP BY location, bucket;

并阅读捕获的主题消息

docker-compose -f docker-compose-timescaledb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning \--property print.key=true \--property print.headers=true \--topic timescaledb.public.conditions_summary

这些消息包含两个标头 debezium_timescaledb_hypertable_table:_materialized_hypertable_2,debezium_timescaledb_hypertable_schema:_timescaledb_internal 公开哪个支持超表用于存储聚合,以及两个附加标头 debezium_timescaledb_chunk_table:_hyper_2_2_chunk,debezium_timescaledb_chunk_schema:_timescaledb_internal 公开存储聚合的物理表。

`__debezium_timescaledb_chunk_table:_hyper_1_1_chunk,__debezium_timescaledb_chunk_schema:_timescaledb_internal` that describes the mapping between the logical hypertable name and the physical source table from which they were captured.

如果添加新的测量并触发聚合重新计算,则更新的聚合将发送到主题

postgres=# INSERT INTO conditions VALUES (now(), 'Ostrava', 10, 50);
postgres=# CALL refresh_continuous_aggregate('conditions_summary', CURRENT_DATE, CURRENT_DATE + 1);

看起来像

{"schema":{
...},"payload":{"before":null,"after":{"location":"Ostrava","bucket":"2024-01-09T13:00:00.000000Z","avg":10.0,"max":10.0,"min":10.0},"source":{"version":"2.5.0.Final","connector":"postgresql","name":"dbserver1","ts_ms":1704806938840,"snapshot":"false","db":"postgres","sequence":"[\"29727872\",\"29728440\"]","schema":"public","table":"conditions_summary","txId":764,"lsn":29728440,"xmin":null},"op":"c","ts_ms":1704806939163,"transaction":null}
}

因此,该主题包含针对两个不同位置计算的两条或多条消息。

五、Compression

TimescaleDB SMT 不会增强压缩数据块(物理表记录),而只是将其作为存储在超表中的副产品。压缩后的数据被捕获并存储在 Kafka 主题中。通常,带有压缩块的消息会被丢弃,并且不会被管道中的后续作业处理。

让我们为超表启用压缩并压缩它

postgres=# ALTER TABLE conditions SET (timescaledb.compress, timescaledb.compress_segment by = 'location');
postgres=# SELECT show_chunks('conditions');show_chunks
----------------------------------------_timescaledb_internal._hyper_1_1_chunk
(1 row)postgres=# SELECT compress_chunk( '_timescaledb_internal._hyper_1_1_chunk');

消息写入 timescaledb._timescaledb_internal._compressed_hypertable_3。

停止服务

docker-compose -f docker-compose-timescaledb.yaml down

六、结论

在这篇文章中,我们演示了从 TimescaleDB 时间序列数据库捕获数据以及通过 TimescaleDb SMT 对其进行处理。我们已经展示了如何根据作为数据源的超表和连续聚合来路由和丰富消息。

深入了解Debezium请阅读博主专栏:

  • Debezium专栏

这篇关于Debezium日常分享系列之:Debezium and TimescaleDB的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/613500

相关文章

Python虚拟环境与Conda使用指南分享

《Python虚拟环境与Conda使用指南分享》:本文主要介绍Python虚拟环境与Conda使用指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、python 虚拟环境概述1.1 什么是虚拟环境1.2 为什么需要虚拟环境二、Python 内置的虚拟环境工具

Python处理大量Excel文件的十个技巧分享

《Python处理大量Excel文件的十个技巧分享》每天被大量Excel文件折磨的你看过来!这是一份Python程序员整理的实用技巧,不说废话,直接上干货,文章通过代码示例讲解的非常详细,需要的朋友可... 目录一、批量读取多个Excel文件二、选择性读取工作表和列三、自动调整格式和样式四、智能数据清洗五、

JDK9到JDK21中值得掌握的29个实用特性分享

《JDK9到JDK21中值得掌握的29个实用特性分享》Java的演进节奏从JDK9开始显著加快,每半年一个新版本的发布节奏为Java带来了大量的新特性,本文整理了29个JDK9到JDK21中值得掌握的... 目录JDK 9 模块化与API增强1. 集合工厂方法:一行代码创建不可变集合2. 私有接口方法:接口

电脑系统Hosts文件原理和应用分享

《电脑系统Hosts文件原理和应用分享》Hosts是一个没有扩展名的系统文件,当用户在浏览器中输入一个需要登录的网址时,系统会首先自动从Hosts文件中寻找对应的IP地址,一旦找到,系统会立即打开对应... Hosts是一个没有扩展名的系统文件,可以用记事本等工具打开,其作用就是将一些常用的网址域名与其对应

SpringBoot请求参数接收控制指南分享

《SpringBoot请求参数接收控制指南分享》:本文主要介绍SpringBoot请求参数接收控制指南,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring Boot 请求参数接收控制指南1. 概述2. 有注解时参数接收方式对比3. 无注解时接收参数默认位置

Python通过模块化开发优化代码的技巧分享

《Python通过模块化开发优化代码的技巧分享》模块化开发就是把代码拆成一个个“零件”,该封装封装,该拆分拆分,下面小编就来和大家简单聊聊python如何用模块化开发进行代码优化吧... 目录什么是模块化开发如何拆分代码改进版:拆分成模块让模块更强大:使用 __init__.py你一定会遇到的问题模www.

Python解析器安装指南分享(Mac/Windows/Linux)

《Python解析器安装指南分享(Mac/Windows/Linux)》:本文主要介绍Python解析器安装指南(Mac/Windows/Linux),具有很好的参考价值,希望对大家有所帮助,如有... 目NMNkN录1js. 安装包下载1.1 python 下载官网2.核心安装方式3. MACOS 系统安

Java嵌套for循环优化方案分享

《Java嵌套for循环优化方案分享》介绍了Java中嵌套for循环的优化方法,包括减少循环次数、合并循环、使用更高效的数据结构、并行处理、预处理和缓存、算法优化、尽量减少对象创建以及本地变量优化,通... 目录Java 嵌套 for 循环优化方案1. 减少循环次数2. 合并循环3. 使用更高效的数据结构4

Python中常用的四种取整方式分享

《Python中常用的四种取整方式分享》在数据处理和数值计算中,取整操作是非常常见的需求,Python提供了多种取整方式,本文为大家整理了四种常用的方法,希望对大家有所帮助... 目录引言向零取整(Truncate)向下取整(Floor)向上取整(Ceil)四舍五入(Round)四种取整方式的对比综合示例应

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D