Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient

2024-01-18 12:12

本文主要是介绍Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前面介绍过NetworkClient的实现,它依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列handle*方法处理请求响应、超时请求以及断线重连。

ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。

在图中展示了ConsumerNetworkClient的核心字段以及其依赖的组件。

在这里插入图片描述

  • client:NetworkClient对象。
  • delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。
    简单介绍一下PriorityQueue,这是一个非线程安全的、无界的、优先级队列,实现原理是小顶堆,底层是基于数组实现的,其对应的线程安全实现是PriorityBlockingQueue,这个定时任务队列中是心跳任务。
  • metadata:用于管理Kafka集群元数据。
  • unsent:缓冲队列,Map<Node,List类型,key是Node节点,value是发往此Node的ClientRequest集合。
  • unsentExpiryMs:ClientRequest在unsent中缓存的超时时长。
  • wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
  • wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。
    wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。

ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll方法有多个重载,最终会调用poll(long timeout,long now,boolean executeDelayedTasks)重载,这三个参数的含义分别是:

  • timeout表示执行poll方法的最长阻塞时间(单位是ms),如果为0,则表示不阻塞;
  • now表示当前时间戳;
  • executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。

下面介绍其流程,其中简单回顾一下NetworkClient的功能:

  1. 调用ConsumerNetworkClient.trySend方法循环处理unsent中缓存的请求。

    具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready方法检测消费者与此节点之间的连接,以及发送请求的条件。

    若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队中等待响应,也放入KafkaChannel的send字段中等待发送,并将此消息从列表中删除。实现代码如下:
    在这里插入图片描述

  2. 计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定。在下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时长,避免影响定时任务的执行。

  3. 调用NetworkClient.poll方法,将KafkaChannel.send字段指定的消息发送出去。除此之外,NetworkClient.poll()方法可能会更新Metadata使用一系列handle*方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数。

  4. 调用ConsumerNetworkClient.maybeTriggerWakeup方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient.poll方法。

在这里插入图片描述

  1. 调用checkDisconnects方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

在这里插入图片描述

  1. 根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中的定时任务,则调用delayedTasks.poll()方法。

  2. 再次调用trySend方法。在步骤3中调用了NetworkClient.poll方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend方法。

  3. 调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合中删除。

在这里插入图片描述
分析完poll方法的详细步骤之后,我们下面来看其实现代码:

在这里插入图片描述
pollNoWakeup方法是poll方法的变体,表示执行不可被中断的poll方法。

具体逻辑是:在执行poll方法之前,会调用disableWakeups方法将wakeupDisabledCount加一,然后调用poll方法。这样,即使其他线程请求中断,也不会被响应。

poll(future)是poll方法的另一个实现阻塞发送请求的功能,代码如下所示。

在这里插入图片描述
在ConsumerNetworkClient.send方法中,会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,具体代码如下。

在这里插入图片描述

在这里需要重点关注的是KafkaConsumer中使用的回调对象—RequestFutureCompletionHandler,其继承关系如图所示。

在这里插入图片描述
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,它还继承了RequestFuture类。RequestFuture是一个泛型类,其核心字段如下所示。

  • isDone:表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。
  • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
  • value:记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。
  • listeners:RequestFutureListener集合, 用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。

在RequestFuture中有两处典型设计模式的使用:一处是compose方法,使用了适配器模式;另一处是chain方法,使用了责任链模式。下面是compose方法的相关代码:

在这里插入图片描述
图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。

当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapter<T,S>的对应方法,最终调用RequestFuture对象的对应方法。

在这里插入图片描述
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。下面是其具体代码:

在这里插入图片描述

RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法,代码比较简单,不再赘述了。

介绍完RequestFutureCompleteHandler之后,回到ConsumerNetworkClient的分析上来。下面简单介绍ConsumerNetworkClient中几个常用的功能,代码比较简单,就不贴出来了:

  • awaitMetadataUpdate()方法:循环调用poll方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完成。
  • awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
  • put()方法:向unsent中添加请求。
  • schedule()方法:向delayedTasks队列中添加定时任务。
  • leastLoadedNode()方法:查找Kafka集群中负载最低的Node。

这篇关于Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/618952

相关文章

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

Apache 高级配置实战之从连接保持到日志分析的完整指南

《Apache高级配置实战之从连接保持到日志分析的完整指南》本文带你从连接保持优化开始,一路走到访问控制和日志管理,最后用AWStats来分析网站数据,对Apache配置日志分析相关知识感兴趣的朋友... 目录Apache 高级配置实战:从连接保持到日志分析的完整指南前言 一、Apache 连接保持 - 性

Linux中的more 和 less区别对比分析

《Linux中的more和less区别对比分析》在Linux/Unix系统中,more和less都是用于分页查看文本文件的命令,但less是more的增强版,功能更强大,:本文主要介绍Linu... 目录1. 基础功能对比2. 常用操作对比less 的操作3. 实际使用示例4. 为什么推荐 less?5.

spring-gateway filters添加自定义过滤器实现流程分析(可插拔)

《spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔)》:本文主要介绍spring-gatewayfilters添加自定义过滤器实现流程分析(可插拔),本文通过实例图... 目录需求背景需求拆解设计流程及作用域逻辑处理代码逻辑需求背景公司要求,通过公司网络代理访问的请求需要做请

Java集成Onlyoffice的示例代码及场景分析

《Java集成Onlyoffice的示例代码及场景分析》:本文主要介绍Java集成Onlyoffice的示例代码及场景分析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 需求场景:实现文档的在线编辑,团队协作总结:两个接口 + 前端页面 + 配置项接口1:一个接口,将o

SpringBoot实现Kafka动态反序列化的完整代码

《SpringBoot实现Kafka动态反序列化的完整代码》在分布式系统中,Kafka作为高吞吐量的消息队列,常常需要处理来自不同主题(Topic)的异构数据,不同的业务场景可能要求对同一消费者组内的... 目录引言一、问题背景1.1 动态反序列化的需求1.2 常见问题二、动态反序列化的核心方案2.1 ht

IDEA下"File is read-only"可能原因分析及"找不到或无法加载主类"的问题

《IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题》:本文主要介绍IDEA下Fileisread-only可能原因分析及找不到或无法加载主类的问题,具有很好的参... 目录1.File is read-only”可能原因2.“找不到或无法加载主类”问题的解决总结1.File

Dubbo之SPI机制的实现原理和优势分析

《Dubbo之SPI机制的实现原理和优势分析》:本文主要介绍Dubbo之SPI机制的实现原理和优势,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Dubbo中SPI机制的实现原理和优势JDK 中的 SPI 机制解析Dubbo 中的 SPI 机制解析总结Dubbo中

C#继承之里氏替换原则分析

《C#继承之里氏替换原则分析》:本文主要介绍C#继承之里氏替换原则,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录C#里氏替换原则一.概念二.语法表现三.类型检查与转换总结C#里氏替换原则一.概念里氏替换原则是面向对象设计的基本原则之一:核心思想:所有引py

基于Go语言实现Base62编码的三种方式以及对比分析

《基于Go语言实现Base62编码的三种方式以及对比分析》Base62编码是一种在字符编码中使用62个字符的编码方式,在计算机科学中,,Go语言是一种静态类型、编译型语言,它由Google开发并开源,... 目录一、标准库现状与解决方案1. 标准库对比表2. 解决方案完整实现代码(含边界处理)二、关键实现细