MQ专题:延迟消息的通用方案

2024-08-30 09:44

本文主要是介绍MQ专题:延迟消息的通用方案,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、主要内容

本文将实现一个MQ延迟消息的通用方案。

方案不依赖于MQ中间件,依靠MySQL和DelayQueue解决,不管大家用的是什么MQ,具体是RocketMQ、RabbitMQ还是kafka,本文这个方案你都可以拿去直接使用,可以轻松实现任意时间的延迟消息投递。

二、涉及技术点

  1. SpringBoot2.7
  2. MyBatisPlus
  3. MySQL
  4. 线程池
  5. java中的延迟队列:DelayQueue
  6. 分布式锁

三、延迟消息常见的使用场景

  1. 订单超时处理

比如下单后15分钟,未支付,则自动取消订单,回退库存。

可以采用延迟队列实现:下单的时候可以投递一条延迟15分钟的消息,15分钟后消息将被消费。

  1. 消息消费失败重试

比如MQ消息消费失败后,可以延迟一段时间再次消费。

可以采用延迟消息实现:消费失败,可以投递一条延迟消息,触发再次消费

  1. 其他任意需要延迟处理的业务

业务中需要延迟处理的场景,都可以使用延迟消息来搞定。

四、延迟消息常见的实现方案

方案1:MySQL + job定时轮询

由于延迟消息的时间不确定,若要达到实时性很高的效果,也就是说消息的延迟时间是不知道的,那就需要轮询每一秒才能确保消息在指定的延迟时间被处理,这就要求job需要每秒查询一次db中待投递的消息。

这种方案访问db的频率比较高,对数据库造成了一定的压力。

方案2:RabbitMQ 中的TTL+死信队列

rabbitmq中可以使用TTL消息 + 死信队列实现,也可以安装延时插件。

此方案对中间件有依赖,不同的MQ实现是不一样的,若换成其他的MQ,方案要重新实现

方案3:MySQL + job定时轮询 + DelayQueue

可以对方案1进行改进,引入java中的 DelayQueue。

job可以采用1分钟执行一次,每次拉取未来2分钟内需要投递的消息,将其丢到java自带的 DelayQueue 这个延迟队列工具类中去处理,这样便能做到实时性很高的投递效果,且对db的压力也降低了很多。

这种方案对db也没什么压力,实时性非常高,且对MQ没有依赖,这样不管切换什么MQ,这种方案都不需要改动。

本文将落地该方案。

需要一张本地消息表(t_msg)

这张表用来暂存事务消息和延迟消息

create table if not exists t_msg
(id               varchar(32) not null primary key comment '消息id',body_json        text        not null comment '消息体,json格式',status           smallint    not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',expect_send_time datetime    not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',actual_send_time datetime comment '消息实际投递时间',create_time      datetime comment '创建时间',fail_msg         text comment 'status=2 时,记录消息投递失败的原因',fail_count       int         not null default 0 comment '已投递失败次数',send_retry       smallint    not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',next_retry_time  datetime comment '投递失败后,下次重试时间',update_time      datetime comment '最近更新时间',key idx_status (status)
) comment '本地消息表';

五、代码落地

将延迟消息保存到t_msg

在这里插入图片描述

投递事务消息 or 2分钟内的延时消息

在这里插入图片描述

每分钟执行一次Job,查出2分钟内应该被投递的延迟消息 和 2分钟内应该重新投递的上次投递失败的事务消息,再放到延时队列里

    /*** 每分钟执行一次*/@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.MINUTES)public void sendRetry() {/*** 查询出需要重试的消息(状态为0 and 期望投递时间 <= 当前时间 + 2分钟) || (投递失败的 and 需要重试 and 下次重试时间小于等于当前时间 + 2分钟)* select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟))*/LocalDateTime time = LocalDateTime.now().plusMinutes(2);LambdaQueryWrapper<MsgPO> qw = Wrappers.lambdaQuery(MsgPO.class).and(query -> query.and(lq ->lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus()).le(MsgPO::getExpectSendTime, time)).or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus()).eq(MsgPO::getSendRetry, 1).le(MsgPO::getNextRetryTime, time)));qw.orderByAsc(MsgPO::getId);//先获取最小的一条记录的idMsgPO minMsgPo = this.msgService.findOne(qw);if (minMsgPo == null) {return;}this.msgSender.sendRetry(minMsgPo);String minMsgId = minMsgPo.getId();//循环中继续向后找出id>minMsgId的所有记录,然后投递重试while (true) {//select * from t_msg where ((status = 0 and expect_send_time<=当前时间+2分钟) or (status = 2 and send_retry = 1 and next_retry_time <= 当前时间 + 2分钟)) and id>#{minMsgId}qw = Wrappers.lambdaQuery(MsgPO.class).and(query -> query.and(lq ->lq.eq(MsgPO::getStatus, MsgStatusEnum.INIT.getStatus()).le(MsgPO::getExpectSendTime, time)).or(lq -> lq.eq(MsgPO::getStatus, MsgStatusEnum.FAIL.getStatus()).eq(MsgPO::getSendRetry, 1).le(MsgPO::getNextRetryTime, time)));qw.gt(MsgPO::getId, minMsgId);qw.orderByAsc(MsgPO::getId);Page<MsgPO> page = new Page<>();page.setCurrent(1);page.setSize(500);this.msgService.page(page, qw);//如果查询出来的为空 || 当前服务要停止了(stop=true),则退出循环if (CollUtils.isEmpty(page.getRecords()) || stop) {break;}//投递重试for (MsgPO msgPO : page.getRecords()) {this.msgSender.sendRetry(msgPO);}// minMsgId = 当前列表最后一条消息的idminMsgId = page.getRecords().get(page.getRecords().size() - 1).getId();}}

在这里插入图片描述

这篇关于MQ专题:延迟消息的通用方案的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1120462

相关文章

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

MyBatis延迟加载与多级缓存全解析

《MyBatis延迟加载与多级缓存全解析》文章介绍MyBatis的延迟加载与多级缓存机制,延迟加载按需加载关联数据提升性能,一级缓存会话级默认开启,二级缓存工厂级支持跨会话共享,增删改操作会清空对应缓... 目录MyBATis延迟加载策略一对多示例一对多示例MyBatis框架的缓存一级缓存二级缓存MyBat

前端缓存策略的自解方案全解析

《前端缓存策略的自解方案全解析》缓存从来都是前端的一个痛点,很多前端搞不清楚缓存到底是何物,:本文主要介绍前端缓存的自解方案,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、为什么“清缓存”成了技术圈的梗二、先给缓存“把个脉”:浏览器到底缓存了谁?三、设计思路:把“发版”做成“自愈”四、代码

解决docker目录内存不足扩容处理方案

《解决docker目录内存不足扩容处理方案》文章介绍了Docker存储目录迁移方法:因系统盘空间不足,需将Docker数据迁移到更大磁盘(如/home/docker),通过修改daemon.json配... 目录1、查看服务器所有磁盘的使用情况2、查看docker镜像和容器存储目录的空间大小3、停止dock

Spring Gateway动态路由实现方案

《SpringGateway动态路由实现方案》本文主要介绍了SpringGateway动态路由实现方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随... 目录前沿何为路由RouteDefinitionRouteLocator工作流程动态路由实现尾巴前沿S

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

C#实现高性能拍照与水印添加功能完整方案

《C#实现高性能拍照与水印添加功能完整方案》在工业检测、质量追溯等应用场景中,经常需要对产品进行拍照并添加相关信息水印,本文将详细介绍如何使用C#实现一个高性能的拍照和水印添加功能,包含完整的代码实现... 目录1. 概述2. 功能架构设计3. 核心代码实现python3.1 主拍照方法3.2 安全HBIT

Python Excel 通用筛选函数的实现

《PythonExcel通用筛选函数的实现》本文主要介绍了PythonExcel通用筛选函数的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录案例目的示例数据假定数据来源是字典优化:通用CSV数据处理函数使用说明使用示例注意事项案例目的第一

MyBatis Plus实现时间字段自动填充的完整方案

《MyBatisPlus实现时间字段自动填充的完整方案》在日常开发中,我们经常需要记录数据的创建时间和更新时间,传统的做法是在每次插入或更新操作时手动设置这些时间字段,这种方式不仅繁琐,还容易遗漏,... 目录前言解决目标技术栈实现步骤1. 实体类注解配置2. 创建元数据处理器3. 服务层代码优化填充机制详

防止Linux rm命令误操作的多场景防护方案与实践

《防止Linuxrm命令误操作的多场景防护方案与实践》在Linux系统中,rm命令是删除文件和目录的高效工具,但一旦误操作,如执行rm-rf/或rm-rf/*,极易导致系统数据灾难,本文针对不同场景... 目录引言理解 rm 命令及误操作风险rm 命令基础常见误操作案例防护方案使用 rm编程 别名及安全删除