RocketMQ源码分析----Consumer消费进度相关

2024-08-30 09:58

本文主要是介绍RocketMQ源码分析----Consumer消费进度相关,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在Consumer消费的时候总有几个疑问:

  • 消费完成后,这个消费进度存在哪里
  • 消费完成后,还没保存消费进度就挂了,会不会导致重复消费

Consumer

消费进度保存

消费完成后,会返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS告诉MQ消费成功,以MessageListener的consumeMessage为入口分析。
消费的时候,是以ConsumeRequest类为Runnable对象,在线程池中进行处理的,即ConsumeRequest的run方法会处理这个状态

        @Overridepublic void run() {//....status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);// 如果这个ProcessQueue废弃了,则不处理if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);}}

在消费完成后,将status交给processConsumeResult处理,代码如下

    public void processConsumeResult(//final ConsumeConcurrentlyStatus status, //final ConsumeConcurrentlyContext context, //final ConsumeRequest consumeRequest//) {//....消费成功或者失败的处理// 将这批消息从ProcessQueue中移除,代表消费完毕,并返回当前ProcessQueue中的消息最小的offsetlong offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 更新消费进度this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}

在分析ProcessQueue的时候,说过removeMessage返回有两种情况:

  1. 如果移除这批消息之后已经没有消息了,那么返回ProcessQueue中最大的offset+1
  2. 如果还有消息,那么返回treeMap中最小的key,即未消费的消息中最小的offset

getOffsetStore返回RemoteBrokerOffsetStore,看下其实现

    @Overridepublic void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {if (mq != null) {// 通过MessageQueue获取本地的对应的消费进度AtomicLong offsetOld = this.offsetTable.get(mq);if (null == offsetOld) {offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}if (null != offsetOld) {//increaseOnly 为false则直接覆盖//increaseOnly为true则会判断更新的值比老的值大才会进行更新if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}}}

这篇关于RocketMQ源码分析----Consumer消费进度相关的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120500

相关文章

Maven中引入 springboot 相关依赖的方式(最新推荐)

《Maven中引入springboot相关依赖的方式(最新推荐)》:本文主要介绍Maven中引入springboot相关依赖的方式(最新推荐),本文给大家介绍的非常详细,对大家的学习或工作具有... 目录Maven中引入 springboot 相关依赖的方式1. 不使用版本管理(不推荐)2、使用版本管理(推

Python中的Walrus运算符分析示例详解

《Python中的Walrus运算符分析示例详解》Python中的Walrus运算符(:=)是Python3.8引入的一个新特性,允许在表达式中同时赋值和返回值,它的核心作用是减少重复计算,提升代码简... 目录1. 在循环中避免重复计算2. 在条件判断中同时赋值变量3. 在列表推导式或字典推导式中简化逻辑

Python的time模块一些常用功能(各种与时间相关的函数)

《Python的time模块一些常用功能(各种与时间相关的函数)》Python的time模块提供了各种与时间相关的函数,包括获取当前时间、处理时间间隔、执行时间测量等,:本文主要介绍Python的... 目录1. 获取当前时间2. 时间格式化3. 延时执行4. 时间戳运算5. 计算代码执行时间6. 转换为指

Java程序进程起来了但是不打印日志的原因分析

《Java程序进程起来了但是不打印日志的原因分析》:本文主要介绍Java程序进程起来了但是不打印日志的原因分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java程序进程起来了但是不打印日志的原因1、日志配置问题2、日志文件权限问题3、日志文件路径问题4、程序

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字

Python 迭代器和生成器概念及场景分析

《Python迭代器和生成器概念及场景分析》yield是Python中实现惰性计算和协程的核心工具,结合send()、throw()、close()等方法,能够构建高效、灵活的数据流和控制流模型,这... 目录迭代器的介绍自定义迭代器省略的迭代器生产器的介绍yield的普通用法yield的高级用法yidle

C++ Sort函数使用场景分析

《C++Sort函数使用场景分析》sort函数是algorithm库下的一个函数,sort函数是不稳定的,即大小相同的元素在排序后相对顺序可能发生改变,如果某些场景需要保持相同元素间的相对顺序,可使... 目录C++ Sort函数详解一、sort函数调用的两种方式二、sort函数使用场景三、sort函数排序

AJAX请求上传下载进度监控实现方式

《AJAX请求上传下载进度监控实现方式》在日常Web开发中,AJAX(AsynchronousJavaScriptandXML)被广泛用于异步请求数据,而无需刷新整个页面,:本文主要介绍AJAX请... 目录1. 前言2. 基于XMLHttpRequest的进度监控2.1 基础版文件上传监控2.2 增强版多

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++