全网最细RocketMQ源码四:消息存储

2024-01-16 02:36

本文主要是介绍全网最细RocketMQ源码四:消息存储,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习

入口

SendMessageProcessor.processRequest
在这里插入图片描述
在这里插入图片描述

  private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));CompletableFuture<PutMessageResult> putMessageResult = null;Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 使用defaultMessageStore.aysncPutMessage存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}

实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦

MappedFile

public class MappedFile extends ReferenceResource {// 内存页大小:4kpublic static final int OS_PAGE_SIZE = 1024 * 4;protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 当前进程下 所有的 mappedFile占用的总虚拟内存大小private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);// 当前进程下 所有的 mappedFile个数private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);// 当前mappedFile数据写入点protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);// 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)private final AtomicInteger flushedPosition = new AtomicInteger(0);// 文件大小protected int fileSize;// 文件通道protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/protected ByteBuffer writeBuffer = null;protected TransientStorePool transientStorePool = null;// 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量   索引文件: 年月日小时分钟秒.. )private String fileName;// 文件名转longprivate long fileFromOffset;// 文件对象private File file;// 内存映射缓冲区,访问虚拟内存private MappedByteBuffer mappedByteBuffer;// 该文件下 保存的第一条 msg 的存储时间private volatile long storeTimestamp = 0;// 当前文件如果是 目录内 有效文件的 首文件的话,该值为trueprivate boolean firstCreateInQueue = false;
  • 构造方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage方法
    在这里插入图片描述
    在这里插入图片描述

  • appendMessage(byte[] data)
    在这里插入图片描述

  • flush
    在这里插入图片描述

MappedFileQueue

public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);private static final int DELETE_FILES_BATCH_MAX = 10;// mfq 管理的目录(CommitLog: ../store/commitlog  或者  consumeQueue: ../store/xxx_topic/0)private final String storePath;// 目录下每个文件大小(commitLog文件 默认 1g     consumeQueue 文件 默认 600w字节)private final int mappedFileSize;// list,目录下的每个 mappedFile 都加入该listprivate final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。private final AllocateMappedFileService allocateMappedFileService;// 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)private long flushedWhere = 0;private long committedWhere = 0;// 当前目录下最后一条msg存储时间private volatile long storeTimestamp = 0;
  • load方法
    在这里插入图片描述

  • getLastMappedFile

 /*** 参数1:startOffset ,文件起始偏移量* 参数2:needCreate ,当 list 为空时,是否创建mappedFile*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {// 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾// 两种情况会创建:// 1. list内没有mappedFile// 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {// 情况1  list内没有mappedFile// createOffset 取值 必须是 mappedFileSize 的倍数 或者 0createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2  list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..// 上一个文件名 转long + mappedFileSizecreateOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。// 获取待创建文件的 绝对路径(下次即将要创建的文件名)String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);// 获取 下下次 要创建的文件的 绝对路径String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,// 内部线程处理完后 会返回给我们结果  结果 就是 mappedFile对象。// 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}
  • flush
 /*** @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)* @return boolean true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘*/public boolean flush(final int flushLeastPages) {boolean result = true;// 获取当前正在刷盘的文件 (正在顺序写的mappedFile)MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 获取mappedFile 最后一条消息的存储时间long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用mf 刷盘 ,返回mf的最新的落盘位点int offset = mappedFile.flush(flushLeastPages);// mf起始偏移量 + mf最新的落盘位点long where = mappedFile.getFileFromOffset() + offset;// true 表示本次刷盘无数据落盘   false 表示本次刷盘有数据落盘result = where == this.flushedWhere;// 将最新的目录刷盘位点 赋值给 flushedWherethis.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result;}

CommitLog

ConsumeQueue

DefaultMessageStore

这篇关于全网最细RocketMQ源码四:消息存储的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/611042

相关文章

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

MySQL 存储引擎 MyISAM详解(最新推荐)

《MySQL存储引擎MyISAM详解(最新推荐)》使用MyISAM存储引擎的表占用空间很小,但是由于使用表级锁定,所以限制了读/写操作的性能,通常用于中小型的Web应用和数据仓库配置中的只读或主要... 目录mysql 5.5 之前默认的存储引擎️‍一、MyISAM 存储引擎的特性️‍二、MyISAM 的主

Linux lvm实例之如何创建一个专用于MySQL数据存储的LVM卷组

《Linuxlvm实例之如何创建一个专用于MySQL数据存储的LVM卷组》:本文主要介绍使用Linux创建一个专用于MySQL数据存储的LVM卷组的实例,具有很好的参考价值,希望对大家有所帮助,... 目录在Centos 7上创建卷China编程组并配置mysql数据目录1. 检查现有磁盘2. 创建物理卷3. 创

使用Python实现调用API获取图片存储到本地的方法

《使用Python实现调用API获取图片存储到本地的方法》开发一个自动化工具,用于从JSON数据源中提取图像ID,通过调用指定API获取未经压缩的原始图像文件,并确保下载结果与Postman等工具直接... 目录使用python实现调用API获取图片存储到本地1、项目概述2、核心功能3、环境准备4、代码实现

8种快速易用的Python Matplotlib数据可视化方法汇总(附源码)

《8种快速易用的PythonMatplotlib数据可视化方法汇总(附源码)》你是否曾经面对一堆复杂的数据,却不知道如何让它们变得直观易懂?别慌,Python的Matplotlib库是你数据可视化的... 目录引言1. 折线图(Line Plot)——趋势分析2. 柱状图(Bar Chart)——对比分析3

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

SpringBoot项目中Redis存储Session对象序列化处理

《SpringBoot项目中Redis存储Session对象序列化处理》在SpringBoot项目中使用Redis存储Session时,对象的序列化和反序列化是关键步骤,下面我们就来讲讲如何在Spri... 目录一、为什么需要序列化处理二、Spring Boot 集成 Redis 存储 Session2.1

基于MongoDB实现文件的分布式存储

《基于MongoDB实现文件的分布式存储》分布式文件存储的方案有很多,今天分享一个基于mongodb数据库来实现文件的存储,mongodb支持分布式部署,以此来实现文件的分布式存储,需要的朋友可以参考... 目录一、引言二、GridFS 原理剖析三、Spring Boot 集成 GridFS3.1 添加依赖

java变量内存中存储的使用方式

《java变量内存中存储的使用方式》:本文主要介绍java变量内存中存储的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍2、变量的定义3、 变量的类型4、 变量的作用域5、 内存中的存储方式总结1、介绍在 Java 中,变量是用于存储程序中数据

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2