RocketMQ之 CommitLog

2023-10-19 21:30
文章标签 rocketmq commitlog

本文主要是介绍RocketMQ之 CommitLog,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 消息存储概述

RocketMQ 的存储文件,放在 ${ROCKET_HOME}/store 目录下。

当生产者发送消息时,broker 会将消息存储到 commit 文件下,然后再异步的转存到 consumeQueue 以及 indexFile

commitlog
消息主体以及元数据的存储主体。Producer 发送的消息就存放在 commitlog 里面.

consumeQueue
消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。

indexFile
IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

下图解释了,当写入 commitlog 时。commitlog, consumeQueue, indexFile 3 者的关系。
在这里插入图片描述

2 认识 commitLog

commitlogRocketMQ 用于存储消息的文件. commitlog 具有以下特征

  1. 顺序写入,随机读写。
  2. 消息只要被写入 commitlog 那么该消息就不会丢失
  3. 消息是非定长的
  4. 每个文件大小默认 1GB
  5. 文件的命名是以 commitlog 起始偏移量命名的

下面列出,commitlog 在磁盘上的表现
在这里插入图片描述

3 commitLog 写入流程

在这里插入图片描述

这个流程中,我们只需要关注 2 个点。

  1. appendMessage()
  2. 处理刷盘

appendMessage() 处理的逻辑是将消息写入堆外内存中,这里不过多关注。读者感兴趣可以自行查看源码,源码的位置 DefaultAppendMessageCallback#doAppend()

4 刷盘

4.1 同步刷盘

从使用者角度来看的同步刷盘流程

从程序角度来看的同步刷盘流程
在这里插入图片描述

  1. CommitLog 将消息封装成 GroupCommitRequest, 并放入队列中。
  2. 唤醒 GroupCommitServiceThread 线程
  3. GroupCommitServiceThreadList 中获取 Request 并消费
  4. 调用 MappedFile 将数据刷到磁盘上
  5. 刷盘后,唤醒 CommitLog 所在线程
4.2 异步刷盘

从使用者角度来看的异步刷盘流程

transientStorePoolEnable 关闭时, 即 FlushRealTimeService 刷盘逻辑

基本流程:
在这里插入图片描述

代码位置:
CommitLog$FlushRealTimeService#run()

  1. 每隔 500ms 执行一次 flush()
  2. 每次刷盘时,需要凑够至少 4 页。当 2 次刷盘的时间间隔大于 10s 时,则忽略该条件,直接执行 flush()
  3. 刷完盘之后,记录 checkPoint(刷盘点)

可配置参数

# false 使用 AQS 等待; true 使用 sleep 等待
flushCommitLogTimed=false
# 每隔 500ms 刷盘
flushIntervalCommitLog=500
# 每次刷盘时,至少4页
flushCommitLogLeastPages=4
# 间隔多久未刷盘,会强制刷盘
flushCommitLogThoroughInterval=1000*10

transientStorePoolEnable 开启时。RocketMQ 会通过 CommitRealTimeService 线程调用 FileChannel#write() 方法,将消息写入 cache. 然后唤醒 FlushRealTimeService 线程,由该线程执行 flush()

基本流程
在这里插入图片描述

代码位置
CommitLog$CommitRealTimeService#run()

  1. 每隔 200ms 执行一次 write()
  2. 每次执行 write() 时,需要凑够至少 4 页。当 2 次 write 的时间间隔大于 200ms,则忽略该条件,直接执行 write()
  3. 调用 FileChannel#write()
  4. 唤醒 FlushRealTimeService 线程
  5. 由 FlushRealTimeService 执行 flush
# 每个 200ms 执行 write
commitIntervalCommitLog=200
# 每次执行 write 时,至少4页
commitCommitLogLeastPages
# 间隔多久未刷盘,会强制刷盘
commitCommitLogThoroughInterval=200
4.3 总结

RocketMQ 提供了 2 种刷盘方式, 可通过 flushDiskType 进行配置

  1. flushDiskType=SYNC_FLUSH
    同步刷盘,调用 flush
  2. flushDiskType=ASYNC_FLUSH & transientStorePoolEnable=false
    异步刷盘,调用 flush.
  3. flushDiskType=ASYNC_FLUSH & transientStorePoolEnable=true
    异步刷盘, 调用 write,并随之调用 flush

5 参考

  • rocketMQ 设计指南
  • Java NIO 文件通道 FileChannel 用法及原理
  • ByteBuffer详解

这篇关于RocketMQ之 CommitLog的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/242460

相关文章

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

centos7 安装rocketmq4.7.0以及RocketMQ-Console-Ng控制台

一、前置工作 1.1安装jdk8 https://blog.csdn.net/pang_ping/article/details/80570011 1.2安装maven https://www.cnblogs.com/116970u/p/11211963.html 1.3安装git https://blog.csdn.net/xwj1992930/article/details/964

RocketMQ 介绍

前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难。 作为一个在互联网公司面一次拿一次Offer的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。 于是在一个寂寞难耐的夜晚,我痛定思痛,决定开始写《吊打面试官》系列,希望能帮助各位读者以后面试势如破竹,

基于 RocketMQ 的云原生 MQTT 消息引擎设计

作者:沁君 概述 随着智能家居、工业互联网和车联网的迅猛发展,面向 IoT(物联网)设备类的消息通讯需求正在经历前所未有的增长。在这样的背景下,高效和可靠的消息传输标准成为了枢纽。MQTT 协议作为新一代物联网场景中得到广泛认可的协议,正逐渐成为行业标准。 本次我们将介绍搭建在 RocketMQ 基础上实现的 MQTT 核心设计,本文重点分析 RocketMQ 如何适应这些变化,通过优化存储

Rocketmq源码分析(1)

此次源码分析-rocketmq-spring-boot-starter,starter众所周知入口点就是AutoConfiguration.RocketMQAutoConfiguration.class // 标识为配置类@Configuration//将RocketMQProperties识别为配置属性类,创建对象并注入到spring容器中@EnableConfigurationProp

Docker创建Rocketmq-4.8.0镜像

rocketmq包含namesrv和broker两部分,这里不使用docker-compose编排,而是将这两部分分别创建容器. 一. namesrv 1. Dockerfile编写 FROM java:8ENV ROCKETMQ_VERSION 4.8.0ENV NAMESRV_HOME /home/rocketmq/namesrv-${ROCKETMQ_VERSION}ENV JAVA_

RocketMQ广播消费消息

1、 基础概念 RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。 集群消费模式(Cluster): 在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 广播消费模式(Broadcast)

ActiveMQ、RocketMQ、RabbitMQ、Kafka

特点:解耦、异步、削峰 特性ActiveMQRabbitMQRocketMQkafka开发语言javaerlangjavascala单机吞吐量万级万级10万级10万级时效性ms级us级ms级ms级以内可用性高(主从架构)高(主从架构)非常高(分布式架构)非常高(分布式架构)功能特性成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好基于erlang开发,所以并发能力很强,性能极其好,延