RocketMQ源码解析——存储部分(2)对`MappedFile`进一步封装的`MappedFileQueue`

本文主要是介绍RocketMQ源码解析——存储部分(2)对`MappedFile`进一步封装的`MappedFileQueue`,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

      • `MappedFileQueue`
        • 属性介绍
        • 方法介绍
          • 构造方法
        • 检查文件是否完整`checkSelf`
        • 加载文件`load`
        • 根据时间戳获取文件`getMappedFileByTime`
        • 根据偏移量获取文件`findMappedFileByOffset`
        • 根据偏移量截断文件`truncateDirtyFiles`
        • 获取最后一个文件`getLastMappedFile`
        • 根据时间删除过期文件`deleteExpiredFileByTime`
        • 根据偏移量删除文件`deleteExpiredFileByOffset`
        • 其他跟`MappedFile`有关联的方法
      • 在`CommitLog`和`ConsumeQueue`中的使用

MappedFileQueue

 前面已经介绍了RocketMQ跟存储交互的底层封装对象mappedFile。而跟CommitLog,ConsumeQueue进行交互的并不是mappedFile,而是对其进一步封装的MappedFileQueue类。
在这里插入图片描述

属性介绍
	//文件的存储路径private final String storePath;//映射文件大小,指的是单个文件的大小,比如CommitLog大小为1Gprivate final int mappedFileSize;//并发线程安全队列存储映射文件private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();private final AllocateMappedFileService allocateMappedFileService;//刷新完的位置private long flushedWhere = 0;//提交完成的位置private long committedWhere = 0;//存储时间private volatile long storeTimestamp = 0;

MappedFileQueue这个类的属性相对来说比较少,其中需要说的是,AllocateMappedFileService类型的字段,这个对象的作用是根据情况来决定是否需要提前创建好MappedFile对象供后续的直接使用。而这个参数是在构造MappedFileQueue对象的时候的一个参数。只有在CommitLog中构造时才会传入AllocateMappedFileService,在ConsumeQueue并没有传入。

方法介绍
构造方法

MappedFileQueue只有一个全参构造器,分别是传入文件的存储路径storePath,单个存储文件的大小mappedFileSize和提前创建MappedFile对象的allocateMappedFileService

public MappedFileQueue(final String storePath, int mappedFileSize,AllocateMappedFileService allocateMappedFileService) {//指定文件的存储路径this.storePath = storePath;//指定单个文件的大小this.mappedFileSize = mappedFileSize;this.allocateMappedFileService = allocateMappedFileService;}
检查文件是否完整checkSelf
   /*** 检查文件的是否完整,检查的方式。上一个文件的起始偏移量减去当前文件的起始偏移量,如果差值=mappedFileSize那么说明文件是完整的,否则有损坏*/public void checkSelf() {//检查文件组是否为空if (!this.mappedFiles.isEmpty()) {//对文件进行迭代,一个一个进行检查Iterator<MappedFile> iterator = mappedFiles.iterator();MappedFile pre = null;while (iterator.hasNext()) {MappedFile cur = iterator.next();if (pre != null) {//用当前文件的其实偏移量-上一个文件的其实偏移量 正常情况下应该等于一个文件的大小。如果不相等,说明文件存在问题if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",pre.getFileName(), cur.getFileName());}}pre = cur;}}}

 这里检查文件是否被破坏的原理,就是检查文件的大小是不是等于前一个文件的起始偏移量和后一个文件的起始偏移量是不是等于文件大小。而这里的起始偏移量又是在MappedFile进行获取的fileFromOffset,而这个值就是我们在构造MappedFile的时候传入的文件名转化得到的

private void init(final String fileName, final int fileSize) throws IOException {//根据文件的名称计算文件其实的偏移量this.fileFromOffset = Long.parseLong(this.file.getName());
}
加载文件load
   public boolean load() {/***System.getProperty("user.home") + File.separator + "store" + File.separator + 文件名* 根据传入的文件保存路径storePath 来获取文件*/File dir = new File(this.storePath);File[] files = dir.listFiles();//文件列表不为空则进行加载if (files != null) {// ascending order//对文件进行排序Arrays.sort(files);for (File file : files) {//队列映射文件的大小不等于设置的文件类型的大小,说明加载到了最后的一个文件  比如 如果是commitLog那么对于的大小应该为1Gif (file.length() != this.mappedFileSize) {log.warn(file + "\t" + file.length()+ " length not matched message store config value, ignore it");return true;}try {//根据文件的路径和文件大小,创建对应的文件映射,然后加入到映射列表中MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}}return true;}

 这里的逻辑比较简单,就是根据传入的文件路径,加载对应的文件夹下面的文件,并创建文件映射,并加入到文件映射列表中去。这个方法在RocketMQ启动的时候回调用,用来加载系统中已经存在的消息日志文件。

根据时间戳获取文件getMappedFileByTime
    public MappedFile getMappedFileByTime(final long timestamp) {//获取所有的文件映射对象MappedFileObject[] mfs = this.copyMappedFiles(0);//为null说明 mappedFiles 中没有MappedFileif (null == mfs)return null;for (int i = 0; i < mfs.length; i++) {MappedFile mappedFile = (MappedFile) mfs[i];//如果文件的最后修改时间大于等于参数时间,说文件在当前传入的时间之后进行修改了,就是需要寻找的文件if (mappedFile.getLastModifiedTimestamp() >= timestamp) {return mappedFile;}}//如果没有找到合适的MappedFile 就用最后一个return (MappedFile) mfs[mfs.length - 1];}

 这个方法主要使用的位置在ConsumeQueue中,在通过时间戳来找文件中的消息的偏移量。

根据偏移量获取文件findMappedFileByOffset
 	public MappedFile findMappedFileByOffset(final long offset) {//根据偏移量来找映射文件,如果没有找到文件的情况下不返回映射文件列表第一个映射文件return findMappedFileByOffset(offset, false);}public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//获取队列中第一个映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//获取队列中最后一个映射文件MappedFile lastMappedFile = this.getLastMappedFile();//如果不存在文件则直接返回nullif (firstMappedFile != null && lastMappedFile != null) {//如果要查找的偏移量offset不在所有的文件偏移量范围内,则打印错误日志if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//(指定Offset-第一个文件的其实偏移量)/文件大小=第几个文件夹int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//获取指定的映射文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//offset在指定的映射文件中,则直接返回对应的映射文件if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//如果按索引在队列中找不到映射文件就遍历队列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//如果指定了没有找到文件就返回第一个映射文件,则直接返回第一个映射文件if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;}

 如上代码分析中有两个根据偏移量获取映射文件的方法,其中有两个参数的方法是在知道偏移量所指的信息在第一个映射文件中的时候才调用,而调用的这个方法的基本就是写入信息或者刷新信息的时候调用。关于文件的刷新和提交可以看上一篇对MappedFile分析的文章

根据偏移量截断文件truncateDirtyFiles
    public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();/*** 如果   文件的起始偏移量>指定截断偏移量offset  那么整个文件需要删除* 如果   文件的起始偏移量<指定截断偏移量offset<文件的最大偏移量  那么文件中的部分记录需要清除* 如果   文件的最大偏移量<指定截断偏移量offset  那么这个文件不需要进行处理*/for (MappedFile file : this.mappedFiles) {//文件的开始偏移量+文件大小= 文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//当前文件的最大偏移量大于 指定截断位置的偏移量,说明需要截断的位置就是在这个文件中if (fileTailOffset > offset) {//如果文件初始偏移量小于指定的偏移量,说明只需要截断文件中的一部分if (offset >= file.getFileFromOffset()) {//设置映射文件写的位置file.setWrotePosition((int) (offset % this.mappedFileSize));//设置文件commit的位置file.setCommittedPosition((int) (offset % this.mappedFileSize));//设置文件刷新的位置file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//如果文件的起始偏移量也比指定的偏移量大,则说明这个文件整个需要丢弃file.destroy(1000);//需要删除的文件加上这个文件willRemoveFiles.add(file);}}}//删除映射的文件this.deleteExpiredFile(willRemoveFiles);}

 截断文件的方法,跟load方法一样,在RocketMQ启动的时候会使用到,用来删除那些无效的或者损坏的需要删除的消息。

获取最后一个文件getLastMappedFile
    public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;//如果文件队列不为空则获取最后一个文件while (!this.mappedFiles.isEmpty()) {try {//直接获取最后一个映射文件mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}

 这个方法的作用基本就是获取最后一个映射文件,然后进行消息的插入,或者获取最大消息偏移量等信息。

根据时间删除过期文件deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {//获取映射文件列表Object[] mfs = this.copyMappedFiles(0);//如果映射文件列表为空直接返回if (null == mfs){return 0;}int mfsLength = mfs.length - 1;int deleteCount = 0;List<MappedFile> files = new ArrayList<MappedFile>();if (null != mfs) {//对映射文件进行遍历for (int i = 0; i < mfsLength; i++) {MappedFile mappedFile = (MappedFile) mfs[i];//文件最后的修改时间+过期时间= 文件最终能够存活的时间long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;//如果当前时间大于文件能够存活的最大时间,比如 当前是2021-03-18 12:00:00 ,而文件最大存活时间2021-03-18 11:00:00 就需要删除。或者调用方法的时候指定了马上删除if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {//删除文件,就是解除对文件的引用if (mappedFile.destroy(intervalForcibly)) {//要删除的的文件加入到要删除的集合中files.add(mappedFile);//增加计数deleteCount++;//一次性最多删除的人为10if (files.size() >= DELETE_FILES_BATCH_MAX) {break;}//如果删除时间间隔大于0,并且没有循环玩,则睡眠指定的删除间隔时长后在杀出if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {try {Thread.sleep(deleteFilesInterval);} catch (InterruptedException e) {}}} else {break;}} else {//avoid deleting files in the middlebreak;}}}//从文件映射队列中删除对应的文件映射deleteExpiredFile(files);//返回删除的文件个数return deleteCount;}

 这个方法被用在定期删除过去的CommitLog文件,来保证内存空间。

根据偏移量删除文件deleteExpiredFileByOffset
   public int deleteExpiredFileByOffset(long offset, int unitSize) {Object[] mfs = this.copyMappedFiles(0);List<MappedFile> files = new ArrayList<MappedFile>();int deleteCount = 0;if (null != mfs) {int mfsLength = mfs.length - 1;for (int i = 0; i < mfsLength; i++) {boolean destroy;MappedFile mappedFile = (MappedFile) mfs[i];//unitSize是一个文件格式占用的长度 比如ConsumeQueue中一条记录长度为20byte  这里是获取一个文件中最后一条记录的起始偏移量,SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);if (result != null) {//获取文件中最后一条记录的偏移量long maxOffsetInLogicQueue = result.getByteBuffer().getLong();result.release();//如果最大偏移量 < 指定的偏移量,则需要删除destroy = maxOffsetInLogicQueue < offset;if (destroy) {log.info("physic min offset " + offset + ", logics in current mappedFile max offset "+ maxOffsetInLogicQueue + ", delete it");}} else if (!mappedFile.isAvailable()) { // Handle hanged file.log.warn("Found a hanged consume queue file, attempting to delete it.");destroy = true;} else {log.warn("this being not executed forever.");break;}//删除文件if (destroy && mappedFile.destroy(1000 * 60)) {files.add(mappedFile);deleteCount++;} else {break;}}}//  删除映射文件队列中的映射文件=》deleteExpiredFile(files);return deleteCount;}

 按照偏移量删除文件用于删除过期的ConsumeQueue文件,因为ConsumeQueue文件中信息的记录是定长的20byte,如果偏移量小于指定的偏移量表示都是之前的消息,可以直接删除。

其他跟MappedFile有关联的方法
MappedFileMappedFileQueue
flushflush
commitcommit
destroydestroy
getFileFromOffset获取文件的初始偏移量getMinOffset获取文件的最小偏移量,就是获取映射文件队列的第一个文件,然后调用getFileFromOffset
getFileFromOffset+getReadPositiongetMaxOffset获取文件最大偏移量,就是获取最后一个映射文件的起始偏移量+文件的写入的位置
getFileFromOffset+getWrotePositiongetMaxWrotePosition获取文件最大偏移量,就是获取最后一个映射文件的起始偏移量+文件的写入的位置
remainHowManyDataToCommit获取文件尚未提交的长度
remainHowManyDataToFlush获取文件尚未刷新的长度

CommitLogConsumeQueue中的使用

方法CommitLogConsumeQueue
checkSelf定时检查文件是否完整定时检查文件是否完整
loadMQ启动时加载CommitLogMQ启动时加载ConsumeQueue
getMappedFileByTime根据时间戳查找特定的topic和queue中的消息
findMappedFileByOffset根据index获取消息
truncateDirtyFilesMQ启动时截断无用日志MQ启动时截断无用日志
getLastMappedFile保存消息时获取文件保存消息时获取文件
deleteExpiredFileByTime定时删除过期的文件
deleteExpiredFileByOffset定时删除过期的文件

 可以看到,MappedFileQueue类中的方法基本是操作MappedFile组成的集合,间接的操作MappedFile达到对日志文件组的增删改的操作,都是一些提供给CommitLogConsumeQueue用来对日志文件进行查找,删除的基础方法。

下一篇存储部分(3)CommitLog文件存储加载刷新的CommitLog

这篇关于RocketMQ源码解析——存储部分(2)对`MappedFile`进一步封装的`MappedFileQueue`的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/783325

相关文章

Agent开发核心技术解析以及现代Agent架构设计

《Agent开发核心技术解析以及现代Agent架构设计》在人工智能领域,Agent并非一个全新的概念,但在大模型时代,它被赋予了全新的生命力,简单来说,Agent是一个能够自主感知环境、理解任务、制定... 目录一、回归本源:到底什么是Agent?二、核心链路拆解:Agent的"大脑"与"四肢"1. 规划模

MySQL字符串转数值的方法全解析

《MySQL字符串转数值的方法全解析》在MySQL开发中,字符串与数值的转换是高频操作,本文从隐式转换原理、显式转换方法、典型场景案例、风险防控四个维度系统梳理,助您精准掌握这一核心技能,需要的朋友可... 目录一、隐式转换:自动但需警惕的&ld编程quo;双刃剑”二、显式转换:三大核心方法详解三、典型场景

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

SQL 注入攻击(SQL Injection)原理、利用方式与防御策略深度解析

《SQL注入攻击(SQLInjection)原理、利用方式与防御策略深度解析》本文将从SQL注入的基本原理、攻击方式、常见利用手法,到企业级防御方案进行全面讲解,以帮助开发者和安全人员更系统地理解... 目录一、前言二、SQL 注入攻击的基本概念三、SQL 注入常见类型分析1. 基于错误回显的注入(Erro

C++ 多态性实战之何时使用 virtual 和 override的问题解析

《C++多态性实战之何时使用virtual和override的问题解析》在面向对象编程中,多态是一个核心概念,很多开发者在遇到override编译错误时,不清楚是否需要将基类函数声明为virt... 目录C++ 多态性实战:何时使用 virtual 和 override?引言问题场景判断是否需要多态的三个关

Springboot主配置文件解析

《Springboot主配置文件解析》SpringBoot主配置文件application.yml支持多种核心值类型,包括字符串、数字、布尔值等,文章详细介绍了Profile环境配置和加载位置,本文... 目录Profile环境配置配置文件加载位置Springboot主配置文件 application.ym

MySQL中存储过程(procedure)的使用及说明

《MySQL中存储过程(procedure)的使用及说明》存储过程是预先定义的SQL语句集合,可在数据库中重复调用,它们提供事务性、高效性和安全性,MySQL和Java中均可创建和调用存储过程,示例展... 目录概念示例1示例2总结概念存储过程:在数据库中预先定义好一组SQL语句,可以被程序反复调用。

MySQL存储过程实践(in、out、inout)

《MySQL存储过程实践(in、out、inout)》文章介绍了数据库中的存储过程,包括其定义、优缺点、性能调校与撰写,以及创建和调用方法,还详细说明了存储过程的参数类型,包括IN、OUT和INOUT... 目录简述存储过程存储过程的优缺点优点缺点存储过程的创建和调用mysql 存储过程中的关键语法案例存储

Java中Redisson 的原理深度解析

《Java中Redisson的原理深度解析》Redisson是一个高性能的Redis客户端,它通过将Redis数据结构映射为Java对象和分布式对象,实现了在Java应用中方便地使用Redis,本文... 目录前言一、核心设计理念二、核心架构与通信层1. 基于 Netty 的异步非阻塞通信2. 编解码器三、

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node