kafka 消费者进行消费数据的各种场景的API(你值得一看)

2024-05-29 07:48

本文主要是介绍kafka 消费者进行消费数据的各种场景的API(你值得一看),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一 kafka消费端的参数

 

二 实现案例

2.1 订阅某个主题

创建一个独立消费者,消费 kafka-ljf 主题中数据。
注意: 在消费者 API 代码中必须配置消费者组 id 。命令行启动消费者不填写消费者组
id 会被自动填写随机的消费者组 id
2.消费者代码
package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerTopicDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 14:02:05* @Version: V1.0**/
public class ConsumerTopicDemo {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");// 创建消费者对象KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<String, String>(properties);// 注册要消费的主题(可以消费多个主题)ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);// 拉取数据打印while (true) {// 设置 1s 中消费一批数据ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));// 打印消费到的数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}}

3.执行生产者产生数据

 4.消费数据,观察

2.2 订阅某个主题下的某个分区

需求:创建一个独立消费者,消费 kafka-ljf主题 0 号分区的数据。

2.代码

 

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerPartitionDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 14:55:31* @Version: V1.0**/
public class ConsumerPartitionDemo {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(必须),名字可以任意起properties.put(ConsumerConfig.GROUP_ID_CONFIG,"beijing");KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 消费某个主题的某个分区数据,0号分区ArrayList<TopicPartition> topicPartitions = newArrayList<>();topicPartitions.add(new TopicPartition("kafka-ljf", 0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}

 3.生产者生产数据

 4.消费者消费

可见只消费了0号分区上的数据 

2.3  消费者组案例

测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

1.consumer代码复制一份,变为两个消费者

 

2. 消费者2:

 3.生产者:

 

4.查看消费者消费信息

 5.查看消费者2消费信息

 结论:即可看到两个消费者在消费不同 分区的数据。消费者一消费分区1的数据,消费者2消费分区2的数据。

2.4  指定offset消费

auto.offset.reset = earliest | latest | none   其中 默认是 latest
Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量
时(例如该数据已被删除),该怎么办?
1 earliest :自动将偏移量重置为最早的偏移量,相当于   --from-beginning
(2) latest (默认值) :自动将偏移量重置为最新偏移量。
(3) none :如果未找到消费者组的先前偏移量,则向消费者抛出异常。

 (4)任意指定 offset 位移开始消费

代码:

具体代码:

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;/*** @ClassName: ConsumerSeekDemo* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:08:01* @Version: V1.0**/
public class ConsumerSeekDemo {public static void main(String[] args) {// 0 配置信息Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// key value 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "beijing");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment= new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}// 遍历所有分区,并指定 offset 从 10 的位置开始消费for (TopicPartition tp: assignment) {kafkaConsumer.seek(tp, 10);}// 3 消费该主题数据while (true) {ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}}

 结果:

可以看到都是从0,1分区中,offset为10的位置开始查询的。 

注意:每次执行完,需要修改消费者组名;

2.4  指定offset设置为earliest

auto.offset.reset = earliest | latest | none   其中默认是 latest。本案例设置为earliest。

注意:每次执行完,需要修改消费者组名;每次执行要起一个不同的消费组的名字

代码

 

package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;/*** @ClassName: ConsumerDefineOffset* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:30:01* @Version: V1.0**/
public class ConsumerDefineOffset {public static void main(String[] args) {// 1.创建消费者的配置对象Properties properties = new Properties();// 2.给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// 配置序列化 必须properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//设置读取的offset的位置properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");//properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");// 配置消费者组(必须),名字可以任意起properties.put(ConsumerConfig.GROUP_ID_CONFIG,"shanghai");//注意:每次执行完,需要修改消费者组名;KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 消费某个主题的某个分区数据,0号分区ArrayList<TopicPartition> topicPartitions = newArrayList<>();topicPartitions.add(new TopicPartition("kafka-ljf", 0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}
}

3.执行结果

 2.5  指定时间消费数据

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
例如要求按照时间消费前一天的数据,怎么处理?
package com.ljf.spring.boot.demo.consumer;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** @ClassName: ConsumerRangeTime* @Description: TODO* @Author: liujianfu* @Date: 2022/04/10 16:53:36* @Version: V1.0**/
public class ConsumerRangeTime {public static void main(String[] args) {Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.152.136:9092,192.168.152.138:9092,192.168.152.140:9092");// key value 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-time");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer<>(properties);// 2 订阅一个主题ArrayList<String> topics = new ArrayList<>();topics.add("kafka-ljf");kafkaConsumer.subscribe(topics);Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {kafkaConsumer.poll(Duration.ofSeconds(1));// 获取消费者分区分配信息(有了分区分配信息才能开始消费)assignment = kafkaConsumer.assignment();}HashMap<TopicPartition, Long> timestampToSearch = newHashMap<>();// 封装集合存储,每个分区对应一天前的数据for (TopicPartition topicPartition : assignment) {timestampToSearch.put(topicPartition, System.currentTimeMillis() - 5 * 24 * 3600 * 1000);}// 获取从 1 天前开始消费的每个分区的 offsetMap<TopicPartition, OffsetAndTimestamp> offsets =kafkaConsumer.offsetsForTimes(timestampToSearch);// 遍历每个分区,对每个分区设置消费时间。for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp =offsets.get(topicPartition);// 根据时间指定开始消费的位置if (offsetAndTimestamp != null){kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}}// 3 消费该主题数据while (true) {ConsumerRecords<String, String> consumerRecords =kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord);}}}}

结果:

 

这篇关于kafka 消费者进行消费数据的各种场景的API(你值得一看)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1013063

相关文章

批量导入txt数据到的redis过程

《批量导入txt数据到的redis过程》用户通过将Redis命令逐行写入txt文件,利用管道模式运行客户端,成功执行批量删除以Product*匹配的Key操作,提高了数据清理效率... 目录批量导入txt数据到Redisjs把redis命令按一条 一行写到txt中管道命令运行redis客户端成功了批量删除k

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

Python进行JSON和Excel文件转换处理指南

《Python进行JSON和Excel文件转换处理指南》在数据交换与系统集成中,JSON与Excel是两种极为常见的数据格式,本文将介绍如何使用Python实现将JSON转换为格式化的Excel文件,... 目录将 jsON 导入为格式化 Excel将 Excel 导出为结构化 JSON处理嵌套 JSON:

Java Stream流之GroupBy的用法及应用场景

《JavaStream流之GroupBy的用法及应用场景》本教程将详细介绍如何在Java中使用Stream流的groupby方法,包括基本用法和一些常见的实际应用场景,感兴趣的朋友一起看看吧... 目录Java Stream流之GroupBy的用法1. 前言2. 基础概念什么是 GroupBy?Stream

SpringBoot监控API请求耗时的6中解决解决方案

《SpringBoot监控API请求耗时的6中解决解决方案》本文介绍SpringBoot中记录API请求耗时的6种方案,包括手动埋点、AOP切面、拦截器、Filter、事件监听、Micrometer+... 目录1. 简介2.实战案例2.1 手动记录2.2 自定义AOP记录2.3 拦截器技术2.4 使用Fi

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

C#监听txt文档获取新数据方式

《C#监听txt文档获取新数据方式》文章介绍通过监听txt文件获取最新数据,并实现开机自启动、禁用窗口关闭按钮、阻止Ctrl+C中断及防止程序退出等功能,代码整合于主函数中,供参考学习... 目录前言一、监听txt文档增加数据二、其他功能1. 设置开机自启动2. 禁止控制台窗口关闭按钮3. 阻止Ctrl +

java如何实现高并发场景下三级缓存的数据一致性

《java如何实现高并发场景下三级缓存的数据一致性》这篇文章主要为大家详细介绍了java如何实现高并发场景下三级缓存的数据一致性,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 下面代码是一个使用Java和Redisson实现的三级缓存服务,主要功能包括:1.缓存结构:本地缓存:使