从Apache Kafka读数据写入TimescaleDB的案例

2024-03-22 14:50

本文主要是介绍从Apache Kafka读数据写入TimescaleDB的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文链接:https://streamsets.com/blog/ingesting-data-apache-kafka-timescaledb/

  • 作者:Pat Patterson
  • 2019年5月28日
  • StreamSets新闻

时间序列数据库

时间序列数据库经过优化,可以处理按时间索引的数据,有效地处理特定时间范围内的数据查询。市场上有几个时间序列数据库,事实上,Data Collector长期以来一直有能力写入InfluxDB,但是我对TimescaleDB感兴趣的是它建立在PostgreSQL之上。完全披露:我作为Salesforce的开发人员传播者花了五年半的时间,而PostgreSQL曾经是,现在仍然是Heroku平台的核心部分,但我也喜欢PostgreSQL作为MySQL的强大替代品。

TimescaleDB入门

在听Diana的演示时,我运行了TimescaleDB Docker镜像,将笔记本电脑上的端口54321映射到Docker容器中的5432,这样就不会与我现有的PostgreSQL部署发生冲突。一旦戴安娜离开舞台,我就会浏览TimescaleDB快速入门的“创建Hypertables”部分,创建一个PostgreSQL数据库,为TimescaleDB启用它,并为其写入一行数据:

tutorial=# INSERT INTO conditions(time, location, temperature, humidity)
tutorial-#   VALUES (NOW(), 'office', 70.0, 50.0);
INSERT 0 1
tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time              | location | temperature | humidity 
-------------------------------+----------+-------------+----------2019-05-25 00:37:11.288536+00 | office   |          70 |       50
(1 row)

第一个TimescaleDB Pipeline

由于TimescaleDB是基于PostgreSQL构建的,因此标准的PostgreSQL JDBC驱动程序可以直接使用它。由于我已经在Data Collector中安装了驱动程序,因此我花了大约两分钟构建一个简单的测试管道来将第二行数据写入闪亮的新TimescaleDB服务器:

æ¶é´å°ºåº¦æµè¯ç®¡é

生成的测试结果数据

tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time            |         location          |    temperature     |      humidity      
----------------------------+---------------------------+--------------------+--------------------2020-12-25 23:35:43.889+00 | Grocery                   |  0.806543707847595 | 0.08446377515792852020-10-27 02:20:47.905+00 | Shoes                     | 0.0802439451217651 |  0.3988062143325812020-10-24 01:15:15.903+00 | Games & Industrial        |  0.577536821365356 |  0.4052745103836062020-10-22 02:32:21.916+00 | Baby                      | 0.0524919033050537 |  0.4990888833999632020-09-12 10:30:53.905+00 | Electronics & Garden      |  0.679168224334717 |  0.4276011586189272020-08-25 19:39:50.895+00 | Baby & Electronics        |  0.265614211559296 |  0.2746958136558532020-08-15 15:53:02.906+00 | Home                      | 0.0492082238197327 |  0.0466884374618532020-08-10 08:56:03.889+00 | Electronics, Home & Tools |  0.336894452571869 |  0.8480106592178342020-08-02 09:48:58.918+00 | Books & Jewelry           |  0.217794299125671 |  0.7347096204757692020-08-02 08:52:31.915+00 | Home                      |  0.931948065757751 |  0.499135136604309
(10 rows)

从Kafka到数据到TimescaleDB

时间序列数据库的主要用例之一是存储来自物联网的数据。我花了几分钟来编写一个简单的Python Kafka客户端,它可以模拟一组传感器,产生比我的测试管道更真实的温度和湿度数据:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random# Create a producer that JSON-encodes the message
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))# Send a quarter million data points (asynchronous)
for _ in range(250000):location = random.randint(1, 4)temperature = 95.0 + random.uniform(0, 10) + locationhumidity = 45.0 + random.uniform(0, 10) - locationproducer.send('timescale', {'location': location, 'temperature': temperature, 'humidity': humidity})# Block until all the messages have been sent
producer.flush()

请注意,模拟器会为位置发出一个整数值,而不会为数据添加时间戳。正如您所看到的,只是为了好玩,我让模拟器生成了25万个数据点。这足以使TimescaleDB运行一点,而不需要花费大量时间来生成。

我用Kafka Consumer替换了我的管道的Dev Data Generator源,并在管道中添加了几个处理器:

Kafka Timescale管é

Expression Evaluator只是为每条记录添加一个时间戳,使用一些表达式语言来创建正确的格式:

${time:extractStringFromDate(time:now(), 'yyyy-MM-dd HH:mm:ss.SSSZZ')}

静态查找处理器使用字符串替换整数位置字段以匹配TimescaleDB表模式:

éææ¥æ¾éç½®

结论

TimescaleDB给我留下了深刻的印象。拆箱经验快速无痛。虽然我只给了它最简单的轮胎踢,但一切都是第一次。TimescaleDB是基于PostgreSQL构建的,这使我可以轻松地使用现成的工具编写数据,并且我能够使用熟悉的SQL命令在数据处于超级状态时使用它们。

如果您正在使用TimescaleDB,请下载StreamSets Data Collector并尝试满足您的数据集成需求。与TimescaleDB的核心一样,它在Apache 2.0许可下以开源形式提供,可免费用于测试,开发和生产。

这篇关于从Apache Kafka读数据写入TimescaleDB的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/835510

相关文章

Java中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例解析

《Java中的分布式系统开发基于Zookeeper与Dubbo的应用案例解析》本文将通过实际案例,带你走进基于Zookeeper与Dubbo的分布式系统开发,本文通过实例代码给大家介绍的非常详... 目录Java 中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例一、分布式系统中的挑战二

Java 中的 equals 和 hashCode 方法关系与正确重写实践案例

《Java中的equals和hashCode方法关系与正确重写实践案例》在Java中,equals和hashCode方法是Object类的核心方法,广泛用于对象比较和哈希集合(如HashMa... 目录一、背景与需求分析1.1 equals 和 hashCode 的背景1.2 需求分析1.3 技术挑战1.4

Java中实现对象的拷贝案例讲解

《Java中实现对象的拷贝案例讲解》Java对象拷贝分为浅拷贝(复制值及引用地址)和深拷贝(递归复制所有引用对象),常用方法包括Object.clone()、序列化及JSON转换,需处理循环引用问题,... 目录对象的拷贝简介浅拷贝和深拷贝浅拷贝深拷贝深拷贝和循环引用总结对象的拷贝简介对象的拷贝,把一个

Java中最全最基础的IO流概述和简介案例分析

《Java中最全最基础的IO流概述和简介案例分析》JavaIO流用于程序与外部设备的数据交互,分为字节流(InputStream/OutputStream)和字符流(Reader/Writer),处理... 目录IO流简介IO是什么应用场景IO流的分类流的超类类型字节文件流应用简介核心API文件输出流应用文

MyBatis分页查询实战案例完整流程

《MyBatis分页查询实战案例完整流程》MyBatis是一个强大的Java持久层框架,支持自定义SQL和高级映射,本案例以员工工资信息管理为例,详细讲解如何在IDEA中使用MyBatis结合Page... 目录1. MyBATis框架简介2. 分页查询原理与应用场景2.1 分页查询的基本原理2.1.1 分

深度解析Java @Serial 注解及常见错误案例

《深度解析Java@Serial注解及常见错误案例》Java14引入@Serial注解,用于编译时校验序列化成员,替代传统方式解决运行时错误,适用于Serializable类的方法/字段,需注意签... 目录Java @Serial 注解深度解析1. 注解本质2. 核心作用(1) 主要用途(2) 适用位置3

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

Java 正则表达式的使用实战案例

《Java正则表达式的使用实战案例》本文详细介绍了Java正则表达式的使用方法,涵盖语法细节、核心类方法、高级特性及实战案例,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、正则表达式语法详解1. 基础字符匹配2. 字符类([]定义)3. 量词(控制匹配次数)4. 边

Python Counter 函数使用案例

《PythonCounter函数使用案例》Counter是collections模块中的一个类,专门用于对可迭代对象中的元素进行计数,接下来通过本文给大家介绍PythonCounter函数使用案例... 目录一、Counter函数概述二、基本使用案例(一)列表元素计数(二)字符串字符计数(三)元组计数三、C

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐