RocketMQ(二)双主双从集群搭建及入门介绍

2023-11-03 21:10

本文主要是介绍RocketMQ(二)双主双从集群搭建及入门介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

目前这边的配置是双主(master)双从(slave),同步双写,同步刷盘的机制。配置在两盘服务器上。

温故而知新

首先上个从百度图片盗来的图,哦不,程序员的事怎么能说盗呢?
在这里插入图片描述

温故下:

  • Producer : 生产者(消息发送者);
  • Consumer : 消费者(消息接收者);
  • Broker : 负责暂存消息;(类似于邮局);
  • NameServer :管理Broker;(类似于邮政部门,各个邮局的管理机构);
  • ProducerGroup:生产者组;
  • ConsumerGroup:消费者组。值得注意的是,在其他大V博主中的关于多Consumer订阅同一个ConsumerGroup而每个Consumer订阅的Topic又有所不同,照成的数据消费混乱或者无法被消息。这边Rocket应用就是这么个场景的。后面也会加以验证。参考链接: link.
    https://blog.csdn.net/a417930422/article/details/50663639?locationNum=1;
  • Topic : 生产者Producer和消费者Consumer都是通过Topic找到具体某大类进 行消息的投递和消费,Producer可以发送消息给一个或多个Topic。Consumer可以订阅一个或多个Topic消息;
  • Message Queue : 相当于Topic的分区,用于并行发送和接收消息;

  一、由Producer 生产消息,Producer 通过询问NameServer (注:这里NameServer 实现了负载均衡)获得要将消息发送给那个Broker的具体地址,就是红色线流程。同理Consumer消息者携带者ConsumerGroup组别信息和Topic信息对NameServer 进行询问应该去哪个Broker获取消息。(值的注意的是RocketMQ对于消费者有主动推送消息DefaultMQPushConsumer和消费者主站拉取消息DefaultMQPullConsumer两种模式,但其主动推送消息模式底层实现上看也是由消费者主站拉取消息 )

  二、Broker
  1.Broker 分为主从节点,上图Broker Master1 和Broker Master2为主节点,Broker Slave1和Broker Slave2为从节点。主节点主要负责Producer 生产者消息的写入。从节点主要负责Consumer 消费者消息的消费。因为我们即将搭建的是双主双从,所以我们需要由brokerName参数去设置那个主那个从为一个broker组,然后通过brokerId去区分broker组中那个为主从区别,0为主节点,大于0为从节点(注一个master可以对应多个slave,而一个slave只能对应一个master)
  2.Broker 主从节点之间数据同步,分为两种模式:一种异步复制,一种同步双写。其一,异步就是当Producer生产者将消息写入Broker Master以后,Broker Master就将消息成功写入的状态返给Producer了,Broker Master再异步将数据复制给Broker Slave,至于什么时候Broker Master异步复制,是否将数据复制成功Producer生产者是不知道的。异步复制会出现少量的数据丢失。其二,同步呢,顾名思义就是Producer生产者将消息写入Broker Master以后,Broker Master就将消息同步写入Broker Slave,当完成这个 操作以后再返回状态给Producer。异步效率比较高,同步对于数据的一致性安全性比较好。

Linux上双主双从的搭建

这边两台服务器上的IP将用IP1 和IP2区分。
步骤1到步骤3在两台服务器上都要进行配置。
1.hosts中添加信息

vim /etc/hosts
//记得修改IP为服务器地址。
//nameserver
IP1 nameserver1
IP2 nameserver2
//broker
IP1 master1
IP1 master1-slave1
IP2 master2
IP2 master2-slave2
//保存退出
:wq!
//重启网卡
systemctl restart network

2.记得开放RocketMQ服务端口,默认端口分别为
nameserver:9876;
master:10911;
slave:11011;

3.创建消息存储路径


mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
mkdir /usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index

4.配置broker
 4.1配置服务器IP1上的broker文件broker-a.properties

//根据自己的路径,进入同步文件夹下
cd /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync
//修改配置文件
vim broker-a.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1

 4.2配置服务器IP1上的broker文件broker-a-s.properties

//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-a-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP1

 4.3配置服务器IP2上的broker文件broker-b.properties

vim broker-b.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10912
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#(注意修改为自己服务器)强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2

4.4配置服务器IP2上的broker文件broker-b-s.properties

//这边是在IP1上配置broker-a.properties和broker-a-s.properties,当IP1宕机的话broker-a组也就宕掉了。所以你们也可以交叉配置,比如在IP1上配置broker-a.properties和broker-b-s.properties
vim broker-b-s.properties#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=nameserver1:9876;nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#表示Master监听Slave请求的端口,默认为服务端口+1
haListenPort=10924
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/rocketmq-4.7.0/rocketMQ-2m2s/store-s/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
# 这里使用了同步刷盘,建议使用异步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
#强制指定本机IP,需要根据每台机器进行修改。官方介绍可为空,系统默认自动识别,但多网卡时IP地址可能读取错误
brokerIP1=IP2

以上broker配置完毕,上篇文章我们只修改了一个服务器的启动内存,另外一个服务器rocketMQ的启动内存也记得修改。
5.启动服务

//进入到服务器IP1下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a.properties &
//启动slave1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-a-s.properties &
//jps查看
jps46688 NamesrvStartup
47106 BrokerStartup
46815 BrokerStartup
//进入到服务器IP2下的bin目录
cd /usr/local/rocketmq/rocketmq-4.7.0/bin
//启动nameserver集群
nohup sh mqnamesrv &//启动master2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b.properties &
//启动slave2
nohup sh mqbroker /usr/local/rocketmq/rocketmq-4.7.0/conf/2m-2s-sync/broker-b-s.properties &
//jps查看
jps

本章结束

这篇关于RocketMQ(二)双主双从集群搭建及入门介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/340879

相关文章

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

Python中win32包的安装及常见用途介绍

《Python中win32包的安装及常见用途介绍》在Windows环境下,PythonWin32模块通常随Python安装包一起安装,:本文主要介绍Python中win32包的安装及常见用途的相关... 目录前言主要组件安装方法常见用途1. 操作Windows注册表2. 操作Windows服务3. 窗口操作

从入门到精通C++11 <chrono> 库特性

《从入门到精通C++11<chrono>库特性》chrono库是C++11中一个非常强大和实用的库,它为时间处理提供了丰富的功能和类型安全的接口,通过本文的介绍,我们了解了chrono库的基本概念... 目录一、引言1.1 为什么需要<chrono>库1.2<chrono>库的基本概念二、时间段(Durat

c++中的set容器介绍及操作大全

《c++中的set容器介绍及操作大全》:本文主要介绍c++中的set容器介绍及操作大全,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录​​一、核心特性​​️ ​​二、基本操作​​​​1. 初始化与赋值​​​​2. 增删查操作​​​​3. 遍历方

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

从入门到精通MySQL 数据库索引(实战案例)

《从入门到精通MySQL数据库索引(实战案例)》索引是数据库的目录,提升查询速度,主要类型包括BTree、Hash、全文、空间索引,需根据场景选择,建议用于高频查询、关联字段、排序等,避免重复率高或... 目录一、索引是什么?能干嘛?核心作用:二、索引的 4 种主要类型(附通俗例子)1. BTree 索引(

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

HTML img标签和超链接标签详细介绍

《HTMLimg标签和超链接标签详细介绍》:本文主要介绍了HTML中img标签的使用,包括src属性(指定图片路径)、相对/绝对路径区别、alt替代文本、title提示、宽高控制及边框设置等,详细内容请阅读本文,希望能对你有所帮助... 目录img 标签src 属性alt 属性title 属性width/h

如何使用Haporxy搭建Web群集

《如何使用Haporxy搭建Web群集》Haproxy是目前比较流行的一种群集调度工具,同类群集调度工具有很多如LVS和Nginx,本案例介绍使用Haproxy及Nginx搭建一套Web群集,感兴趣的... 目录一、案例分析1.案例概述2.案例前置知识点2.1 HTTP请求2.2 负载均衡常用调度算法 2.

SpringBoot连接Redis集群教程

《SpringBoot连接Redis集群教程》:本文主要介绍SpringBoot连接Redis集群教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 依赖2. 修改配置文件3. 创建RedisClusterConfig4. 测试总结1. 依赖 <de