# 消息中间件 RocketMQ 高级功能和源码分析(八)

2024-06-23 04:28

本文主要是介绍# 消息中间件 RocketMQ 高级功能和源码分析(八),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

消息中间件 RocketMQ 高级功能和源码分析(八)

一、消息中间件 RocketMQ 源码分析:实时更新消息消费队列与索引文件流程说明

1、实时更新消息消费队列与索引文件

消息消费队文件、消息属性索引文件都是基于 CommitLog 文件构建的,当消息生产者提交的消息存储在 CommitLog 文件中,ConsumerQueue、IndexFile 需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ 通过开启一个线程 ReputMessageService 来准实时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumerQueue、IndexFile 文件。

2、消息存储结构 示例图:

在这里插入图片描述

3、构建消息消费队列和索引文件 示例图:

在这里插入图片描述

4、 代码:DefaultMessageStore:start


//设置CommitLog内存中最大偏移量
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
//启动
this.reputMessageService.start();

5、 代码:DefaultMessageStore:run


public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");//每隔1毫秒就继续尝试推送消息到消息消费队列和索引文件while (!this.isStopped()) {try {Thread.sleep(1);this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

6、 代码:DefaultMessageStore:deReput


//从result中循环遍历消息,一次读一条,创建DispatherRequest对象。
for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =                               DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();if (dispatchRequest.isSuccess()) {if (size > 0) {DefaultMessageStore.this.doDispatch(dispatchRequest);}}
}

7、 DispatchRequest

在这里插入图片描述


String topic; //消息主题名称
int queueId;  //消息队列ID
long commitLogOffset;	//消息物理偏移量
int msgSize;	//消息长度
long tagsCode;	//消息过滤tag hashCode
long storeTimestamp;	//消息存储时间戳
long consumeQueueOffset;	//消息队列偏移量
String keys;	//消息索引key
boolean success;	//是否成功解析到完整的消息
String uniqKey;	//消息唯一键
int sysFlag;	//消息系统标记
long preparedTransactionOffset;	//消息预处理事务偏移量
Map<String, String> propertiesMap;	//消息属性
byte[] bitMap;	//位图

二、消息中间件 RocketMQ 源码分析:转发数据到 ConsumerQueue 文件

1、转发到 ConsumerQueue 消息分发到消息消费队列 示例图:

在这里插入图片描述

2、 代码 CommitLogDispatcherBuildConsumeQueue 类:


class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE://消息分发DefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;}}
}

3、 代码:DefaultMessageStore#putMessagePositionInfo


public void putMessagePositionInfo(DispatchRequest dispatchRequest) {//获得消费队列ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());//消费队列分发消息cq.putMessagePositionInfoWrapper(dispatchRequest);
}

4、 代码:DefaultMessageStore#putMessagePositionInfo


//依次将消息偏移量、消息长度、tag写入到ByteBuffer中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
//获得内存映射文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {//将消息追加到内存映射文件,异步输盘return mappedFile.appendMessage(this.byteBufferIndex.array());
}

三、消息中间件 RocketMQ 源码分析:转发 IndexFile 文件

1、转发到 Index 消息分发到索引文件 示例图:

在这里插入图片描述

2、 代码 CommitLogDispatcherBuildIndex 类:


class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {@Overridepublic void dispatch(DispatchRequest request) {if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {DefaultMessageStore.this.indexService.buildIndex(request);}}
}

3、 代码:DefaultMessageStore#buildIndex


public void buildIndex(DispatchRequest req) {//获得索引文件IndexFile indexFile = retryGetAndCreateIndexFile();if (indexFile != null) {//获得文件最大物理偏移量long endPhyOffset = indexFile.getEndPhyOffset();DispatchRequest msg = req;String topic = msg.getTopic();String keys = msg.getKeys();//如果该消息的物理偏移量小于索引文件中的最大物理偏移量,则说明是重复数据,忽略本次索引构建if (msg.getCommitLogOffset() < endPhyOffset) {return;}final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:break;case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:return;}//如果消息ID不为空,则添加到Hash索引中if (req.getUniqKey() != null) {indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));if (indexFile == null) {return;}}//构建索引key,RocketMQ支持为同一个消息建立多个索引,多个索引键空格隔开.if (keys != null && keys.length() > 0) {String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);for (int i = 0; i < keyset.length; i++) {String key = keyset[i];if (key.length() > 0) {indexFile = putKey(indexFile, msg, buildKey(topic, key));if (indexFile == null) {return;}}}}} else {log.error("build index error, stop building index");}
}

四、消息中间件 RocketMQ 源码分析:消息队列和索引文件恢复

1、消息队列和索引文件恢复

由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumerQueue 和 Index 文件。如果消息成功存储到 CommitLog 文件中,转发任务未成功执行,此时消息服务器 Broker 由于某个愿意宕机,导致CommitLog、ConsumerQueue、IndexFile 文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在 CommitLog 中文件中存在,但由于没有转发到 ConsumerQueue,这部分消息将永远复发被消费者消费。

2、文件恢复总体流程 示例图:

在这里插入图片描述

3、存储文件加载

代码:DefaultMessageStore#load

判断上一次是否异常退出。实现机制是 Broker 在启动时创建 abort 文件,在退出时通过 JVM 钩子函数删除 abort 文件。如果下次启动时存在 abort 文件。说明 Broker 时异常退出的,CommitLog 与 ConsumerQueue 数据有可能不一致,需要进行修复。


//判断临时文件是否存在
boolean lastExitOK = !this.isTempFileExist();
//根据临时文件判断当前Broker是否异常退出
private boolean isTempFileExist() {String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());File file = new File(fileName);return file.exists();
}

4、 代码:DefaultMessageStore#load


//加载延时队列
if (null != scheduleMessageService) {result = result && this.scheduleMessageService.load();
}// 加载CommitLog文件
result = result && this.commitLog.load();// 加载消费队列文件
result = result && this.loadConsumeQueue();if (result) {//加载存储监测点,监测点主要记录CommitLog文件、ConsumerQueue文件、Index索引文件的刷盘点this.storeCheckpoint =new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));//加载index文件this.indexService.load(lastExitOK);//根据Broker是否异常退出,执行不同的恢复策略this.recover(lastExitOK);
}

5、 代码:MappedFileQueue#load

加载 CommitLog 到映射文件


//指向CommitLog文件目录
File dir = new File(this.storePath);
//获得文件数组
File[] files = dir.listFiles();
if (files != null) {// 文件排序Arrays.sort(files);//遍历文件for (File file : files) {//如果文件大小和配置文件不一致,退出if (file.length() != this.mappedFileSize) {return false;}try {//创建映射文件MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);//将映射文件添加到队列this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}
}return true;

6、 代码:DefaultMessageStore#loadConsumeQueue

加载消息消费队列


//执行消费队列目录
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//遍历消费队列目录
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {for (File fileTopic : fileTopicList) {//获得子目录名称,即topic名称String topic = fileTopic.getName();//遍历子目录下的消费队列文件File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {//遍历文件for (File fileQueueId : fileQueueIdList) {//文件名称即队列IDint queueId;try {queueId = Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}//创建消费队列并加载到内存ConsumeQueue logic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);this.putConsumeQueue(topic, queueId, logic);if (!logic.load()) {return false;}}}}
}log.info("load logics queue all over, OK");return true;

7、 代码:IndexService#load

加载索引文件


public boolean load(final boolean lastExitOK) {//索引文件目录File dir = new File(this.storePath);//遍历索引文件File[] files = dir.listFiles();if (files != null) {//文件排序Arrays.sort(files);//遍历文件for (File file : files) {try {//加载索引文件IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);f.load();if (!lastExitOK) {//索引文件上次的刷盘时间小于该索引文件的消息时间戳,该文件将立即删除if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint().getIndexMsgTimestamp()) {f.destroy(0);continue;}}//将索引文件添加到队列log.info("load index file OK, " + f.getFileName());this.indexFileList.add(f);} catch (IOException e) {log.error("load file {} error", file, e);return false;} catch (NumberFormatException e) {log.error("load file {} error", file, e);}}}return true;
}

8、 代码:DefaultMessageStore#recover

文件恢复,根据 Broker 是否正常退出执行不同的恢复策略


private void recover(final boolean lastExitOK) {//获得最大的物理便宜消费队列long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();if (lastExitOK) {//正常恢复this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {//异常恢复this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}//在CommitLog中保存每个消息消费队列当前的存储逻辑偏移量this.recoverTopicQueueTable();
}

9、 代码:DefaultMessageStore#recoverTopicQueueTable

恢复 ConsumerQueue 后,将在 CommitLog 实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列 ID、还存储了消息队列的关键所在。


public void recoverTopicQueueTable() {HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);//CommitLog最小偏移量long minPhyOffset = this.commitLog.getMinOffset();//遍历消费队列,将消费队列保存在CommitLog中for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {String key = logic.getTopic() + "-" + logic.getQueueId();table.put(key, logic.getMaxOffsetInQueue());logic.correctMinOffset(minPhyOffset);}}this.commitLog.setTopicQueueTable(table);
}

五、消息中间件 RocketMQ 源码分析:正常恢复和异常恢复

1、正常恢复

代码:CommitLog#recoverNormally


public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {//Broker正常停止再重启时,从倒数第三个开始恢复,如果不足3个文件,则从第一个文件开始恢复。int index = mappedFiles.size() - 3;if (index < 0)index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();//代表当前已校验通过的offsetlong mappedFileOffset = 0;while (true) {//查找消息DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);//消息长度int size = dispatchRequest.getMsgSize();//查找结果为true,并且消息长度大于0,表示消息正确.mappedFileOffset向前移动本消息长度if (dispatchRequest.isSuccess() && size > 0) {mappedFileOffset += size;}//如果查找结果为true且消息长度等于0,表示已到该文件末尾,如果还有下一个文件,则重置processOffset和MappedFileOffset重复查找下一个文件,否则跳出循环。else if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenbreak;} else {//取出每个文件mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;}}// 查找结果为false,表明该文件未填满所有消息,跳出循环,结束循环else if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}//更新MappedFileQueue的flushedWhere和committedWhere指针processOffset += mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);//删除offset之后的所有文件this.mappedFileQueue.truncateDirtyFiles(processOffset);if (maxPhyOffsetOfConsumeQueue >= processOffset) {this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}} else {this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}

2、 代码:MappedFileQueue#truncateDirtyFiles


public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();//遍历目录下文件for (MappedFile file : this.mappedFiles) {//文件尾部的偏移量long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//文件尾部的偏移量大于offsetif (fileTailOffset > offset) {//offset大于文件的起始偏移量if (offset >= file.getFileFromOffset()) {//更新wrotePosition、committedPosition、flushedPosistionfile.setWrotePosition((int) (offset % this.mappedFileSize));file.setCommittedPosition((int) (offset % this.mappedFileSize));file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,释放mappedFile占用内存,删除文件file.destroy(1000);willRemoveFiles.add(file);}}}this.deleteExpiredFile(willRemoveFiles);
}

3、异常恢复

Broker 异常停止文件恢复的实现为 CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果 CommitLog 目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。

代码:CommitLog#recoverAbnormally


if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which fileint index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);//判断消息文件是否是一个正确的文件if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}//根据索引取出mappedFile文件if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}//...验证消息的合法性,并将消息转发到消息消费队列和索引文件}else{//未找到mappedFile,重置flushWhere、committedWhere都为0,销毁消息队列文件this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();
}

上一节关联链接请点击:
# 消息中间件 RocketMQ 高级功能和源码分析(七)

这篇关于# 消息中间件 RocketMQ 高级功能和源码分析(八)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1086201

相关文章

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE

MySQL中的表连接原理分析

《MySQL中的表连接原理分析》:本文主要介绍MySQL中的表连接原理分析,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、环境3、表连接原理【1】驱动表和被驱动表【2】内连接【3】外连接【4编程】嵌套循环连接【5】join buffer4、总结1、背景

Golang如何用gorm实现分页的功能

《Golang如何用gorm实现分页的功能》:本文主要介绍Golang如何用gorm实现分页的功能方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录背景go库下载初始化数据【1】建表【2】插入数据【3】查看数据4、代码示例【1】gorm结构体定义【2】分页结构体

Java中的for循环高级用法

《Java中的for循环高级用法》本文系统解析Java中传统、增强型for循环、StreamAPI及并行流的实现原理与性能差异,并通过大量代码示例展示实际开发中的最佳实践,感兴趣的朋友一起看看吧... 目录前言一、基础篇:传统for循环1.1 标准语法结构1.2 典型应用场景二、进阶篇:增强型for循环2.

python中Hash使用场景分析

《python中Hash使用场景分析》Python的hash()函数用于获取对象哈希值,常用于字典和集合,不可变类型可哈希,可变类型不可,常见算法包括除法、乘法、平方取中和随机数哈希,各有优缺点,需根... 目录python中的 Hash除法哈希算法乘法哈希算法平方取中法随机数哈希算法小结在Python中,

Java Stream的distinct去重原理分析

《JavaStream的distinct去重原理分析》Javastream中的distinct方法用于去除流中的重复元素,它返回一个包含过滤后唯一元素的新流,该方法会根据元素的hashcode和eq... 目录一、distinct 的基础用法与核心特性二、distinct 的底层实现原理1. 顺序流中的去重

Java Web实现类似Excel表格锁定功能实战教程

《JavaWeb实现类似Excel表格锁定功能实战教程》本文将详细介绍通过创建特定div元素并利用CSS布局和JavaScript事件监听来实现类似Excel的锁定行和列效果的方法,感兴趣的朋友跟随... 目录1. 模拟Excel表格锁定功能2. 创建3个div元素实现表格锁定2.1 div元素布局设计2.

使用Python进行GRPC和Dubbo协议的高级测试

《使用Python进行GRPC和Dubbo协议的高级测试》GRPC(GoogleRemoteProcedureCall)是一种高性能、开源的远程过程调用(RPC)框架,Dubbo是一种高性能的分布式服... 目录01 GRPC测试安装gRPC编写.proto文件实现服务02 Dubbo测试1. 安装Dubb

HTML5实现的移动端购物车自动结算功能示例代码

《HTML5实现的移动端购物车自动结算功能示例代码》本文介绍HTML5实现移动端购物车自动结算,通过WebStorage、事件监听、DOM操作等技术,确保实时更新与数据同步,优化性能及无障碍性,提升用... 目录1. 移动端购物车自动结算概述2. 数据存储与状态保存机制2.1 浏览器端的数据存储方式2.1.

基于 HTML5 Canvas 实现图片旋转与下载功能(完整代码展示)

《基于HTML5Canvas实现图片旋转与下载功能(完整代码展示)》本文将深入剖析一段基于HTML5Canvas的代码,该代码实现了图片的旋转(90度和180度)以及旋转后图片的下载... 目录一、引言二、html 结构分析三、css 样式分析四、JavaScript 功能实现一、引言在 Web 开发中,