从Apache Kafka读数据写入TimescaleDB的案例

2024-03-22 14:50

本文主要是介绍从Apache Kafka读数据写入TimescaleDB的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文链接:https://streamsets.com/blog/ingesting-data-apache-kafka-timescaledb/

  • 作者:Pat Patterson
  • 2019年5月28日
  • StreamSets新闻

时间序列数据库

时间序列数据库经过优化,可以处理按时间索引的数据,有效地处理特定时间范围内的数据查询。市场上有几个时间序列数据库,事实上,Data Collector长期以来一直有能力写入InfluxDB,但是我对TimescaleDB感兴趣的是它建立在PostgreSQL之上。完全披露:我作为Salesforce的开发人员传播者花了五年半的时间,而PostgreSQL曾经是,现在仍然是Heroku平台的核心部分,但我也喜欢PostgreSQL作为MySQL的强大替代品。

TimescaleDB入门

在听Diana的演示时,我运行了TimescaleDB Docker镜像,将笔记本电脑上的端口54321映射到Docker容器中的5432,这样就不会与我现有的PostgreSQL部署发生冲突。一旦戴安娜离开舞台,我就会浏览TimescaleDB快速入门的“创建Hypertables”部分,创建一个PostgreSQL数据库,为TimescaleDB启用它,并为其写入一行数据:

tutorial=# INSERT INTO conditions(time, location, temperature, humidity)
tutorial-#   VALUES (NOW(), 'office', 70.0, 50.0);
INSERT 0 1
tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time              | location | temperature | humidity 
-------------------------------+----------+-------------+----------2019-05-25 00:37:11.288536+00 | office   |          70 |       50
(1 row)

第一个TimescaleDB Pipeline

由于TimescaleDB是基于PostgreSQL构建的,因此标准的PostgreSQL JDBC驱动程序可以直接使用它。由于我已经在Data Collector中安装了驱动程序,因此我花了大约两分钟构建一个简单的测试管道来将第二行数据写入闪亮的新TimescaleDB服务器:

æ¶é´å°ºåº¦æµè¯ç®¡é

生成的测试结果数据

tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time            |         location          |    temperature     |      humidity      
----------------------------+---------------------------+--------------------+--------------------2020-12-25 23:35:43.889+00 | Grocery                   |  0.806543707847595 | 0.08446377515792852020-10-27 02:20:47.905+00 | Shoes                     | 0.0802439451217651 |  0.3988062143325812020-10-24 01:15:15.903+00 | Games & Industrial        |  0.577536821365356 |  0.4052745103836062020-10-22 02:32:21.916+00 | Baby                      | 0.0524919033050537 |  0.4990888833999632020-09-12 10:30:53.905+00 | Electronics & Garden      |  0.679168224334717 |  0.4276011586189272020-08-25 19:39:50.895+00 | Baby & Electronics        |  0.265614211559296 |  0.2746958136558532020-08-15 15:53:02.906+00 | Home                      | 0.0492082238197327 |  0.0466884374618532020-08-10 08:56:03.889+00 | Electronics, Home & Tools |  0.336894452571869 |  0.8480106592178342020-08-02 09:48:58.918+00 | Books & Jewelry           |  0.217794299125671 |  0.7347096204757692020-08-02 08:52:31.915+00 | Home                      |  0.931948065757751 |  0.499135136604309
(10 rows)

从Kafka到数据到TimescaleDB

时间序列数据库的主要用例之一是存储来自物联网的数据。我花了几分钟来编写一个简单的Python Kafka客户端,它可以模拟一组传感器,产生比我的测试管道更真实的温度和湿度数据:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random# Create a producer that JSON-encodes the message
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))# Send a quarter million data points (asynchronous)
for _ in range(250000):location = random.randint(1, 4)temperature = 95.0 + random.uniform(0, 10) + locationhumidity = 45.0 + random.uniform(0, 10) - locationproducer.send('timescale', {'location': location, 'temperature': temperature, 'humidity': humidity})# Block until all the messages have been sent
producer.flush()

请注意,模拟器会为位置发出一个整数值,而不会为数据添加时间戳。正如您所看到的,只是为了好玩,我让模拟器生成了25万个数据点。这足以使TimescaleDB运行一点,而不需要花费大量时间来生成。

我用Kafka Consumer替换了我的管道的Dev Data Generator源,并在管道中添加了几个处理器:

Kafka Timescale管é

Expression Evaluator只是为每条记录添加一个时间戳,使用一些表达式语言来创建正确的格式:

${time:extractStringFromDate(time:now(), 'yyyy-MM-dd HH:mm:ss.SSSZZ')}

静态查找处理器使用字符串替换整数位置字段以匹配TimescaleDB表模式:

éææ¥æ¾éç½®

结论

TimescaleDB给我留下了深刻的印象。拆箱经验快速无痛。虽然我只给了它最简单的轮胎踢,但一切都是第一次。TimescaleDB是基于PostgreSQL构建的,这使我可以轻松地使用现成的工具编写数据,并且我能够使用熟悉的SQL命令在数据处于超级状态时使用它们。

如果您正在使用TimescaleDB,请下载StreamSets Data Collector并尝试满足您的数据集成需求。与TimescaleDB的核心一样,它在Apache 2.0许可下以开源形式提供,可免费用于测试,开发和生产。

这篇关于从Apache Kafka读数据写入TimescaleDB的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:https://blog.csdn.net/zwahut/article/details/90691758
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/835510

相关文章

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Linux中修改Apache HTTP Server(httpd)默认端口的完整指南

《Linux中修改ApacheHTTPServer(httpd)默认端口的完整指南》ApacheHTTPServer(简称httpd)是Linux系统中最常用的Web服务器之一,本文将详细介绍如何... 目录一、修改 httpd 默认端口的步骤1. 查找 httpd 配置文件路径2. 编辑配置文件3. 保存

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

MySQL 表的内外连接案例详解

《MySQL表的内外连接案例详解》本文给大家介绍MySQL表的内外连接,结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录表的内外连接(重点)内连接外连接表的内外连接(重点)内连接内连接实际上就是利用where子句对两种表形成的笛卡儿积进行筛选,我

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性

apache的commons-pool2原理与使用实践记录

《apache的commons-pool2原理与使用实践记录》ApacheCommonsPool2是一个高效的对象池化框架,通过复用昂贵资源(如数据库连接、线程、网络连接)优化系统性能,这篇文章主... 目录一、核心原理与组件二、使用步骤详解(以数据库连接池为例)三、高级配置与优化四、典型应用场景五、注意事

Java Stream.reduce()方法操作实际案例讲解

《JavaStream.reduce()方法操作实际案例讲解》reduce是JavaStreamAPI中的一个核心操作,用于将流中的元素组合起来产生单个结果,:本文主要介绍JavaStream.... 目录一、reduce的基本概念1. 什么是reduce操作2. reduce方法的三种形式二、reduce

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

Spring Boot 整合 Redis 实现数据缓存案例详解

《SpringBoot整合Redis实现数据缓存案例详解》Springboot缓存,默认使用的是ConcurrentMap的方式来实现的,然而我们在项目中并不会这么使用,本文介绍SpringB... 目录1.添加 Maven 依赖2.配置Redis属性3.创建 redisCacheManager4.使用Sp

springboot项目redis缓存异常实战案例详解(提供解决方案)

《springboot项目redis缓存异常实战案例详解(提供解决方案)》redis基本上是高并发场景上会用到的一个高性能的key-value数据库,属于nosql类型,一般用作于缓存,一般是结合数据... 目录缓存异常实践案例缓存穿透问题缓存击穿问题(其中也解决了穿透问题)完整代码缓存异常实践案例Red