Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性

2024-02-27 02:38

本文主要是介绍Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ksqlDB 是学习和开发 kafka 流式计算的很方便的工具。它支持 Push Query 和 Pull Query。下面是一些 Pull Query 的测试。

测试对象

我建立了下面的 stream 作为测试对象:

CREATE OR REPLACE STREAM tagvalue (tagId INT, value DOUBLE)WITH (kafka_topic='tagvalue', value_format='json', partitions=1);

插入数据

INSERT INTO tagvalue (tagId, value) VALUES (1, 11000);
INSERT INTO tagvalue (tagId, value) VALUES (2, 10000);

执行 Pull Query

直接对 stream 执行 pull query

SELECT *
FROM tagvalue;

系统如下。系统要求必须有 where 语句。
在这里插入图片描述
增加 where 语句:

SELECT *
FROM tagvalue
WHERE tagId = 1;

系统提示如下。系统说我们的 stream 没有主键。
在这里插入图片描述
由于不可能修改 stream 的 schema,我们使用系统推荐的方法,改变如下配置为 true,再次执行查询得到如下提示:

在这里插入图片描述
系统的提示是 不能对 stream 执行 pull query.

对 table 执行 pull query

创建基于 tagvalue topic 的 table:

CREATE OR REPLACE TABLE tagvalueview (tagId INT PRIMARY KEY, value DOUBLE)WITH (kafka_topic='tagvalue', value_format='json', partitions=1);

执行 pull query:

select *
from tagvalueview;

得到如下结果。系统不能直接查询基于 topic 创建的 table
在这里插入图片描述
按照系统提示创建能查询的 table:

CREATE TABLE QUERYABLE_TAGVALUEVIEW AS SELECT * FROM TAGVALUEVIEW

这时候系统增加了一个新的topic:
在这里插入图片描述

select *
from QUERYABLE_TAGVALUEVIEW ;

以下是执行结果。我们可以看到,系统什么都没返回:

在这里插入图片描述
我们再创建一个实时统计 tag 数值数量的 table:

CREATE OR REPLACE TABLE tagvalueview ASSELECT tagId, count(*)FROM tagvalueGROUP BY tagIdEMIT CHANGES;

执行以下查询:

select *
from tagvalueview;

得到查询结果:
在这里插入图片描述

结论

Pull Query 只能在 table 上执行,而且是 queryable table. Pull Query 结合 table 可以帮助开发者统计已有数据的结果。

这篇关于Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/750962

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

基于Python开发Windows自动更新控制工具

《基于Python开发Windows自动更新控制工具》在当今数字化时代,操作系统更新已成为计算机维护的重要组成部分,本文介绍一款基于Python和PyQt5的Windows自动更新控制工具,有需要的可... 目录设计原理与技术实现系统架构概述数学建模工具界面完整代码实现技术深度分析多层级控制理论服务层控制注

基于Go语言开发一个 IP 归属地查询接口工具

《基于Go语言开发一个IP归属地查询接口工具》在日常开发中,IP地址归属地查询是一个常见需求,本文将带大家使用Go语言快速开发一个IP归属地查询接口服务,有需要的小伙伴可以了解下... 目录功能目标技术栈项目结构核心代码(main.go)使用方法扩展功能总结在日常开发中,IP 地址归属地查询是一个常见需求:

Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧

《Python函数的基本用法、返回值特性、全局变量修改及异常处理技巧》本文将通过实际代码示例,深入讲解Python函数的基本用法、返回值特性、全局变量修改以及异常处理技巧,感兴趣的朋友跟随小编一起看看... 目录一、python函数定义与调用1.1 基本函数定义1.2 函数调用二、函数返回值详解2.1 有返

使用python制作一款文件粉碎工具

《使用python制作一款文件粉碎工具》这篇文章主要为大家详细介绍了如何使用python制作一款文件粉碎工具,能够有效粉碎密码文件和机密Excel表格等,感兴趣的小伙伴可以了解一下... 文件粉碎工具:适用于粉碎密码文件和机密的escel表格等等,主要作用就是防止 别人用数据恢复大师把你刚删除的机密的文件恢

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

Python实战之SEO优化自动化工具开发指南

《Python实战之SEO优化自动化工具开发指南》在数字化营销时代,搜索引擎优化(SEO)已成为网站获取流量的重要手段,本文将带您使用Python开发一套完整的SEO自动化工具,需要的可以了解下... 目录前言项目概述技术栈选择核心模块实现1. 关键词研究模块2. 网站技术seo检测模块3. 内容优化分析模

Python文本相似度计算的方法大全

《Python文本相似度计算的方法大全》文本相似度是指两个文本在内容、结构或语义上的相近程度,通常用0到1之间的数值表示,0表示完全不同,1表示完全相同,本文将深入解析多种文本相似度计算方法,帮助您选... 目录前言什么是文本相似度?1. Levenshtein 距离(编辑距离)核心公式实现示例2. Jac

Python利用PySpark和Kafka实现流处理引擎构建指南

《Python利用PySpark和Kafka实现流处理引擎构建指南》本文将深入解剖基于Python的实时处理黄金组合:Kafka(分布式消息队列)与PySpark(分布式计算引擎)的化学反应,并构建一... 目录引言:数据洪流时代的生存法则第一章 Kafka:数据世界的中央神经系统消息引擎核心设计哲学高吞吐