RocketMQ之消息存储管理

2024-03-11 23:18

本文主要是介绍RocketMQ之消息存储管理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

        我们知道RocketMQ的queue都是逻辑上的概念,实际消息都是写入文件来管理的,达到了操作queue的表象,下面就RocketMQ管理消息操作文件的思路做个讲解。RocketMQ主要有6类文件,小文件有3类:checkpoint文件,config目录下的配置文件,abort文件,大文件有3类:Index文件,ConsumeQueue文件,CommitLog文件。Broker操作相关文件业务中最终统一调用DefaultMessageStore,而DefaultMessageStore又管理三类文件的写入,具体体现为IndexService、ConsumeQueue、CommitLog三个类型,形象的图形可参考下面
对于大文件的操作,使用的是NIO的MappedByteBuffer类来提高读写性能。这个类是文件内存映射的相关类,支持随机读和顺序写,在RocketMQ中,被封装成了MappedFile类。 RocketMQ对于大文件的存储会进行多文件存储,到达指定大小会重新建立新文件存储。
        下面我们来了解下RocketMQ是如何模拟出逻辑队列的,CommitLog是用于存储真实的物理消息的结构,ConsumeQueue是逻辑队列,仅仅存储了CommitLog的位移而已,真实的存储都在CommitLog中。CommitLog文件的存储地址: $HOME\store\commitlog\${fileName}。每个文件的大小默认为1G,CommitLog的文件名fileName,名字长度为20位,左边补0,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0, 文件大小为 1G=1073741824; 当这个文件满了,第二个文件名字为 00000000001073741824。
        创建Index文件的目的是能快速定位查询出消息,是为随机查询消息服务的,比如在管理端查询消息等,和consume queue没有直接联系的,与consume queue相比,我们更应该关系consume queue的机制。Index文件的存储位置是:$HOME \store\index\${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W*4+2000W*20= 420000040个字节大小。一个索引文件从整体上可以分为header和其它部分(body)。header主要描述此文件中索引的整体参数等信息。slot存储里面保存的是 Index Linked List的索引。消息对应slotPos=Math.abs(keyHash)%hashSlotNum,消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE,也就是说能很快定位到某一个消息在IndexLinkedList中的位置,形象的图可以如下:
Header字段
beginTimestamp(8)   第一个索引消息保存broker的时间戳
endTimestamp(8)    最后一个索引消息保存broker的时间戳
beginPhyOffset(8)  第一个索引消息在commitLog的偏移地址
endPhyOffset(8)  最后一个索引消息在commitlog的偏移地址
hashSlotCount(4) 从0开始,计数,记录Slot Table使用个数
indexCount(4) 从1开始,计数,记录Index Linked List使用个数
IndexLinkedList字段
keyHash(4)   topic+key的hash值
phyOffset(8) 消息在commitLog中的偏移地址
timeDiff(4)  消息的保存时间戳减去IndexHeader的beginTimestamp
slotValue   上一个相同keyHash的节点在Index Linked List的位置,链表连接指针
RocketMQ在启动时会根据CommitLog文件修改Index文件和ConsumeQueue文件,运行时也会通过独立线程根据CommitLog文件来刷新Index文件和ConsumeQueue文件。
        下面是ConsumeQueue的结构

与ConsumeQueue打交道的是Consumer,这个结构能迅速在CommitLog里将消息取出进行消费。

这篇关于RocketMQ之消息存储管理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/799392

相关文章

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优