Kafka Consumer API 的使用

2024-04-06 18:58
文章标签 使用 api kafka consumer

本文主要是介绍Kafka Consumer API 的使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

导读

Kafka具有两套消费者API:高级API、低级API。本文章将介绍两种API的区别以及使用时需要注意的地方。

低级API

1. 使用方法

  1. find leader broker
  2. build request
  3. fetch data
  4. identify leader change

2. 为什么要find leader

kafka在0.8版本后,引入replication机制。每个partition是有备份的,在某个broker出故障后,用户仍可以从其他备份中读取数据。消费者并不是并行从多个broker上获取同一个partition的数据,而是选举出一个leader broker,这个broker上的该partition将用于读写。其他的partition则复制leader broker上partition的数据,保持同步。

备份个数由创建topic时,replica-factory决定,当该参数为1时,表示不备份,大于1时,每个partition将有多个备份partition,且分布在不同的broker上。

3. 适合场合

  1. 消费起点需要设置
  2. 反复消费某一段数据

备注:笔者项目中有一个需求是,解析数据时,既可以支持从断点消费,又可以支持从当前位移消费。这个需求就使用了这个API实现功能。

高级API

1. 原理

高级API使用,围绕数据流工作,利用的低级API消费数据。用户不用关心leader broker、offset等问题。

2. 线程

使用高级API时,需要关注线程问题。

用户在使用高级API时,需要指定每个topic获取数据的线程数量。一个线程对应一个数据流。但是寻找主分区、创建流、设置offset这些过程中,高级API仍只有一个线程。只有当从partition中获取数据时,每个流才会产生一个fetchRunable的线程。

每个topic的线程数,最好设置为等于或者小于topic的partition个数。

3. Zookeeper

Kafka的使用需要Zookeeper有以下原因:

  1. 动态集群扩展。
  2. broker的注册,保存topic、partition元数据。
  3. consumer的注册。
  4. watcher的注册。

而高级API使用中,上述Zookeeper的作用全都用到了。首先,均衡(balance)partition和consumer时,需要两者信息。第二,kafka通过Watcher知道broker、topic、partition是否有变化。第三,kafka通过与zk通讯监控partition leader存活性.

另外,在笔者实验中发现,用kill指令杀死进程时,该进程中kafka消费者在zk中的注册信息可能并没有及时删除,如果马上拉起这个进程,将会可能出现消费者大于partition个数的情况。这种情况并不是必现,原因可能和kafka与zk的通讯时间有关系(会多查阅一点资料,验证猜想是否正确)

4. Rebalance

因为每个流,其实都需要指定数据来源的partition.每次创建线程,从partion中获取数据时,需要将同一个topic所有的partition和该group中消费该topic的所有线程合理分配,保证每一个partition只被一个线程消费。这个过程叫做balance,由高级API自动完成。

当发生以下三种情况的时候,会触发Kafka高级API的rebalance动作:

  1. 同一个group下,有新的消费者加入。
  2. 同一个group下,有topic的partition个数有变化。
  3. kafka API与zk的连接中断。

前两种情况比较好理解,重点讲第三种情况。我的理解是,zk超时或者断开后,kafka没有注册partition的信息,需要重新连接zk获取最新的注册信息,并根据新获取的信息进行线程、分区之间的分配和均衡。(个人理解,我会多查阅一些资料证实。)

zk超时后rebalance其实是很有可能不成功,并导致更多次的rebalance。原因是,如果kafka rebalance尝试的总时间(即尝试次数*每次尝试时间)小于zk超时时间,那么在zk连接失败重连之前,kafka的rebalance已经失败。这个原因可能会导致高级API不断的rebalance。而高级API默认设置参数,rebalance的尝试总时间是小于zk超时时间的,所以大家使用高级API时要根据实际情况处理这一点。



作者:君子月满楼
链接:https://www.jianshu.com/p/4d03ee74ad66
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

这篇关于Kafka Consumer API 的使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880579

相关文章

C++中detach的作用、使用场景及注意事项

《C++中detach的作用、使用场景及注意事项》关于C++中的detach,它主要涉及多线程编程中的线程管理,理解detach的作用、使用场景以及注意事项,对于写出高效、安全的多线程程序至关重要,下... 目录一、什么是join()?它的作用是什么?类比一下:二、join()的作用总结三、join()怎么

mybatis中resultMap的association及collectio的使用详解

《mybatis中resultMap的association及collectio的使用详解》MyBatis的resultMap定义数据库结果到Java对象的映射规则,包含id、type等属性,子元素需... 目录1.reusltmap的说明2.association的使用3.collection的使用4.总

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be

Java中使用 @Builder 注解的简单示例

《Java中使用@Builder注解的简单示例》@Builder简化构建但存在复杂性,需配合其他注解,导致可变性、抽象类型处理难题,链式编程非最佳实践,适合长期对象,避免与@Data混用,改用@G... 目录一、案例二、不足之处大多数同学使用 @Builder 无非就是为了链式编程,然而 @Builder

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

mybatis-plus QueryWrapper中or,and的使用及说明

《mybatis-plusQueryWrapper中or,and的使用及说明》使用MyBatisPlusQueryWrapper时,因同时添加角色权限固定条件和多字段模糊查询导致数据异常展示,排查发... 目录QueryWrapper中or,and使用列表中还要同时模糊查询多个字段经过排查这就导致只要whe

Python使用openpyxl读取Excel的操作详解

《Python使用openpyxl读取Excel的操作详解》本文介绍了使用Python的openpyxl库进行Excel文件的创建、读写、数据操作、工作簿与工作表管理,包括创建工作簿、加载工作簿、操作... 目录1 概述1.1 图示1.2 安装第三方库2 工作簿 workbook2.1 创建:Workboo

使用Go实现文件复制的完整流程

《使用Go实现文件复制的完整流程》本案例将实现一个实用的文件操作工具:将一个文件的内容完整复制到另一个文件中,这是文件处理中的常见任务,比如配置文件备份、日志迁移、用户上传文件转存等,文中通过代码示例... 目录案例说明涉及China编程知识点示例代码代码解析示例运行练习扩展小结案例说明我们将通过标准库 os

postgresql使用UUID函数的方法

《postgresql使用UUID函数的方法》本文给大家介绍postgresql使用UUID函数的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录PostgreSQL有两种生成uuid的方法。可以先通过sql查看是否已安装扩展函数,和可以安装的扩展函数

如何使用Lombok进行spring 注入

《如何使用Lombok进行spring注入》本文介绍如何用Lombok简化Spring注入,推荐优先使用setter注入,通过注解自动生成getter/setter及构造器,减少冗余代码,提升开发效... Lombok为了开发环境简化代码,好处不用多说。spring 注入方式为2种,构造器注入和setter