kafka 消费者进行消费数据的各种场景的API(你值得一看)

2024-05-29 07:48

本文主要是介绍kafka 消费者进行消费数据的各种场景的API(你值得一看),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 kafka消费端的参数

 

二 实现案例

2.1 订阅某个主题

创建一个独立消费者,消费 kafka-ljf 主题中数据。
注意: 在消费者 API 代码中必须配置消费者组 id 。命令行启动消费者不填写消费者组
id 会被自动填写随机的消费者组 id
2.消费者代码
package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerTopicDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 14:02:05* @Version: V1.0**/
public class ConsumerTopicDemo {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");// 创建消费者对象KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<String, String>(properties);// 注册要消费的主题(可以消费多个主题)ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);// 拉取数据打印while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}}

3.执行生产者产生数据

 4.消费数据,观察

2.2 订阅某个主题下的某个分区

需求:创建一个独立消费者,消费 kafka-ljf主题 0 号分区的数据。

2.代码

 

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerPartitionDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 14:55:31* @Version: V1.0**/
public class ConsumerPartitionDemo {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(必须),名字可以任意起properties.put(ConsumerConfig.GROUP_ID_CONFIG,"beijing");KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 消费某个主题的某个分区数据,0号分区ArrayList<TopicPartition> topicPartitions = newArrayList<>();topicPartitions.add(new TopicPartition("kafka-ljf", 0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}

 3.生产者生产数据

 4.消费者消费

可见只消费了0号分区上的数据 

2.3  消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

1.consumer代码复制一份,变为两个消费者

 

2. 消费者2:

 3.生产者:

 

4.查看消费者消费信息

 5.查看消费者2消费信息

 结论:即可看到两个消费者在消费不同 分区的数据。消费者一消费分区1的数据,消费者2消费分区2的数据。

2.4  指定offset消费

auto.offset.reset = earliest | latest | none   其中 默认是 latest
Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
1 earliest :自动将偏移量重置为最早的偏移量,相当于   --from-beginning
(2) latest (默认值) :自动将偏移量重置为最新偏移量。
(3) none :如果未找到消费者组的先前偏移量,则向消费者抛出异常。

 (4)任意指定 offset 位移开始消费

代码:

具体代码:

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;/*** @ClassName: ConsumerSeekDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:08:01* @Version: V1.0**/
public class ConsumerSeekDemo {public static void main(String[] args) {// 0 配置信息Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// key value 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment= new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}// 遍历所有分区,并指定 offset 从 10 的位置开始消费for (TopicPartition tp: assignment) {kafkaConsumer.seek(tp, 10);}// 3 消费该主题数据while (true) {ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}}

 结果:

可以看到都是从0,1分区中,offset为10的位置开始查询的。 

注意:每次执行完,需要修改消费者组名;

2.4  指定offset设置为earliest

auto.offset.reset = earliest | latest | none   其中默认是 latest。本案例设置为earliest。

注意:每次执行完,需要修改消费者组名;每次执行要起一个不同的消费组的名字

代码

 

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerDefineOffset* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:30:01* @Version: V1.0**/
public class ConsumerDefineOffset {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//设置读取的offset的位置properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");// 配置消费者组(必须),名字可以任意起properties.put(ConsumerConfig.GROUP_ID_CONFIG,"shanghai");//注意:每次执行完,需要修改消费者组名;KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 消费某个主题的某个分区数据,0号分区ArrayList<TopicPartition> topicPartitions = newArrayList<>();topicPartitions.add(new TopicPartition("kafka-ljf", 0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}

3.执行结果

 2.5  指定时间消费数据

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
例如要求按照时间消费前一天的数据,怎么处理?
package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** @ClassName: ConsumerRangeTime* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:53:36* @Version: V1.0**/
public class ConsumerRangeTime {public static void main(String[] args) {Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// key value 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-time");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}HashMap<TopicPartition, Long> timestampToSearch = newHashMap<>();// 封装集合存储,每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 5 * 24 * 3600 * 1000);}// 获取从 1 天前开始消费的每个分区的 offsetMap<TopicPartition, OffsetAndTimestamp> offsets =kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区,对每个分区设置消费时间。for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp =offsets.get(topicPartition);// 根据时间指定开始消费的位置if (offsetAndTimestamp != null){kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}}// 3 消费该主题数据while (true) {ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}}

结果:

 

这篇关于kafka 消费者进行消费数据的各种场景的API(你值得一看)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013063

相关文章

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

防止Linux rm命令误操作的多场景防护方案与实践

《防止Linuxrm命令误操作的多场景防护方案与实践》在Linux系统中,rm命令是删除文件和目录的高效工具,但一旦误操作,如执行rm-rf/或rm-rf/*,极易导致系统数据灾难,本文针对不同场景... 目录引言理解 rm 命令及误操作风险rm 命令基础常见误操作案例防护方案使用 rm编程 别名及安全删除

PHP应用中处理限流和API节流的最佳实践

《PHP应用中处理限流和API节流的最佳实践》限流和API节流对于确保Web应用程序的可靠性、安全性和可扩展性至关重要,本文将详细介绍PHP应用中处理限流和API节流的最佳实践,下面就来和小编一起学习... 目录限流的重要性在 php 中实施限流的最佳实践使用集中式存储进行状态管理(如 Redis)采用滑动

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

C# LiteDB处理时间序列数据的高性能解决方案

《C#LiteDB处理时间序列数据的高性能解决方案》LiteDB作为.NET生态下的轻量级嵌入式NoSQL数据库,一直是时间序列处理的优选方案,本文将为大家大家简单介绍一下LiteDB处理时间序列数... 目录为什么选择LiteDB处理时间序列数据第一章:LiteDB时间序列数据模型设计1.1 核心设计原则

Java+AI驱动实现PDF文件数据提取与解析

《Java+AI驱动实现PDF文件数据提取与解析》本文将和大家分享一套基于AI的体检报告智能评估方案,详细介绍从PDF上传、内容提取到AI分析、数据存储的全流程自动化实现方法,感兴趣的可以了解下... 目录一、核心流程:从上传到评估的完整链路二、第一步:解析 PDF,提取体检报告内容1. 引入依赖2. 封装