【RocketMQ】RocketMQ自动容灾切换DLedger集群实践

2023-12-17 20:32

本文主要是介绍【RocketMQ】RocketMQ自动容灾切换DLedger集群实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • DLedger配置介绍
  • 新建DLedger集群
    • 机器规划
      • Broker
      • nameserver
      • console
    • Broker配置
      • broker-a
      • broker-a-s1
      • broker-a-s2
      • broker-b
      • broker-b-s1
      • broker-b-s2
      • broker-c
      • broker-c-s1
      • broker-c-s2
    • 验证
  • 旧集群升级
    • 杀掉旧的 Broker
    • 检查旧的 Commitlog
    • 修改配置
    • 重新启动 Broker
  • Tips

本文主要演示如何搭建一个多主多从的自动容灾切换的RocketMQ集群,以及如何把现有普通的Master/Slave集群升级为自动容灾切换的集群。

DLedger配置介绍

​ 部署自动容灾的RocketMQ集群和普通的Master/Slave集群没太多差别,只需要在Broker.conf开启DLedger相关配置即可,官方称之为自动容灾切换的RocketMQ-on-DLedger Group,DLedger是分布式一致性协议Raft的一个实现,是一个轻量级的Java类库,RocketMQ引入其来实现Leader选举等一致性操作。

  • RocketMQ-on-DLedger Group是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在Leader和Follower之间复制数据以保证高可用。
  • RocketMQ-on-DLedger Group能自动容灾切换,并保证数据一致。
  • RocketMQ-on-DLedger Group是可以水平扩展的,也即可以部署任意多个RocketMQ-on-DLedger Group同时对外提供服务。

相关配置

name含义举例
enableDLegerCommitLog是否启动 DLedger,即是否启用RocketMQ主从切换,默认为falsetrue
dLegerGroupDLedger Raft Group的名字,建议和brokerName保持一致RaftNode00
dLegerPeersDLedger Group 内各节点的端口信息,同一个Group 内的各个节点配置必须要保证一致n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId节点 id, 必须属于 dLegerPeers 中的一个;同Group 内各个节点要唯一n0
sendMessageThreadPoolNums发送线程个数,建议配置成 Cpu 核数16

新建DLedger集群

机器规划

Broker

节点ID机器端口dLegerPeer
broker-a0172.24.29.21320911n0-172.24.29.213:10911
broker-a-s11172.24.29.21420911n1-172.24.29.214:10911
broker-a-s22172.24.29.21520911n2-172.24.29.215:10911
broker-b0172.24.29.21420915n0-172.24.29.214:10915
broker-b-s11172.24.29.21320915n1-172.24.29.213:10915
broker-b-s22172.24.29.21520915n2-172.24.29.215:10915
broker-c0172.24.29.21520919n0-172.24.29.215:10919
broker-c-s11172.24.29.21320919n1-172.24.29.213:10919
broker-c-s22172.24.29.21420919n2-172.24.29.214:10919

nameserver

节点机器端口
nameserver172.24.29.2159976

console

节点机器端口
console172.24.29.2158585

Broker配置

broker-a

listenPort = 20911
rocketmqHome = /neworiental/rocketmq-test/rocketmq-a/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-a
#对外提供服务地址
brokerIP1 = 172.24.29.213
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.213
brokerId = 0
storePathRootDir = /neworiental/rocketmq-test/rocketmq-a/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-a/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-a/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-a
dLegerSelfId = n0
dLegerPeers = n0-172.24.29.213:10911;n1-172.24.29.214:10911;n2-172.24.29.215:10911

broker-a-s1

listenPort = 20911
rocketmqHome = /neworiental/rocketmq-test/rocketmq-a-s1/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-a
#对外提供服务地址
brokerIP1 = 172.24.29.214
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.214
brokerId = 1
storePathRootDir = /neworiental/rocketmq-test/rocketmq-a-s1/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-a-s1/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-a-s1/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-a
dLegerSelfId = n1
dLegerPeers = n0-172.24.29.213:10911;n1-172.24.29.214:10911;n2-172.24.29.215:10911

broker-a-s2

listenPort = 20911
rocketmqHome = /neworiental/rocketmq-test/rocketmq-a-s2/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-a
#对外提供服务地址
brokerIP1 = 172.24.29.215
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.215
brokerId = 2
storePathRootDir = /neworiental/rocketmq-test/rocketmq-a-s2/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-a-s2/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-a-s2/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-a
dLegerSelfId = n2
dLegerPeers = n0-172.24.29.213:10911;n1-172.24.29.214:10911;n2-172.24.29.215:10911

broker-b

listenPort = 20915
rocketmqHome = /neworiental/rocketmq-test/rocketmq-b/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-b
#对外提供服务地址
brokerIP1 = 172.24.29.214
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.214
brokerId = 0
storePathRootDir = /neworiental/rocketmq-test/rocketmq-b/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-b/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-b/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-b
dLegerSelfId = n0
dLegerPeers = n0-172.24.29.214:10915;n1-172.24.29.213:10915;n2-172.24.29.215:10915

broker-b-s1

listenPort = 20915
rocketmqHome = /neworiental/rocketmq-test/rocketmq-b-s1/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-b
#对外提供服务地址
brokerIP1 = 172.24.29.213
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.213
brokerId = 1
storePathRootDir = /neworiental/rocketmq-test/rocketmq-b-s1/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-b-s1/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-b-s1/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-b
dLegerSelfId = n1
dLegerPeers = n0-172.24.29.214:10915;n1-172.24.29.213:10915;n2-172.24.29.215:10915

broker-b-s2

listenPort = 20915
rocketmqHome = /neworiental/rocketmq-test/rocketmq-b-s2/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-b
#对外提供服务地址
brokerIP1 = 172.24.29.215
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.215
brokerId = 2
storePathRootDir = /neworiental/rocketmq-test/rocketmq-b-s2/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-b-s2/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-b-s2/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-b
dLegerSelfId = n2
dLegerPeers = n0-172.24.29.214:10915;n1-172.24.29.213:10915;n2-172.24.29.215:10915

broker-c

listenPort = 20919
rocketmqHome = /neworiental/rocketmq-test/rocketmq-c/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-c
#对外提供服务地址
brokerIP1 = 172.24.29.215
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.215
brokerId = 0
storePathRootDir = /neworiental/rocketmq-test/rocketmq-c/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-c/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-c/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-c
dLegerSelfId = n0
dLegerPeers = n0-172.24.29.215:10919;n1-172.24.29.213:10919;n2-172.24.29.214:10919

broker-c-s1

listenPort = 20919
rocketmqHome = /neworiental/rocketmq-test/rocketmq-c-s1/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-c
#对外提供服务地址
brokerIP1 = 172.24.29.213
#Broker HAIP地址,供slave同步消息的地址
brokerIP2= 172.24.29.213
brokerId = 1
storePathRootDir = /neworiental/rocketmq-test/rocketmq-c-s1/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-c-s1/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-c-s1/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-c
dLegerSelfId = n1
dLegerPeers = n0-172.24.29.215:10919;n1-172.24.29.213:10919;n2-172.24.29.214:10919

broker-c-s2

listenPort = 20919
rocketmqHome = /neworiental/rocketmq-test/rocketmq-c-s2/rocketmq471
brokerClusterName = rocketmq-test
namesrvAddr= 172.24.29.215:9976
brokerName = broker-c
#对外提供服务地址
brokerIP1 = 172.24.29.214
#Broker HAIP地址,供slave同步消息的地址,不配此项则不会进行主从复制
brokerIP2= 172.24.29.214
brokerId = 2
storePathRootDir = /neworiental/rocketmq-test/rocketmq-c-s2/store
storePathCommitLog = /neworiental/rocketmq-test/rocketmq-c-s2/store/commitlog
storePathConsumerQueue = /neworiental/rocketmq-test/rocketmq-c-s2/store/consumequeue
deleteWhen = 04
fileReservedTime = 168
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = false
#是否自动创建消费组
autoCreateSubscriptionGroup = false
#集群名称是否可用作Topic使用
clusterTopicEnable = false
#Broker名称是否可用作Topic使用
brokerTopicEnable = false
useEpollNativeSelector = true#是否开启ACL
aclEnable=true#多副本自动主从选举Dledger相关
enableDLegerCommitLog = true
dLegerGroup = broker-c
dLegerSelfId = n2
dLegerPeers = n0-172.24.29.215:10919;n1-172.24.29.213:10919;n2-172.24.29.214:10919

验证

通过管理命令可以看到集群信息,如下图,每个组有一个Leader,两个Follower,集群搭建成功,日志也正常

Leader日志:

Follower日志:

下面手动将broker-b的Leader杀掉

操作之前:

Leader是在215上

操作之后:

Leader成功选举为214了

再次将215恢复:

可以看到215成为了新的Follower

旧集群升级

如果旧集群采用 Master 方式部署,则每个 Master 都需要转换成一个 RocketMQ-on-DLedger Group。
如果旧集群采用 Master-Slave 方式部署,则每个 Master-Slave 组都需要转换成一个 RocketMQ-on-DLedger Group。

杀掉旧的 Broker

可以通过 kill 命令来完成,也可以调用 bin/mqshutdown broker

检查旧的 Commitlog

​ RocketMQ-on-DLedger 组中的每个节点,可以兼容旧的 Commitlog ,但其 Raft 复制过程,只能针对新增加的消息。因此,为了避免出现异常,需要保证 旧的 Commitlog 是一致的。
​ 如果旧的集群是采用 Master-Slave 方式部署,有可能在shutdown时,其数据并不是一致的,建议通过md5sum 的方式,检查最近的最少 2 个 Commmitlog 文件,如果发现不一致,则通过拷贝的方式进行对齐。

​ 虽然 RocketMQ-on-DLedger Group 也可以以 2 节点方式部署,但其会丧失容灾切换能力(2n + 1 原则,至少需要3个节点才能容忍其中 1 个宕机)。
​ 所以在对齐了 Master 和 Slave 的 Commitlog 之后,还需要准备第 3 台机器,并把旧的 Commitlog 从 Master 拷贝到 第 3 台机器(记得同时拷贝一下 config 文件夹)。

​ 在 3 台机器准备好了之后,旧 Commitlog 文件也保证一致之后,就可以开始走下一步修改配置了。

修改配置

参考新集群部署。

重新启动 Broker

参考新集群部署。

Tips

  • dLegerPeers:每个DLedger Group内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致,多个节点用英文分号隔开,单个条目遵循 legerSlefId-IP:端口,这里的端口用作dledge内部通信,注意不要和RocketMQ本身的端口冲突
  • dLegerSelfId:节点 id, 必须属于dLegerPeers中的一个;同 Group 内各个节点要唯一,并且此ID要和dLegerPeers配置的内容中对应上
  • Broker配置文件中的dLedger少了一个d,不知道是笔误还是有意为之

参考:

https://github.com/apache/rocketmq/blob/master/docs/cn/dledger/deploy_guide.md

这篇关于【RocketMQ】RocketMQ自动容灾切换DLedger集群实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/505753

相关文章

Spring Security 单点登录与自动登录机制的实现原理

《SpringSecurity单点登录与自动登录机制的实现原理》本文探讨SpringSecurity实现单点登录(SSO)与自动登录机制,涵盖JWT跨系统认证、RememberMe持久化Token... 目录一、核心概念解析1.1 单点登录(SSO)1.2 自动登录(Remember Me)二、代码分析三、

在Java中使用OpenCV实践

《在Java中使用OpenCV实践》用户分享了在Java项目中集成OpenCV4.10.0的实践经验,涵盖库简介、Windows安装、依赖配置及灰度图测试,强调其在图像处理领域的多功能性,并计划后续探... 目录前言一 、OpenCV1.简介2.下载与安装3.目录说明二、在Java项目中使用三 、测试1.测

MyBatis-Plus 自动赋值实体字段最佳实践指南

《MyBatis-Plus自动赋值实体字段最佳实践指南》MyBatis-Plus通过@TableField注解与填充策略,实现时间戳、用户信息、逻辑删除等字段的自动填充,减少手动赋值,提升开发效率与... 目录1. MyBATis-Plus 自动赋值概述1.1 适用场景1.2 自动填充的原理1.3 填充策略

Olingo分析和实践之EDM 辅助序列化器详解(最佳实践)

《Olingo分析和实践之EDM辅助序列化器详解(最佳实践)》EDM辅助序列化器是ApacheOlingoOData框架中无需完整EDM模型的智能序列化工具,通过运行时类型推断实现灵活数据转换,适用... 目录概念与定义什么是 EDM 辅助序列化器?核心概念设计目标核心特点1. EDM 信息可选2. 智能类

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1

Olingo分析和实践之ODataImpl详细分析(重要方法详解)

《Olingo分析和实践之ODataImpl详细分析(重要方法详解)》ODataImpl.java是ApacheOlingoOData框架的核心工厂类,负责创建序列化器、反序列化器和处理器等组件,... 目录概述主要职责类结构与继承关系核心功能分析1. 序列化器管理2. 反序列化器管理3. 处理器管理重要方

虚拟机Centos7安装MySQL数据库实践

《虚拟机Centos7安装MySQL数据库实践》用户分享在虚拟机安装MySQL的全过程及常见问题解决方案,包括处理GPG密钥、修改密码策略、配置远程访问权限及防火墙设置,最终通过关闭防火墙和停止Net... 目录安装mysql数据库下载wget命令下载MySQL安装包安装MySQL安装MySQL服务安装完成

SpringBoot整合(ES)ElasticSearch7.8实践

《SpringBoot整合(ES)ElasticSearch7.8实践》本文详细介绍了SpringBoot整合ElasticSearch7.8的教程,涵盖依赖添加、客户端初始化、索引创建与获取、批量插... 目录SpringBoot整合ElasticSearch7.8添加依赖初始化创建SpringBoot项

Zabbix在MySQL性能监控方面的运用及最佳实践记录

《Zabbix在MySQL性能监控方面的运用及最佳实践记录》Zabbix通过自定义脚本和内置模板监控MySQL核心指标(连接、查询、资源、复制),支持自动发现多实例及告警通知,结合可视化仪表盘,可有效... 目录一、核心监控指标及配置1. 关键监控指标示例2. 配置方法二、自动发现与多实例管理1. 实践步骤

MySQL 迁移至 Doris 最佳实践方案(最新整理)

《MySQL迁移至Doris最佳实践方案(最新整理)》本文将深入剖析三种经过实践验证的MySQL迁移至Doris的最佳方案,涵盖全量迁移、增量同步、混合迁移以及基于CDC(ChangeData... 目录一、China编程JDBC Catalog 联邦查询方案(适合跨库实时查询)1. 方案概述2. 环境要求3.