kafka源码6-Sender线程对Broker响应的处理

2024-02-07 23:38

本文主要是介绍kafka源码6-Sender线程对Broker响应的处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

生产者处理响应

前面我们分析了producer发送请求的过程,现在分析发送请求后,怎么处理响应的,producer的响应处理也是在Sender线程中处理的,再看Selector的pollSelectionKeys方法

class Selector{
void pollSelectionKeys(Set<SelectionKey> selectionKeys,boolean isImmediatelyConnected,long currentTimeNanos) {/*** 遍历所有的key*/for (SelectionKey key : determineHandlingOrder(selectionKeys)) {/*** 获取对应的channel*/KafkaChannel channel = channel(key);long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;// register all per-connection metrics at oncesensors.maybeRegisterConnectionMetrics(channel.id());if (idleExpiryManager != null)idleExpiryManager.update(channel.id(), currentTimeNanos);boolean sendFailed = false;try {............./***读取响应的数据*/attemptRead(key, channel);....................}/* cancel any defunct sockets */if (!key.isValid())close(channel, CloseMode.GRACEFUL);} catch (Exception e) {...........} finally {maybeRecordTimePerConnection(channel, channelStartTimeNanos);}}private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {//if channel is ready and has bytes to read from socket or buffer, and has no//previous receive(s) already staged or otherwise in progress then read from itif (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)&& !explicitlyMutedChannels.contains(channel)) {NetworkReceive networkReceive;while ((networkReceive = channel.read()) != null) {madeReadProgressLastPoll = true;/*** 有Broker返回的响应,添加*/addToStagedReceives(channel, networkReceive);}if (channel.isMute()) {outOfMemory = true; //channel has muted itself due to memory pressure.} else {madeReadProgressLastPoll = true;}}}private void  addToStagedReceives

这篇关于kafka源码6-Sender线程对Broker响应的处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/689265

相关文章

Python进行JSON和Excel文件转换处理指南

《Python进行JSON和Excel文件转换处理指南》在数据交换与系统集成中,JSON与Excel是两种极为常见的数据格式,本文将介绍如何使用Python实现将JSON转换为格式化的Excel文件,... 目录将 jsON 导入为格式化 Excel将 Excel 导出为结构化 JSON处理嵌套 JSON:

Spring Boot 中的默认异常处理机制及执行流程

《SpringBoot中的默认异常处理机制及执行流程》SpringBoot内置BasicErrorController,自动处理异常并生成HTML/JSON响应,支持自定义错误路径、配置及扩展,如... 目录Spring Boot 异常处理机制详解默认错误页面功能自动异常转换机制错误属性配置选项默认错误处理

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

Java中的xxl-job调度器线程池工作机制

《Java中的xxl-job调度器线程池工作机制》xxl-job通过快慢线程池分离短时与长时任务,动态降级超时任务至慢池,结合异步触发和资源隔离机制,提升高频调度的性能与稳定性,支撑高并发场景下的可靠... 目录⚙️ 一、调度器线程池的核心设计 二、线程池的工作流程 三、线程池配置参数与优化 四、总结:线程

WinForm跨线程访问UI及UI卡死的解决方案

《WinForm跨线程访问UI及UI卡死的解决方案》在WinForm开发过程中,跨线程访问UI控件和界面卡死是常见的技术难题,由于Windows窗体应用程序的UI控件默认只能在主线程(UI线程)上操作... 目录前言正文案例1:直接线程操作(无UI访问)案例2:BeginInvoke访问UI(错误用法)案例

Java堆转储文件之1.6G大文件处理完整指南

《Java堆转储文件之1.6G大文件处理完整指南》堆转储文件是优化、分析内存消耗的重要工具,:本文主要介绍Java堆转储文件之1.6G大文件处理的相关资料,文中通过代码介绍的非常详细,需要的朋友可... 目录前言文件为什么这么大?如何处理这个文件?分析文件内容(推荐)删除文件(如果不需要)查看错误来源如何避

使用Python构建一个高效的日志处理系统

《使用Python构建一个高效的日志处理系统》这篇文章主要为大家详细讲解了如何使用Python开发一个专业的日志分析工具,能够自动化处理、分析和可视化各类日志文件,大幅提升运维效率,需要的可以了解下... 目录环境准备工具功能概述完整代码实现代码深度解析1. 类设计与初始化2. 日志解析核心逻辑3. 文件处

Java docx4j高效处理Word文档的实战指南

《Javadocx4j高效处理Word文档的实战指南》对于需要在Java应用程序中生成、修改或处理Word文档的开发者来说,docx4j是一个强大而专业的选择,下面我们就来看看docx4j的具体使用... 目录引言一、环境准备与基础配置1.1 Maven依赖配置1.2 初始化测试类二、增强版文档操作示例2.

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

Linux线程之线程的创建、属性、回收、退出、取消方式

《Linux线程之线程的创建、属性、回收、退出、取消方式》文章总结了线程管理核心知识:线程号唯一、创建方式、属性设置(如分离状态与栈大小)、回收机制(join/detach)、退出方法(返回/pthr... 目录1. 线程号2. 线程的创建3. 线程属性4. 线程的回收5. 线程的退出6. 线程的取消7.