RocketMQ中的CommitLog与ConsumeQueue

2024-03-12 14:04

本文主要是介绍RocketMQ中的CommitLog与ConsumeQueue,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. CommitLog

在消息队列系统中,CommitLog是一个通常用于存储消息的、不断增长的日志文件。在Apache RocketMQ中,CommitLog扮演着核心的角色,是消息存储的核心文件。下面是对RocketMQ中CommitLog的一些详细介绍:

CommitLog的作用

  1. 消息存储:所有的消息体都存储在CommitLog文件中,无论消息是属于哪个Topic或者Consumer Group。

  2. 持久化CommitLog确保了消息的持久化,保证了即使在系统崩溃的情况下,消息也不会丢失。

  3. 顺序写入:RocketMQ的CommitLog是顺序写入的,这种写入方式对于磁盘I/O来说是非常高效的,因为它减少了磁头的移动。

CommitLog的结构

RocketMQ的CommitLog文件通常是固定大小的,比如1GB。当当前文件写满后,会创建一个新的CommitLog文件继续写入。这些文件在物理上是顺序存放的,形成一个文件序列。

CommitLog的工作流程

  1. 消息写入:当一个生产者发送消息时,该消息首先被追加到CommitLog的末尾。

  2. 索引构建:与此同时,RocketMQ会创建并维护索引文件,比如ConsumeQueueIndexFile,这些索引文件存储了指向CommitLog中具体消息位置的指针。

  3. 消息消费:消费者根据索引文件中的指针去CommitLog中读取消息。

CommitLog的清理

因为CommitLog会不断增长,所以需要有一种机制来清理旧的数据,防止磁盘空间耗尽。RocketMQ提供了以下两种清理机制:

  1. 定时清理:在一个配置的时间间隔后,比如默认的72小时,CommitLog文件会被检查,如果文件中的消息都已经被消费,那么这个文件就可以被删除。

  2. 基于磁盘空间的清理:如果磁盘的使用率达到了一个阈值(比如75%),RocketMQ会开始清理最老的CommitLog文件,直到磁盘使用率降到安全水平。

安全性和可靠性

RocketMQ通过复制机制增加了CommitLog的安全性和可靠性。主服务器上的CommitLog可以被复制到多个从服务器,这样即使主服务器发生故障,消息也不会丢失,因为它们已经存储在从服务器上。

总结

CommitLog是RocketMQ消息存储架构的核心组件,它提供了持久化、高效率的消息存储机制。通过与索引文件(如ConsumeQueueIndexFile)的配合使用,它支持高性能的消息读取。同时,清理机制确保了系统的稳定运行,不会因为CommitLog的无限增长而耗尽磁盘空间。通过主从复制,它还保证了消息系统的高可用性和数据的安全性。

2. ConsumeQueue

在Apache RocketMQ中,ConsumeQueue是消费队列的概念,它是消息消费的关键数据结构。下面是对RocketMQ中ConsumeQueue的一些详细介绍:

ConsumeQueue的作用

  1. 索引信息ConsumeQueue是消息索引文件,为消息消费者提供高效的索引以快速检索消息。

  2. 逻辑队列:每个消息主题的每个队列(Topic的Queue)都有对应的ConsumeQueue。如果一个主题有四个队列,那么就有四个ConsumeQueue

  3. 简化读操作:由于CommitLog存储了所有主题的消息,为了快速找到属于同一主题和队列的消息,ConsumeQueue提供了一种快速访问的方式。

ConsumeQueue的结构

ConsumeQueue的每个条目都是一个固定长度的结构,通常包含以下几个部分:

  • 消息偏移量:指向CommitLog中具体消息的偏移量。
  • 消息长度:消息在CommitLog中的长度。
  • 消息标签的哈希码:用于支持基于标签的消息过滤。

ConsumeQueue的工作流程

  1. 写入:当一个消息被追加到CommitLog后,会同时在ConsumeQueue中生成一个索引项。

  2. 消费:消费者通过读取ConsumeQueue来确定需要消费的消息在CommitLog中的位置,然后直接从CommitLog中读取消息。

ConsumeQueue的清理

CommitLog一样,ConsumeQueue文件也会定期清理:

  • CommitLog被清理时,相关的ConsumeQueue条目也会被清理。
  • 清理的触发通常是基于时间或空间的阈值,例如消息存储时间超出配置的存储时间或CommitLog的清理。

总结

ConsumeQueue作为消息消费的索引,极大地提升了消息访问的效率,使得RocketMQ能够支持高吞吐量的消息传递。通过ConsumeQueue,RocketMQ实现了对CommitLog的顺序写入和随机读取的分离,使得生产者和消费者操作可以并行执行,从而提高性能。同时,ConsumeQueue的存在也简化了消息消费的设计,允许消费者以近乎恒定的时间复杂度快速找到和消费消息。

3. CommitLog与ConsumeQueue

在RocketMQ中,ConsumeQueue是一个重要的组件,它起到了消息消费的索引作用。ConsumeQueueCommitLog紧密配合,使消息的存储和检索过程变得高效。下面详细解释ConsumeQueue的作用以及它是如何与CommitLog配合工作的。

ConsumeQueue 的作用

  1. 消息索引ConsumeQueue为每个消息队列提供了索引服务,它映射了CommitLog中的消息位置,使得可以快速地查找到每个消息。

  2. 提高性能:由于ConsumeQueue中存储的是消息在CommitLog中的物理位置,这极大地提高了消息查找的速度,并减少了对CommitLog的直接操作,使得消息消费变得更加高效。

  3. 节约空间ConsumeQueue的存储结构非常简单,每个条目仅包含几个关键信息,比如消息在CommitLog中的偏移量、消息大小和消息Tag的哈希码,这样设计大大减少了存储空间的占用。

ConsumeQueue 的结构

ConsumeQueue是一个逻辑队列,每个消息主题的每个消息队列(Queue)都有自己的ConsumeQueue。例如,如果一个主题有4个队列,那么就会有4个ConsumeQueue

一个ConsumeQueue由多个文件组成,每个文件默认大小为5.72MB,包含了固定大小的条目。每个条目通常是20字节,其中包含消息在CommitLog中的起始偏移量(8字节),消息长度(4字节),和消息Tag的哈希码(8字节)。

CommitLog 与 ConsumeQueue 的配合

  1. 消息存储:当生产者发送消息时,消息首先被追加到CommitLog

  2. 索引创建:消息存储到CommitLog后,RocketMQ会根据主题和队列ID创建或更新ConsumeQueue的条目。这个条目指向了CommitLog中消息的具体位置。

  3. 消息消费:消费者从ConsumeQueue中获取到消息的位置信息后,再到CommitLog中读取实际的消息内容。

  4. 消息确认:消费者消费消息后,会更新其消费进度。消费进度通常是指在ConsumeQueue中的偏移量,表明消费者已经消费到哪个点。

效率与优化

  • 顺序读写:由于CommitLog是按消息到达的顺序存储的,它的写操作是顺序的。而ConsumeQueue作为索引,让读操作也趋近于顺序读取,这对于磁盘操作是非常高效的。

  • 批量拉取:消费者可以批量地从ConsumeQueue拉取消息,进一步提高了消息消费的效率。

  • 内存映射ConsumeQueue可以被内存映射(Memory Mapped),提高访问速度。

总结

ConsumeQueue为消息消费提供了快速的索引方式,它将消息在CommitLog的物理存储位置映射成逻辑队列中的逻辑位置。这种设计简化了消息的查找过程,使得即便是在大量数据的情况下,消息消费仍然能保持高效。同时,它也确保了消费的状态能够被持久化和跟踪,从而保证消息消费的可靠性。

这篇关于RocketMQ中的CommitLog与ConsumeQueue的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/801500

相关文章

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

【Rocketmq入门-基本概念】

Rocketmq入门-基本概念 名词解释名称服务器(NameServer)消息队列(Message Queue)主题(Topic)标签(Tag)生产者(Producer)消费者(Consumer)拉取模式(Pull)推送模式(Push)消息模型(Message Model) 关键组件Broker消息存储工作流程 名词解释 名称服务器(NameServer) 定义: 名称服务器

centos7 安装rocketmq4.7.0以及RocketMQ-Console-Ng控制台

一、前置工作 1.1安装jdk8 https://blog.csdn.net/pang_ping/article/details/80570011 1.2安装maven https://www.cnblogs.com/116970u/p/11211963.html 1.3安装git https://blog.csdn.net/xwj1992930/article/details/964

RocketMQ 介绍

前言 消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360°的刁难。 作为一个在互联网公司面一次拿一次Offer的面霸,打败了无数竞争对手,每次都只能看到无数落寞的身影失望的离开,略感愧疚(请允许我使用一下夸张的修辞手法)。 于是在一个寂寞难耐的夜晚,我痛定思痛,决定开始写《吊打面试官》系列,希望能帮助各位读者以后面试势如破竹,

基于 RocketMQ 的云原生 MQTT 消息引擎设计

作者:沁君 概述 随着智能家居、工业互联网和车联网的迅猛发展,面向 IoT(物联网)设备类的消息通讯需求正在经历前所未有的增长。在这样的背景下,高效和可靠的消息传输标准成为了枢纽。MQTT 协议作为新一代物联网场景中得到广泛认可的协议,正逐渐成为行业标准。 本次我们将介绍搭建在 RocketMQ 基础上实现的 MQTT 核心设计,本文重点分析 RocketMQ 如何适应这些变化,通过优化存储

Rocketmq源码分析(1)

此次源码分析-rocketmq-spring-boot-starter,starter众所周知入口点就是AutoConfiguration.RocketMQAutoConfiguration.class // 标识为配置类@Configuration//将RocketMQProperties识别为配置属性类,创建对象并注入到spring容器中@EnableConfigurationProp

Docker创建Rocketmq-4.8.0镜像

rocketmq包含namesrv和broker两部分,这里不使用docker-compose编排,而是将这两部分分别创建容器. 一. namesrv 1. Dockerfile编写 FROM java:8ENV ROCKETMQ_VERSION 4.8.0ENV NAMESRV_HOME /home/rocketmq/namesrv-${ROCKETMQ_VERSION}ENV JAVA_

RocketMQ广播消费消息

1、 基础概念 RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。 集群消费模式(Cluster): 在集群消费模式下,同一个消费者组(Consumer Group)中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上,但是同一个消息只会被同一个消费者组中的一个消费者消费。 广播消费模式(Broadcast)

ActiveMQ、RocketMQ、RabbitMQ、Kafka

特点:解耦、异步、削峰 特性ActiveMQRabbitMQRocketMQkafka开发语言javaerlangjavascala单机吞吐量万级万级10万级10万级时效性ms级us级ms级ms级以内可用性高(主从架构)高(主从架构)非常高(分布式架构)非常高(分布式架构)功能特性成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好基于erlang开发,所以并发能力很强,性能极其好,延