【Kafka】Kafka 1.0.1案例详解之Kafka Streams

2024-01-15 10:18
文章标签 详解 案例 1.0 kafka streams

本文主要是介绍【Kafka】Kafka 1.0.1案例详解之Kafka Streams,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这之前我们已经讲解了Kafka的安装部署和最核心的发布订阅功能,本次章节我们来介绍Kafka的新特性——Kafka Streams。

首先,要研究一样新东西,我们需要知道它是做什么的:

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.

大家仔细阅读上面一段话可以知道,Kafka Streams是一个用来处理Kafka消息的库,它包含了如下几个优势:

  1. 通过与现有的Java应用整合,我们可以设计出简单的、轻量级的客户端类库

  2. 只需要基于Kafka自身的消息系统,不需要额外的第三方系统,就可以很容易地实现水平扩展

  3. 通过可容错的状态管理,实现高效的窗口操作和聚合

  4. 支持 exactly-once语义

  5. 既支持基于时间窗口的操作,也支持每次单条数据的处理

  6. 既支持低阶的流处理接口,也支持高阶的流处理DSL(领域专用语言)

Kafka Streams处理剖析图

8dedbcba9f7a942252b660624732bf8c.jpeg

案例剖析

说了这么多理论知识,实际上用起来很简单,接下来我们通过一个简单的例子来熟悉这个新特性。

添加依赖

kafka-streams是一个单独的依赖包,并不存在于kafka-client中

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>1.0.1</version>
</dependency>

属性配置

添加属性配置,application id相当于group id,bootstrap servers配置kafka的brokers地址,并配置key与value的序列化、反序列化实现类。这两个类均实现了

org.apache.kafka.common.serialization.Serde接口

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

读取并处理输出

最后通过StreamsBuilder来创建KStream,进行数据处理转换后输出到一个新的topic或者其他外部存储器中。

builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

退出机制

最后添加退出时的处理逻辑

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}
});

我们可以在github中查看完整的程序代码:

https://github.com/lubinsu/new-kafka

156c2c0d92e9b8ce6790e672bab69c68.jpeg

生活

岂止于美

f5b4ea41f58297ff793aaf16419c528f.jpeg

作者:苏鹭彬

长按二维码关注

这篇关于【Kafka】Kafka 1.0.1案例详解之Kafka Streams的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/608551

相关文章

HTML5的input标签的`type`属性值详解和代码示例

《HTML5的input标签的`type`属性值详解和代码示例》HTML5的`input`标签提供了多种`type`属性值,用于创建不同类型的输入控件,满足用户输入的多样化需求,从文本输入、密码输入、... 目录一、引言二、文本类输入类型2.1 text2.2 password2.3 textarea(严格

C++ move 的作用详解及陷阱最佳实践

《C++move的作用详解及陷阱最佳实践》文章详细介绍了C++中的`std::move`函数的作用,包括为什么需要它、它的本质、典型使用场景、以及一些常见陷阱和最佳实践,感兴趣的朋友跟随小编一起看... 目录C++ move 的作用详解一、一句话总结二、为什么需要 move?C++98/03 的痛点⚡C++

MySQL中between and的基本用法、范围查询示例详解

《MySQL中betweenand的基本用法、范围查询示例详解》BETWEENAND操作符在MySQL中用于选择在两个值之间的数据,包括边界值,它支持数值和日期类型,示例展示了如何使用BETWEEN... 目录一、between and语法二、使用示例2.1、betwphpeen and数值查询2.2、be

python中的flask_sqlalchemy的使用及示例详解

《python中的flask_sqlalchemy的使用及示例详解》文章主要介绍了在使用SQLAlchemy创建模型实例时,通过元类动态创建实例的方式,并说明了如何在实例化时执行__init__方法,... 目录@orm.reconstructorSQLAlchemy的回滚关联其他模型数据库基本操作将数据添

Java中ArrayList与顺序表示例详解

《Java中ArrayList与顺序表示例详解》顺序表是在计算机内存中以数组的形式保存的线性表,是指用一组地址连续的存储单元依次存储数据元素的线性结构,:本文主要介绍Java中ArrayList与... 目录前言一、Java集合框架核心接口与分类ArrayList二、顺序表数据结构中的顺序表三、常用代码手动

JAVA线程的周期及调度机制详解

《JAVA线程的周期及调度机制详解》Java线程的生命周期包括NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING和TERMINATED,线程调度依赖操作系统,采用抢占... 目录Java线程的生命周期线程状态转换示例代码JAVA线程调度机制优先级设置示例注意事项JAVA线程

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

C++构造函数中explicit详解

《C++构造函数中explicit详解》explicit关键字用于修饰单参数构造函数或可以看作单参数的构造函数,阻止编译器进行隐式类型转换或拷贝初始化,本文就来介绍explicit的使用,感兴趣的可以... 目录1. 什么是explicit2. 隐式转换的问题3.explicit的使用示例基本用法多参数构造

Springboot3 ResponseEntity 完全使用案例

《Springboot3ResponseEntity完全使用案例》ResponseEntity是SpringBoot中控制HTTP响应的核心工具——它能让你精准定义响应状态码、响应头、响应体,相比... 目录Spring Boot 3 ResponseEntity 完全使用教程前置准备1. 项目基础依赖(M

Android使用java实现网络连通性检查详解

《Android使用java实现网络连通性检查详解》这篇文章主要为大家详细介绍了Android使用java实现网络连通性检查的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录NetCheck.Java(可直接拷贝)使用示例(Activity/Fragment 内)权限要求