RocketMQ源码解析——存储部分(2)对`MappedFile`进一步封装的`MappedFileQueue`

本文主要是介绍RocketMQ源码解析——存储部分(2)对`MappedFile`进一步封装的`MappedFileQueue`,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • `MappedFileQueue`
        • 属性介绍
        • 方法介绍
          • 构造方法
        • 检查文件是否完整`checkSelf`
        • 加载文件`load`
        • 根据时间戳获取文件`getMappedFileByTime`
        • 根据偏移量获取文件`findMappedFileByOffset`
        • 根据偏移量截断文件`truncateDirtyFiles`
        • 获取最后一个文件`getLastMappedFile`
        • 根据时间删除过期文件`deleteExpiredFileByTime`
        • 根据偏移量删除文件`deleteExpiredFileByOffset`
        • 其他跟`MappedFile`有关联的方法
      • 在`CommitLog`和`ConsumeQueue`中的使用

MappedFileQueue

 前面已经介绍了RocketMQ跟存储交互的底层封装对象mappedFile。而跟CommitLog,ConsumeQueue进行交互的并不是mappedFile,而是对其进一步封装的MappedFileQueue类。
在这里插入图片描述

属性介绍
	//文件的存储路径private final String storePath;//映射文件大小,指的是单个文件的大小,比如CommitLog大小为1Gprivate final int mappedFileSize;//并发线程安全队列存储映射文件private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();private final AllocateMappedFileService allocateMappedFileService;//刷新完的位置private long flushedWhere = 0;//提交完成的位置private long committedWhere = 0;//存储时间private volatile long storeTimestamp = 0;

MappedFileQueue这个类的属性相对来说比较少,其中需要说的是,AllocateMappedFileService类型的字段,这个对象的作用是根据情况来决定是否需要提前创建好MappedFile对象供后续的直接使用。而这个参数是在构造MappedFileQueue对象的时候的一个参数。只有在CommitLog中构造时才会传入AllocateMappedFileService,在ConsumeQueue并没有传入。

方法介绍
构造方法

MappedFileQueue只有一个全参构造器,分别是传入文件的存储路径storePath,单个存储文件的大小mappedFileSize和提前创建MappedFile对象的allocateMappedFileService

public MappedFileQueue(final String storePath, int mappedFileSize,AllocateMappedFileService allocateMappedFileService) {//指定文件的存储路径this.storePath = storePath;//指定单个文件的大小this.mappedFileSize = mappedFileSize;this.allocateMappedFileService = allocateMappedFileService;}
检查文件是否完整checkSelf
   /*** 检查文件的是否完整,检查的方式。上一个文件的起始偏移量减去当前文件的起始偏移量,如果差值=mappedFileSize那么说明文件是完整的,否则有损坏*/public void checkSelf() {//检查文件组是否为空if (!this.mappedFiles.isEmpty()) {//对文件进行迭代,一个一个进行检查Iterator<MappedFile> iterator = mappedFiles.iterator();MappedFile pre = null;while (iterator.hasNext()) {MappedFile cur = iterator.next();if (pre != null) {//用当前文件的其实偏移量-上一个文件的其实偏移量 正常情况下应该等于一个文件的大小。如果不相等,说明文件存在问题if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",pre.getFileName(), cur.getFileName());}}pre = cur;}}}

 这里检查文件是否被破坏的原理,就是检查文件的大小是不是等于前一个文件的起始偏移量和后一个文件的起始偏移量是不是等于文件大小。而这里的起始偏移量又是在MappedFile进行获取的fileFromOffset,而这个值就是我们在构造MappedFile的时候传入的文件名转化得到的

private void init(final String fileName, final int fileSize) throws IOException {//根据文件的名称计算文件其实的偏移量this.fileFromOffset = Long.parseLong(this.file.getName());
}
加载文件load
   public boolean load() {/***System.getProperty("user.home") + File.separator + "store" + File.separator + 文件名* 根据传入的文件保存路径storePath 来获取文件*/File dir = new File(this.storePath);File[] files = dir.listFiles();//文件列表不为空则进行加载if (files != null) {// ascending order//对文件进行排序Arrays.sort(files);for (File file : files) {//队列映射文件的大小不等于设置的文件类型的大小,说明加载到了最后的一个文件  比如 如果是commitLog那么对于的大小应该为1Gif (file.length() != this.mappedFileSize) {log.warn(file + "\t" + file.length()+ " length not matched message store config value, ignore it");return true;}try {//根据文件的路径和文件大小,创建对应的文件映射,然后加入到映射列表中MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}}return true;}

 这里的逻辑比较简单,就是根据传入的文件路径,加载对应的文件夹下面的文件,并创建文件映射,并加入到文件映射列表中去。这个方法在RocketMQ启动的时候回调用,用来加载系统中已经存在的消息日志文件。

根据时间戳获取文件getMappedFileByTime
    public MappedFile getMappedFileByTime(final long timestamp) {//获取所有的文件映射对象MappedFileObject[] mfs = this.copyMappedFiles(0);//为null说明 mappedFiles 中没有MappedFileif (null == mfs)return null;for (int i = 0; i < mfs.length; i++) {MappedFile mappedFile = (MappedFile) mfs[i];//如果文件的最后修改时间大于等于参数时间,说文件在当前传入的时间之后进行修改了,就是需要寻找的文件if (mappedFile.getLastModifiedTimestamp() >= timestamp) {return mappedFile;}}//如果没有找到合适的MappedFile 就用最后一个return (MappedFile) mfs[mfs.length - 1];}

 这个方法主要使用的位置在ConsumeQueue中,在通过时间戳来找文件中的消息的偏移量。

根据偏移量获取文件findMappedFileByOffset
 	public MappedFile findMappedFileByOffset(final long offset) {//根据偏移量来找映射文件,如果没有找到文件的情况下不返回映射文件列表第一个映射文件return findMappedFileByOffset(offset, false);}public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//获取队列中第一个映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//获取队列中最后一个映射文件MappedFile lastMappedFile = this.getLastMappedFile();//如果不存在文件则直接返回nullif (firstMappedFile != null && lastMappedFile != null) {//如果要查找的偏移量offset不在所有的文件偏移量范围内,则打印错误日志if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//(指定Offset-第一个文件的其实偏移量)/文件大小=第几个文件夹int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//获取指定的映射文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//offset在指定的映射文件中,则直接返回对应的映射文件if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//如果按索引在队列中找不到映射文件就遍历队列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//如果指定了没有找到文件就返回第一个映射文件,则直接返回第一个映射文件if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;}

 如上代码分析中有两个根据偏移量获取映射文件的方法,其中有两个参数的方法是在知道偏移量所指的信息在第一个映射文件中的时候才调用,而调用的这个方法的基本就是写入信息或者刷新信息的时候调用。关于文件的刷新和提交可以看上一篇对MappedFile分析的文章

根据偏移量截断文件truncateDirtyFiles
    public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();/*** 如果   文件的起始偏移量>指定截断偏移量offset  那么整个文件需要删除* 如果   文件的起始偏移量<指定截断偏移量offset<文件的最大偏移量  那么文件中的部分记录需要清除* 如果   文件的最大偏移量<指定截断偏移量offset  那么这个文件不需要进行处理*/for (MappedFile file : this.mappedFiles) {//文件的开始偏移量+文件大小= 文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//当前文件的最大偏移量大于 指定截断位置的偏移量,说明需要截断的位置就是在这个文件中if (fileTailOffset > offset) {//如果文件初始偏移量小于指定的偏移量,说明只需要截断文件中的一部分if (offset >= file.getFileFromOffset()) {//设置映射文件写的位置file.setWrotePosition((int) (offset % this.mappedFileSize));//设置文件commit的位置file.setCommittedPosition((int) (offset % this.mappedFileSize));//设置文件刷新的位置file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//如果文件的起始偏移量也比指定的偏移量大,则说明这个文件整个需要丢弃file.destroy(1000);//需要删除的文件加上这个文件willRemoveFiles.add(file);}}}//删除映射的文件this.deleteExpiredFile(willRemoveFiles);}

 截断文件的方法,跟load方法一样,在RocketMQ启动的时候会使用到,用来删除那些无效的或者损坏的需要删除的消息。

获取最后一个文件getLastMappedFile
    public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;//如果文件队列不为空则获取最后一个文件while (!this.mappedFiles.isEmpty()) {try {//直接获取最后一个映射文件mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}

 这个方法的作用基本就是获取最后一个映射文件,然后进行消息的插入,或者获取最大消息偏移量等信息。

根据时间删除过期文件deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {//获取映射文件列表Object[] mfs = this.copyMappedFiles(0);//如果映射文件列表为空直接返回if (null == mfs){return 0;}int mfsLength = mfs.length - 1;int deleteCount = 0;List<MappedFile> files = new ArrayList<MappedFile>();if (null != mfs) {//对映射文件进行遍历for (int i = 0; i < mfsLength; i++) {MappedFile mappedFile = (MappedFile) mfs[i];//文件最后的修改时间+过期时间= 文件最终能够存活的时间long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;//如果当前时间大于文件能够存活的最大时间,比如 当前是2021-03-18 12:00:00 ,而文件最大存活时间2021-03-18 11:00:00 就需要删除。或者调用方法的时候指定了马上删除if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {//删除文件,就是解除对文件的引用if (mappedFile.destroy(intervalForcibly)) {//要删除的的文件加入到要删除的集合中files.add(mappedFile);//增加计数deleteCount++;//一次性最多删除的人为10if (files.size() >= DELETE_FILES_BATCH_MAX) {break;}//如果删除时间间隔大于0,并且没有循环玩,则睡眠指定的删除间隔时长后在杀出if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {try {Thread.sleep(deleteFilesInterval);} catch (InterruptedException e) {}}} else {break;}} else {//avoid deleting files in the middlebreak;}}}//从文件映射队列中删除对应的文件映射deleteExpiredFile(files);//返回删除的文件个数return deleteCount;}

 这个方法被用在定期删除过去的CommitLog文件,来保证内存空间。

根据偏移量删除文件deleteExpiredFileByOffset
   public int deleteExpiredFileByOffset(long offset, int unitSize) {Object[] mfs = this.copyMappedFiles(0);List<MappedFile> files = new ArrayList<MappedFile>();int deleteCount = 0;if (null != mfs) {int mfsLength = mfs.length - 1;for (int i = 0; i < mfsLength; i++) {boolean destroy;MappedFile mappedFile = (MappedFile) mfs[i];//unitSize是一个文件格式占用的长度 比如ConsumeQueue中一条记录长度为20byte  这里是获取一个文件中最后一条记录的起始偏移量,SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);if (result != null) {//获取文件中最后一条记录的偏移量long maxOffsetInLogicQueue = result.getByteBuffer().getLong();result.release();//如果最大偏移量 < 指定的偏移量,则需要删除destroy = maxOffsetInLogicQueue < offset;if (destroy) {log.info("physic min offset " + offset + ", logics in current mappedFile max offset "+ maxOffsetInLogicQueue + ", delete it");}} else if (!mappedFile.isAvailable()) { // Handle hanged file.log.warn("Found a hanged consume queue file, attempting to delete it.");destroy = true;} else {log.warn("this being not executed forever.");break;}//删除文件if (destroy && mappedFile.destroy(1000 * 60)) {files.add(mappedFile);deleteCount++;} else {break;}}}//  删除映射文件队列中的映射文件=》deleteExpiredFile(files);return deleteCount;}

 按照偏移量删除文件用于删除过期的ConsumeQueue文件,因为ConsumeQueue文件中信息的记录是定长的20byte,如果偏移量小于指定的偏移量表示都是之前的消息,可以直接删除。

其他跟MappedFile有关联的方法
MappedFileMappedFileQueue
flushflush
commitcommit
destroydestroy
getFileFromOffset获取文件的初始偏移量getMinOffset获取文件的最小偏移量,就是获取映射文件队列的第一个文件,然后调用getFileFromOffset
getFileFromOffset+getReadPositiongetMaxOffset获取文件最大偏移量,就是获取最后一个映射文件的起始偏移量+文件的写入的位置
getFileFromOffset+getWrotePositiongetMaxWrotePosition获取文件最大偏移量,就是获取最后一个映射文件的起始偏移量+文件的写入的位置
remainHowManyDataToCommit获取文件尚未提交的长度
remainHowManyDataToFlush获取文件尚未刷新的长度

CommitLogConsumeQueue中的使用

方法CommitLogConsumeQueue
checkSelf定时检查文件是否完整定时检查文件是否完整
loadMQ启动时加载CommitLogMQ启动时加载ConsumeQueue
getMappedFileByTime根据时间戳查找特定的topic和queue中的消息
findMappedFileByOffset根据index获取消息
truncateDirtyFilesMQ启动时截断无用日志MQ启动时截断无用日志
getLastMappedFile保存消息时获取文件保存消息时获取文件
deleteExpiredFileByTime定时删除过期的文件
deleteExpiredFileByOffset定时删除过期的文件

 可以看到,MappedFileQueue类中的方法基本是操作MappedFile组成的集合,间接的操作MappedFile达到对日志文件组的增删改的操作,都是一些提供给CommitLogConsumeQueue用来对日志文件进行查找,删除的基础方法。

下一篇存储部分(3)CommitLog文件存储加载刷新的CommitLog

这篇关于RocketMQ源码解析——存储部分(2)对`MappedFile`进一步封装的`MappedFileQueue`的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/783325

相关文章

nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析(结合应用场景)

《nginx-t、nginx-sstop和nginx-sreload命令的详细解析(结合应用场景)》本文解析Nginx的-t、-sstop、-sreload命令,分别用于配置语法检... 以下是关于 nginx -t、nginx -s stop 和 nginx -s reload 命令的详细解析,结合实际应

SpringBoot3.X 整合 MinIO 存储原生方案

《SpringBoot3.X整合MinIO存储原生方案》本文详细介绍了SpringBoot3.X整合MinIO的原生方案,从环境搭建到核心功能实现,涵盖了文件上传、下载、删除等常用操作,并补充了... 目录SpringBoot3.X整合MinIO存储原生方案:从环境搭建到实战开发一、前言:为什么选择MinI

MyBatis中$与#的区别解析

《MyBatis中$与#的区别解析》文章浏览阅读314次,点赞4次,收藏6次。MyBatis使用#{}作为参数占位符时,会创建预处理语句(PreparedStatement),并将参数值作为预处理语句... 目录一、介绍二、sql注入风险实例一、介绍#(井号):MyBATis使用#{}作为参数占位符时,会

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

深度解析Java DTO(最新推荐)

《深度解析JavaDTO(最新推荐)》DTO(DataTransferObject)是一种用于在不同层(如Controller层、Service层)之间传输数据的对象设计模式,其核心目的是封装数据,... 目录一、什么是DTO?DTO的核心特点:二、为什么需要DTO?(对比Entity)三、实际应用场景解析

深度解析Java项目中包和包之间的联系

《深度解析Java项目中包和包之间的联系》文章浏览阅读850次,点赞13次,收藏8次。本文详细介绍了Java分层架构中的几个关键包:DTO、Controller、Service和Mapper。_jav... 目录前言一、各大包1.DTO1.1、DTO的核心用途1.2. DTO与实体类(Entity)的区别1

Java中的雪花算法Snowflake解析与实践技巧

《Java中的雪花算法Snowflake解析与实践技巧》本文解析了雪花算法的原理、Java实现及生产实践,涵盖ID结构、位运算技巧、时钟回拨处理、WorkerId分配等关键点,并探讨了百度UidGen... 目录一、雪花算法核心原理1.1 算法起源1.2 ID结构详解1.3 核心特性二、Java实现解析2.

Java中调用数据库存储过程的示例代码

《Java中调用数据库存储过程的示例代码》本文介绍Java通过JDBC调用数据库存储过程的方法,涵盖参数类型、执行步骤及数据库差异,需注意异常处理与资源管理,以优化性能并实现复杂业务逻辑,感兴趣的朋友... 目录一、存储过程概述二、Java调用存储过程的基本javascript步骤三、Java调用存储过程示

MySQL之InnoDB存储引擎中的索引用法及说明

《MySQL之InnoDB存储引擎中的索引用法及说明》:本文主要介绍MySQL之InnoDB存储引擎中的索引用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录1、背景2、准备3、正篇【1】存储用户记录的数据页【2】存储目录项记录的数据页【3】聚簇索引【4】二