RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失

2024-08-25 21:20

本文主要是介绍RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

重复消息

我们需要给我们的消费者实现幂等解决重复消息,也就是对同一个消息的处理结果,执行多少次都不变。

这个还是需要结合具体的业务的。你可以使用写入Redis来保证,因为Redis 的 key 和 value 就是天然支持幕等的。
当然还有使用用数据库插入法,基于数据库的唯一键来保证重复数据不会被插入多条。

不过最主要的还是需要根据特定场景使用特定的解决方案,你要知道你的消息消费是否是完全不可重复消费、还是可以忍受重复消费的,然后再选择强校验和弱校验的方式。

消息堆积

RocketMQ的消息堆积,一般都是因为客户端本地消费过程中,由于消费耗时过长或消费并发度较小等原因,导致客户端消费能力不足,出现消息堆积的问题。当线上出现消息堆积的问题时,一般有以下几种方式来解决:

  1. 增加消费者数量:消息堆积了,消费不过来了,那就把消费者的数量增加一下,让更多人的实例来消费这些消心。
  2. 提升消费者消费速度:消费者消费的慢可能是消息堆积的主要原因,想办法提升消费速度,比如引入线程池、本地消息存储后即返回成功后续再慢慢消费等。
  3. 降低生产者的生产速度:如果生产者可控的话,可以让生产者生成消息的速度慢一点。
  4. 清理过期消息:有一些过期消息、或者一直无法成功的消息,在业务做评估之后,如果无影响或者影响不大,其实是可以清理的。
  5. 调整RocketMQ的配置参数:RocketMQ提供了很多可配配置的参数,例如消息消费模式、消息拉取间隔时间等,可以根据实际情况来调整这些参数,从而优化消息消费的为效率
  6. 增加Topic队列数:如果一个Topic的队列数比较少,那么就容易出现消息堆积的情况。可以通过增加队列数来提高消息的处理并发度,从而减少消息堆积。

回溯消费

回溯消费是指在向Consumer已经消费成功的消息,由于业务上需求需要重新消费,在RocketMQ中,Broker投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,间维度精确到毫秒。

如何防止消息不丢失

RocketMQ的消息想要确保不丢失,需要生产者、消费者以及Broker的共同努力,缺一不可。

首先在生产者端,消息的发送分为同步和异步两种,在同步发送消息的情况下,消息的发送会同步阻塞等待Broker返回结果,在Broker确认收到消息之后,生产者才会拿到SendResult。如果这个过程中发生异常,那么就说明消息发送可能失败了,就需要生产者进行重新发送消息
但是Broker其实并不会立即把消息存储到磁盘上,而是先存储到内存中,内存存储成功之后,就返回给确认结果给生产者了。然后再通过异步刷盘的方式将内存中的数据存储到磁盘上。但是这个过程中,如果机器挂了,那么就可能会导致数据丢失。

如果想要保证消息不丢失,可以将消息保存机制修改为同步刷盘,这样,Broker会在同步请求中把数据保存在磁盘上,确保保存成功后再返回确认结果给生产者。

除了同步发送消息,还有异步发送,异步发送的话就需要生产者重写SendCallback的onSuccess和onException方法,用于给Broker进行回调。在方法中实现消息的确认或者重新发送

为了保证消息不丢失,RocketMQ肯定要通过集群方式进行部署,Broker通常采用一主多从部署方式,并且采用主从同步的方式做数据复制。
当主Broker宕机时,从Broker会接管主Broker的工作,保证消息不丢失。RocketMQ的Broker还可以配置多个实例,消息会在多个Broker之间进行冗余备份,从而保证数据的的可靠性。默认方式下,Broker在接收消息后,写入master成功,就可以返回确认响应给生产者了,接着消息将会异步复制到slave节点。但是如果这个过程中,Master的磁盘损坏了。那就会导致数据丢失了。如果想要解决这个问题,可以配置同步复制的方式,即Master在将数据同步到Slave节点后,再返回给生产者确认结果

在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。
所以,在消费者的代码中,一定要在业务逻辑的最后一步 returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;当然,也可以先把数据保存在数据库中,就返回,然后自己再慢慢处理。
但是,需要注意的是RocketMQ和Kafka一样,只能最大限度的为保证消息不丢失,但是没办法做到100%保证不丢失。例如异步发送时,要执行回调时Brocker宕机了,或者生产者宕机了。。。

这篇关于RocketMQ~重复消息、消息堆积、回溯消费、如何防止消息不丢失的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106683

相关文章

如何通过try-catch判断数据库唯一键字段是否重复

《如何通过try-catch判断数据库唯一键字段是否重复》在MyBatis+MySQL中,通过try-catch捕获唯一约束异常可避免重复数据查询,优点是减少数据库交互、提升并发安全,缺点是异常处理开... 目录1、原理2、怎么理解“异常走的是数据库错误路径,开销比普通逻辑分支稍高”?1. 普通逻辑分支 v

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

解决RocketMQ的幂等性问题

《解决RocketMQ的幂等性问题》重复消费因调用链路长、消息发送超时或消费者故障导致,通过生产者消息查询、Redis缓存及消费者唯一主键可以确保幂等性,避免重复处理,本文主要介绍了解决RocketM... 目录造成重复消费的原因解决方法生产者端消费者端代码实现造成重复消费的原因当系统的调用链路比较长的时

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

电脑提示d3dx11_43.dll缺失怎么办? DLL文件丢失的多种修复教程

《电脑提示d3dx11_43.dll缺失怎么办?DLL文件丢失的多种修复教程》在使用电脑玩游戏或运行某些图形处理软件时,有时会遇到系统提示“d3dx11_43.dll缺失”的错误,下面我们就来分享超... 在计算机使用过程中,我们可能会遇到一些错误提示,其中之一就是缺失某个dll文件。其中,d3dx11_4

Spring的RedisTemplate的json反序列泛型丢失问题解决

《Spring的RedisTemplate的json反序列泛型丢失问题解决》本文主要介绍了SpringRedisTemplate中使用JSON序列化时泛型信息丢失的问题及其提出三种解决方案,可以根据性... 目录背景解决方案方案一方案二方案三总结背景在使用RedisTemplate操作redis时我们针对

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

nginx 负载均衡配置及如何解决重复登录问题

《nginx负载均衡配置及如何解决重复登录问题》文章详解Nginx源码安装与Docker部署,介绍四层/七层代理区别及负载均衡策略,通过ip_hash解决重复登录问题,对nginx负载均衡配置及如何... 目录一:源码安装:1.配置编译参数2.编译3.编译安装 二,四层代理和七层代理区别1.二者混合使用举例

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

java向微信服务号发送消息的完整步骤实例

《java向微信服务号发送消息的完整步骤实例》:本文主要介绍java向微信服务号发送消息的相关资料,包括申请测试号获取appID/appsecret、关注公众号获取openID、配置消息模板及代码... 目录步骤1. 申请测试系统2. 公众号账号信息3. 关注测试号二维码4. 消息模板接口5. Java测试