RocketMQ源码分析----Consumer消费进度相关

2024-08-30 09:58

本文主要是介绍RocketMQ源码分析----Consumer消费进度相关,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在Consumer消费的时候总有几个疑问:

  • 消费完成后,这个消费进度存在哪里
  • 消费完成后,还没保存消费进度就挂了,会不会导致重复消费

Consumer

消费进度保存

消费完成后,会返回一个ConsumeConcurrentlyStatus.CONSUME_SUCCESS告诉MQ消费成功,以MessageListener的consumeMessage为入口分析。
消费的时候,是以ConsumeRequest类为Runnable对象,在线程池中进行处理的,即ConsumeRequest的run方法会处理这个状态

        @Overridepublic void run() {//....status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);// 如果这个ProcessQueue废弃了,则不处理if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);}}

在消费完成后,将status交给processConsumeResult处理,代码如下

    public void processConsumeResult(//final ConsumeConcurrentlyStatus status, //final ConsumeConcurrentlyContext context, //final ConsumeRequest consumeRequest//) {//....消费成功或者失败的处理// 将这批消息从ProcessQueue中移除,代表消费完毕,并返回当前ProcessQueue中的消息最小的offsetlong offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {// 更新消费进度this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}

在分析ProcessQueue的时候,说过removeMessage返回有两种情况:

  1. 如果移除这批消息之后已经没有消息了,那么返回ProcessQueue中最大的offset+1
  2. 如果还有消息,那么返回treeMap中最小的key,即未消费的消息中最小的offset

getOffsetStore返回RemoteBrokerOffsetStore,看下其实现

    @Overridepublic void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {if (mq != null) {// 通过MessageQueue获取本地的对应的消费进度AtomicLong offsetOld = this.offsetTable.get(mq);if (null == offsetOld) {offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));}if (null != offsetOld) {//increaseOnly 为false则直接覆盖//increaseOnly为true则会判断更新的值比老的值大才会进行更新if (increaseOnly) {MixAll.compareAndIncreaseOnly(offsetOld, offset);} else {offsetOld.set(offset);}}}

这篇关于RocketMQ源码分析----Consumer消费进度相关的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120500

相关文章

python panda库从基础到高级操作分析

《pythonpanda库从基础到高级操作分析》本文介绍了Pandas库的核心功能,包括处理结构化数据的Series和DataFrame数据结构,数据读取、清洗、分组聚合、合并、时间序列分析及大数据... 目录1. Pandas 概述2. 基本操作:数据读取与查看3. 索引操作:精准定位数据4. Group

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

深度解析Nginx日志分析与499状态码问题解决

《深度解析Nginx日志分析与499状态码问题解决》在Web服务器运维和性能优化过程中,Nginx日志是排查问题的重要依据,本文将围绕Nginx日志分析、499状态码的成因、排查方法及解决方案展开讨论... 目录前言1. Nginx日志基础1.1 Nginx日志存放位置1.2 Nginx日志格式2. 499

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1

Olingo分析和实践之ODataImpl详细分析(重要方法详解)

《Olingo分析和实践之ODataImpl详细分析(重要方法详解)》ODataImpl.java是ApacheOlingoOData框架的核心工厂类,负责创建序列化器、反序列化器和处理器等组件,... 目录概述主要职责类结构与继承关系核心功能分析1. 序列化器管理2. 反序列化器管理3. 处理器管理重要方

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种