SpringBoot+RocketMQ集群(dledger)部署完整学习笔记

本文主要是介绍SpringBoot+RocketMQ集群(dledger)部署完整学习笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、单台集群部署
  • 二、多台集群部署
    • 1.修改配置
    • 2.dashboard修改
  • 三、整合springboot
    • 1.引入pom和修改yml
    • 2.编写消费者
    • 3.编写生产者
    • 4.测试效果
  • 总结


前言


RocketMQ集群方式有好几种
官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy

  • 2m-2s-async:2主2从异步刷盘(吞吐量较大,但是消息可能丢失
  • 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)
  • 2m-noslave :2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置
  • dledger:用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,
    其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

dledger搭建参考文档 https://rocketmq.apache.org/zh/docs/4.x/bestPractice/02dledger

MQ安装部署请看这篇:https://blog.csdn.net/HBliucheng/article/details/135357998

搭建过程中踩过的坑也也会记录下来

一、单台集群部署

## 启动
nohup sh bin/dledger/fast-try.sh start
## 关闭
nohup sh bin/dledger/fast-try.sh stop

先启动 fast-try.sh start
启动时发现权限不足
nohup: 无法运行命令"bin/mqbroker": 权限不够
查看启动脚本

cat bin/dledger/fast-try.sh

在这里插入图片描述
那我们就修改下nohup 后面加上sh
修改后如下

function startNameserver() {export JAVA_OPT_EXT=" -Xms512m -Xmx512m  "nohup sh bin/mqnamesrv &
}function startBroker() {export JAVA_OPT_EXT=" -Xms1g -Xmx1g  "conf_name=$1nohup sh bin/mqbroker -c $conf_name &
}

再次启动发现可以了
在这里插入图片描述
执行命令 查看集群情况 BID =0的是主节点

sh bin/mqadmin clusterList -n 127.0.0.1:9876

在这里插入图片描述
再看看dashboarb
启动之前请先开放6个端口 如果还有端口访问不了的请自行开放出来

firewall-cmd --zone=public --add-port=30909/tcp --permanent
firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=30919/tcp --permanent
firewall-cmd --zone=public --add-port=30921/tcp --permanent
firewall-cmd --zone=public --add-port=30929/tcp --permanent
firewall-cmd --zone=public --add-port=30931/tcp --permanent### 如果不想一次次开放下面命令也可以
firewall-cmd --zone=public --add-port=30900-30930/tcp --permanent
## 重启防火墙
systemctl reload firewalld
## 查看开放的端口
firewall-cmd --list-ports
## 其它命令
### 关闭端口
firewall-cmd --zone=public --remove-port=30909/tcp --permanent

在这里插入图片描述

启动生产者和消费者再看 master消费一个
在这里插入图片描述
停止master

 lsof  -i:30911## 找到pid杀死 我的是118276kill  118276

在这里插入图片描述
在这里插入图片描述
我们再启动 被杀死的broker

nohup sh  bin/mqbroker -c conf/dledger/broker-n0.conf &

在这里插入图片描述
在这里插入图片描述
发现30911作为slave回来了

二、多台集群部署

先准备三台机器
192.168.141.101
192.168.141.102
192.168.141.103

1.修改配置

192.168.141.101修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.101:9876
source /etc/profile

修改 broker.conf 后面我们启动哪个就修改哪个 我是把 broker-n0.conf复制一份到broker.conf,也可以直接修改broker-n0.conf,启动时启动自己配置的conf文件就可以

cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16

192.168.141.102修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=92.168.141.102:9876
source /etc/profile

cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16

192.168.141.103修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.103:9876
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16

开放端口
每台机器都要开放

firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=40911/tcp --permanent
systemctl reload firewalld

如果还有端口没开放,请自行开放

启动
每台机器都要启动

cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
nohup sh bin/mqnamesrv &
nohup sh  bin/mqbroker -c conf/dledger/broker.conf &

查看日志
在这里插入图片描述
发现未创建文件夹,创建文件夹

 mkdir -p  /tmp/rmqstore/node00/commitlog
## 关掉再启动
sh bin/mqshutdown broker
## 启动broker
nohup sh  bin/mqbroker -c conf/dledger/broker.conf &

查看集群情况

sh bin/mqadmin clusterList -n 127.0.0.1:9876

在这里插入图片描述

踩坑 这个值不要随便写,这里从0开始递增 ,不然选举会有问题
在这里插入图片描述

2.dashboard修改

修改配置

## 根据自己的服务器地址修改,注意中间是分号不是逗号
rocketmq.config.namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876

启动访问
在这里插入图片描述
关闭master再查看集群情况,然后再重启,和前面的单机集群一样的,大家可自行测试

三、整合springboot

1.引入pom和修改yml

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>
rocketmq:
# 集群中间以分号隔开name-server: 192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876producer:group: my_group_test

2.编写消费者

package com.study.config.rocketmq;import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.nio.charset.Charset;/*** @author: * @time: 2024/1/5 10:00*/
@Component
@RocketMQMessageListener(consumerGroup = "my_group_test",topic = "topic_test",selectorType = SelectorType.TAG,selectorExpression = "tagA")
@Slf4j
public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody(), CharsetUtil.UTF_8);log.info("msgId={} msg={}",msgId,msg);}
}

@RocketMQMessageListener 注解参数如下:

  • topic: 消费者订阅的主题,即消费者将从这个主题中接收消息。
  • consumerGroup: 消费者组,多个消费者可以组成一个消费者组,共同从一个主题中接收消息。
  • consumeMode: 消费模式,指定消费者是以并发的方式接收消息还是以有序的方式接收消息。并发模式下,多个消费者可以同时接收消息;有序模式下,每个消费者按照消息的顺序依次接收消息。
  • messageModel: 消息模式,指定消息是以集群模式还是广播模式发送。集群模式下,消息将被发送到同一个主题的其中一个消费者;广播模式下,消息将被发送到主题的所有消费者。
  • selectorType: 过滤消息的方式,可以使用标签(Tag)或SQL92表达式(SQL92)来过滤消息。
  • selectorExpression: 过滤消息的表达式,可以使用标签(Tag)或SQL92表达式(SQL92)来指定过滤条件。
  • maxReconsumeTimes: 消息消费失败后,可被重复投递的最大次数。超过最大重试次数后,消息将被放入死信队列。
  • delayLevelWhenNextConsume: 并发模式的消息重试策略,指定消息消费失败后的重试延迟级别。设置为-1时,表示无需重试,直接将消息放入死信队列。

3.编写生产者

package com.study.controller;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @author: * @time: 2024/1/5 10:17*/
@RestController
@RequestMapping("/mq")
@Slf4j
public class RocketMQProducerController {@ResourceRocketMQTemplate rocketMQTemplate;@PostMapping("/sendMessage")@ResponseBodypublic void sendMessage(String msg){rocketMQTemplate.asyncSend("topic_test", "hello mq", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("msgId={}",sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});}
}

同步会有一点小问题,第一次启动不会消费,直接写成异步

4.测试效果

发现没有主题
在这里插入图片描述
追踪源码发现主题和过滤消息的表达式按照冒号分割
topic取第一位,过滤表达式取第二位
在这里插入图片描述

修改再试下

  rocketMQTemplate.asyncSend("topic_test:tagA", "hello mq", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("msgId={}",sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});

发现可以了
在这里插入图片描述
前面写了个java客户端的消费者,改下消费组发现也可以消费

java客户端代码

package com.bsoft;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;/*** @author: liucheng* @time: 2023/12/29 15:39*/
public class MQConsumer {private final static String nameServer = "192.168.141.101:9876";private final static String consumerGroup = "my_group_test02";private final static String topic = "topic_test";public static void main(String[] args) throws MQClientException, IOException, InterruptedException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);// 设置NameServer的地址consumer.setNamesrvAddr(nameServer);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(topic, "tagA");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);msgs.forEach((msg)->{byte[] body = msg.getBody();String s = new String(body, Charset.defaultCharset());System.out.println("msg=================> " +s);});// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();System.in.read();}
}

在这里插入图片描述
到此集群搭建完成,大家搭建过程中有遇到问题可以交流

总结

整个搭建过程不难就是有点繁琐,需要配置多台服务器

  • 其中配置brocker.conf时dLegerSelfId值这块要注意 ,dLegerSelfId是节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一。这个值从0开始递增
    在这里插入图片描述

  • 同一台服务器上启动时先启动 namesrv 再启动 broker

这篇关于SpringBoot+RocketMQ集群(dledger)部署完整学习笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/575715

相关文章

SpringBoot整合Flowable实现工作流的详细流程

《SpringBoot整合Flowable实现工作流的详细流程》Flowable是一个使用Java编写的轻量级业务流程引擎,Flowable流程引擎可用于部署BPMN2.0流程定义,创建这些流程定义的... 目录1、流程引擎介绍2、创建项目3、画流程图4、开发接口4.1 Java 类梳理4.2 查看流程图4

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

SpringBoot排查和解决JSON解析错误(400 Bad Request)的方法

《SpringBoot排查和解决JSON解析错误(400BadRequest)的方法》在开发SpringBootRESTfulAPI时,客户端与服务端的数据交互通常使用JSON格式,然而,JSON... 目录问题背景1. 问题描述2. 错误分析解决方案1. 手动重新输入jsON2. 使用工具清理JSON3.

java中long的一些常见用法

《java中long的一些常见用法》在Java中,long是一种基本数据类型,用于表示长整型数值,接下来通过本文给大家介绍java中long的一些常见用法,感兴趣的朋友一起看看吧... 在Java中,long是一种基本数据类型,用于表示长整型数值。它的取值范围比int更大,从-922337203685477

java Long 与long之间的转换流程

《javaLong与long之间的转换流程》Long类提供了一些方法,用于在long和其他数据类型(如String)之间进行转换,本文将详细介绍如何在Java中实现Long和long之间的转换,感... 目录概述流程步骤1:将long转换为Long对象步骤2:将Longhttp://www.cppcns.c

SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程

《SpringBoot集成LiteFlow实现轻量级工作流引擎的详细过程》LiteFlow是一款专注于逻辑驱动流程编排的轻量级框架,它以组件化方式快速构建和执行业务流程,有效解耦复杂业务逻辑,下面给大... 目录一、基础概念1.1 组件(Component)1.2 规则(Rule)1.3 上下文(Conte

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过