【Kafka】Kafka 1.0.1案例详解之Kafka Streams

2024-01-15 10:18
文章标签 详解 案例 1.0 kafka streams

本文主要是介绍【Kafka】Kafka 1.0.1案例详解之Kafka Streams,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这之前我们已经讲解了Kafka的安装部署和最核心的发布订阅功能,本次章节我们来介绍Kafka的新特性——Kafka Streams。

首先,要研究一样新东西,我们需要知道它是做什么的:

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.

大家仔细阅读上面一段话可以知道,Kafka Streams是一个用来处理Kafka消息的库,它包含了如下几个优势:

  1. 通过与现有的Java应用整合,我们可以设计出简单的、轻量级的客户端类库

  2. 只需要基于Kafka自身的消息系统,不需要额外的第三方系统,就可以很容易地实现水平扩展

  3. 通过可容错的状态管理,实现高效的窗口操作和聚合

  4. 支持 exactly-once语义

  5. 既支持基于时间窗口的操作,也支持每次单条数据的处理

  6. 既支持低阶的流处理接口,也支持高阶的流处理DSL(领域专用语言)

Kafka Streams处理剖析图

8dedbcba9f7a942252b660624732bf8c.jpeg

案例剖析

说了这么多理论知识,实际上用起来很简单,接下来我们通过一个简单的例子来熟悉这个新特性。

添加依赖

kafka-streams是一个单独的依赖包,并不存在于kafka-client中

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>1.0.1</version>
</dependency>

属性配置

添加属性配置,application id相当于group id,bootstrap servers配置kafka的brokers地址,并配置key与value的序列化、反序列化实现类。这两个类均实现了

org.apache.kafka.common.serialization.Serde接口

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

读取并处理输出

最后通过StreamsBuilder来创建KStream,进行数据处理转换后输出到一个新的topic或者其他外部存储器中。

builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

退出机制

最后添加退出时的处理逻辑

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}
});

我们可以在github中查看完整的程序代码:

https://github.com/lubinsu/new-kafka

156c2c0d92e9b8ce6790e672bab69c68.jpeg

生活

岂止于美

f5b4ea41f58297ff793aaf16419c528f.jpeg

作者:苏鹭彬

长按二维码关注

这篇关于【Kafka】Kafka 1.0.1案例详解之Kafka Streams的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/608551

相关文章

HTML5 搜索框Search Box详解

《HTML5搜索框SearchBox详解》HTML5的搜索框是一个强大的工具,能够有效提升用户体验,通过结合自动补全功能和适当的样式,可以创建出既美观又实用的搜索界面,这篇文章给大家介绍HTML5... html5 搜索框(Search Box)详解搜索框是一个用于输入查询内容的控件,通常用于网站或应用程

Python中使用uv创建环境及原理举例详解

《Python中使用uv创建环境及原理举例详解》uv是Astral团队开发的高性能Python工具,整合包管理、虚拟环境、Python版本控制等功能,:本文主要介绍Python中使用uv创建环境及... 目录一、uv工具简介核心特点:二、安装uv1. 通过pip安装2. 通过脚本安装验证安装:配置镜像源(可

C++ 函数 strftime 和时间格式示例详解

《C++函数strftime和时间格式示例详解》strftime是C/C++标准库中用于格式化日期和时间的函数,定义在ctime头文件中,它将tm结构体中的时间信息转换为指定格式的字符串,是处理... 目录C++ 函数 strftipythonme 详解一、函数原型二、功能描述三、格式字符串说明四、返回值五

LiteFlow轻量级工作流引擎使用示例详解

《LiteFlow轻量级工作流引擎使用示例详解》:本文主要介绍LiteFlow是一个灵活、简洁且轻量的工作流引擎,适合用于中小型项目和微服务架构中的流程编排,本文给大家介绍LiteFlow轻量级工... 目录1. LiteFlow 主要特点2. 工作流定义方式3. LiteFlow 流程示例4. LiteF

CSS3中的字体及相关属性详解

《CSS3中的字体及相关属性详解》:本文主要介绍了CSS3中的字体及相关属性,详细内容请阅读本文,希望能对你有所帮助... 字体网页字体的三个来源:用户机器上安装的字体,放心使用。保存在第三方网站上的字体,例如Typekit和Google,可以link标签链接到你的页面上。保存在你自己Web服务器上的字

MySQL存储过程之循环遍历查询的结果集详解

《MySQL存储过程之循环遍历查询的结果集详解》:本文主要介绍MySQL存储过程之循环遍历查询的结果集,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言1. 表结构2. 存储过程3. 关于存储过程的SQL补充总结前言近来碰到这样一个问题:在生产上导入的数据发现

MyBatis ResultMap 的基本用法示例详解

《MyBatisResultMap的基本用法示例详解》在MyBatis中,resultMap用于定义数据库查询结果到Java对象属性的映射关系,本文给大家介绍MyBatisResultMap的基本... 目录MyBATis 中的 resultMap1. resultMap 的基本语法2. 简单的 resul

从基础到进阶详解Pandas时间数据处理指南

《从基础到进阶详解Pandas时间数据处理指南》Pandas构建了完整的时间数据处理生态,核心由四个基础类构成,Timestamp,DatetimeIndex,Period和Timedelta,下面我... 目录1. 时间数据类型与基础操作1.1 核心时间对象体系1.2 时间数据生成技巧2. 时间索引与数据

Mybatis Plus Join使用方法示例详解

《MybatisPlusJoin使用方法示例详解》:本文主要介绍MybatisPlusJoin使用方法示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,... 目录1、pom文件2、yaml配置文件3、分页插件4、示例代码:5、测试代码6、和PageHelper结合6

六个案例搞懂mysql间隙锁

《六个案例搞懂mysql间隙锁》MySQL中的间隙是指索引中两个索引键之间的空间,间隙锁用于防止范围查询期间的幻读,本文主要介绍了六个案例搞懂mysql间隙锁,具有一定的参考价值,感兴趣的可以了解一下... 目录概念解释间隙锁详解间隙锁触发条件间隙锁加锁规则案例演示案例一:唯一索引等值锁定存在的数据案例二: