RocketMQ之消息存储管理

2024-03-11 23:18

本文主要是介绍RocketMQ之消息存储管理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

        我们知道RocketMQ的queue都是逻辑上的概念,实际消息都是写入文件来管理的,达到了操作queue的表象,下面就RocketMQ管理消息操作文件的思路做个讲解。RocketMQ主要有6类文件,小文件有3类:checkpoint文件,config目录下的配置文件,abort文件,大文件有3类:Index文件,ConsumeQueue文件,CommitLog文件。Broker操作相关文件业务中最终统一调用DefaultMessageStore,而DefaultMessageStore又管理三类文件的写入,具体体现为IndexService、ConsumeQueue、CommitLog三个类型,形象的图形可参考下面
对于大文件的操作,使用的是NIO的MappedByteBuffer类来提高读写性能。这个类是文件内存映射的相关类,支持随机读和顺序写,在RocketMQ中,被封装成了MappedFile类。 RocketMQ对于大文件的存储会进行多文件存储,到达指定大小会重新建立新文件存储。
        下面我们来了解下RocketMQ是如何模拟出逻辑队列的,CommitLog是用于存储真实的物理消息的结构,ConsumeQueue是逻辑队列,仅仅存储了CommitLog的位移而已,真实的存储都在CommitLog中。CommitLog文件的存储地址: $HOME\store\commitlog\${fileName}。每个文件的大小默认为1G,CommitLog的文件名fileName,名字长度为20位,左边补0,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0, 文件大小为 1G=1073741824; 当这个文件满了,第二个文件名字为 00000000001073741824。
        创建Index文件的目的是能快速定位查询出消息,是为随机查询消息服务的,比如在管理端查询消息等,和consume queue没有直接联系的,与consume queue相比,我们更应该关系consume queue的机制。Index文件的存储位置是:$HOME \store\index\${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W*4+2000W*20= 420000040个字节大小。一个索引文件从整体上可以分为header和其它部分(body)。header主要描述此文件中索引的整体参数等信息。slot存储里面保存的是 Index Linked List的索引。消息对应slotPos=Math.abs(keyHash)%hashSlotNum,消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE,也就是说能很快定位到某一个消息在IndexLinkedList中的位置,形象的图可以如下:
Header字段
beginTimestamp(8)   第一个索引消息保存broker的时间戳
endTimestamp(8)    最后一个索引消息保存broker的时间戳
beginPhyOffset(8)  第一个索引消息在commitLog的偏移地址
endPhyOffset(8)  最后一个索引消息在commitlog的偏移地址
hashSlotCount(4) 从0开始,计数,记录Slot Table使用个数
indexCount(4) 从1开始,计数,记录Index Linked List使用个数
IndexLinkedList字段
keyHash(4)   topic+key的hash值
phyOffset(8) 消息在commitLog中的偏移地址
timeDiff(4)  消息的保存时间戳减去IndexHeader的beginTimestamp
slotValue   上一个相同keyHash的节点在Index Linked List的位置,链表连接指针
RocketMQ在启动时会根据CommitLog文件修改Index文件和ConsumeQueue文件,运行时也会通过独立线程根据CommitLog文件来刷新Index文件和ConsumeQueue文件。
        下面是ConsumeQueue的结构

与ConsumeQueue打交道的是Consumer,这个结构能迅速在CommitLog里将消息取出进行消费。

这篇关于RocketMQ之消息存储管理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/799392

相关文章

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

SpringIntegration消息路由之Router的条件路由与过滤功能

《SpringIntegration消息路由之Router的条件路由与过滤功能》本文详细介绍了Router的基础概念、条件路由实现、基于消息头的路由、动态路由与路由表、消息过滤与选择性路由以及错误处理... 目录引言一、Router基础概念二、条件路由实现三、基于消息头的路由四、动态路由与路由表五、消息过滤

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在