手搭手RocketMQ重试机制

2024-03-17 12:28
文章标签 rocketmq 机制 搭手 重试

本文主要是介绍手搭手RocketMQ重试机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4

加入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><!-- 排除logback依赖 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><!--Log4j2场景启动器 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3</version><exclusions><exclusion><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.1.14</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>3.6.1</version></dependency><dependency><groupId>p6spy</groupId><artifactId>p6spy</artifactId><version>3.9.1</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version></dependency></dependencies>

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

Rocket重试机制

RocketMQ的重试机制是指:当消费者消费消息失败时,RocketMQ会在一定时间后重新将消息发送给消费者进行消费,以确保消息的可靠消费。

自动重试:Consumer在消费失败后,会在一定重试策略下定期重试消费失败的消息,直到成功或达到最大重试次数。

消息重发:如果Consumer在最大重试次数内仍然消费失败,Broker会定期扫描被标记为消费失败的消息,并将其重发给其他Consumer。

灵活的重试策略:RocketMQ提供多种重试策略来控制重试时机和频率,主要有:

生产者重试

生产者设置消息失败后重试次数

//同步
producer.setRetryTimesWhenSendFailed(3);
//异步
producer.setRetryTimesWhenSendAsyncFailed(2);

Int 重试的次数

//重试

@Test
void retryProducerTest()throws Exception{//创建生产者DefaultMQProducer producer = new DefaultMQProducer("retryGroup");//连接namesrvproducer.setNamesrvAddr("192.168.68.133:9876");//启动producer.start();producer.setDefaultTopicQueueNums(1);//自身业务key唯一String Key = UUID.randomUUID().toString();System.out.println(Key);//重试//同步producer.setRetryTimesWhenSendFailed(3);//异步//producer.setRetryTimesWhenSendAsyncFailed(2);//创建消息Message message = new Message("retry", null,Key, "重试测试内存内容".getBytes());//发送消息producer.send(message);System.out.println("发送成功");//关闭生产者producer.shutdown();
}

消费者重试

设置自定义最大重试

consumer.setMaxReconsumeTimes(6);

死信消息(超过重试次数,并未处理的消息),放在死信主题中,%DLQ% retry

@Test
void retryConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅主题   *表示该主题的所有消息consumer.subscribe("retry","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("消息内容"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

死信处理方案

死信处理方案1、单独订阅死信主题2、监听死信主题(业务流程控制)

通过存入单独数据库表,业务发送短信等方式通知人工处理

1、单独订阅死信主题

单独订阅监听主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//订阅死信主题   *表示该主题的所有消息consumer.subscribe("%DLQ%retryConsumerTest","*");//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {System.out.println(new Date());System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

2、监听死信主题(业务流程控制)

通过业务流程监听多个主题

//死信处理方案二、监听死信主题

@Test
void retryDeadMonitorConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retryDeadMonitorConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.68.133:9876");//设置每次重试次数consumer.setMaxReconsumeTimes(3);// 订阅需要的多个主题列表List<String> topics = Arrays.asList("retry", "TopicA", "TopicB");// 订阅主题列表中的所有主题for (String topic : topics) {consumer.subscribe(topic, "*"); // 这里的tag是用来过滤消息的,"*" 表示接收这个主题下的所有消息}//设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理//获取keyfor (MessageExt messageExt : msgs) {try{//业务代码System.out.println("业务代码");System.out.println("消息内容"+new String(messageExt.getBody()));int i =1/0;//模拟代码出错}catch (Exception e){//获取重试次数int reconsumeTimes = messageExt.getReconsumeTimes();String key = messageExt.getKeys();if (reconsumeTimes > 2){OrderLog orderLog = new OrderLog();orderLog.setType(2);orderLog.setOrderid(key);orderLog.setUsername("重试超过2次,死信消息");orderMapper.insert(orderLog);System.out.println("将死信消息单独存入未处理消息表中"+new String(messageExt.getBody()));//发送短信通知人工处理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}else {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}}//CONSUME_SUCCESS成功  RECONSUME_LATER失败return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭 consumer.shutdown();
}

这篇关于手搭手RocketMQ重试机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/818942

相关文章

嵌入式Linux驱动中的异步通知机制详解

《嵌入式Linux驱动中的异步通知机制详解》:本文主要介绍嵌入式Linux驱动中的异步通知机制,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、异步通知的核心概念1. 什么是异步通知2. 异步通知的关键组件二、异步通知的实现原理三、代码示例分析1. 设备结构

JVM垃圾回收机制之GC解读

《JVM垃圾回收机制之GC解读》:本文主要介绍JVM垃圾回收机制之GC,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、死亡对象的判断算法1.1 引用计数算法1.2 可达性分析算法二、垃圾回收算法2.1 标记-清除算法2.2 复制算法2.3 标记-整理算法2.4

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

SpringRetry重试机制之@Retryable注解与重试策略详解

《SpringRetry重试机制之@Retryable注解与重试策略详解》本文将详细介绍SpringRetry的重试机制,特别是@Retryable注解的使用及各种重试策略的配置,帮助开发者构建更加健... 目录引言一、SpringRetry基础知识二、启用SpringRetry三、@Retryable注解

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

java中反射(Reflection)机制举例详解

《java中反射(Reflection)机制举例详解》Java中的反射机制是指Java程序在运行期间可以获取到一个对象的全部信息,:本文主要介绍java中反射(Reflection)机制的相关资料... 目录一、什么是反射?二、反射的用途三、获取Class对象四、Class类型的对象使用场景1五、Class

Nginx之upstream被动式重试机制的实现

《Nginx之upstream被动式重试机制的实现》本文主要介绍了Nginx之upstream被动式重试机制的实现,可以通过proxy_next_upstream来自定义配置,具有一定的参考价值,感兴... 目录默认错误选择定义错误指令配置proxy_next_upstreamproxy_next_upst

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

Spring Retry 实现乐观锁重试实践记录

《SpringRetry实现乐观锁重试实践记录》本文介绍了在秒杀商品SKU表中使用乐观锁和MybatisPlus配置乐观锁的方法,并分析了测试环境和生产环境的隔离级别对乐观锁的影响,通过简单验证,... 目录一、场景分析 二、简单验证 2.1、可重复读 2.2、读已提交 三、最佳实践 3.1、配置重试模板

Spring排序机制之接口与注解的使用方法

《Spring排序机制之接口与注解的使用方法》本文介绍了Spring中多种排序机制,包括Ordered接口、PriorityOrdered接口、@Order注解和@Priority注解,提供了详细示例... 目录一、Spring 排序的需求场景二、Spring 中的排序机制1、Ordered 接口2、Pri