RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

本文主要是介绍RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan...

RabbitMQ 默认并不支持“真正意义上的延迟队列”。实现延时消息最常用的方式就是安装 rabbitmq_delayed_message_exchange 插件。
本文将从插件安装、启用、配置、使用全流程带你掌握 RabbitMQ 延时队列的正确姿势。

一、什么是 RabbitMQ 延时队列?

延时队列的核心功能就是:让消息延迟一定时间后再投递到消费者

典型应用场景包括

  • 订单支付超时取消
  • 秒杀未支付自动释放库存
  • 用户注册后延时发送欢迎邮件
  • 自动关闭未操作的工单等

二、安装前准备

✅ RabbitMQ 环境要求

  • RabbitMQ ≥ 3.6.x
  • Erlang ≥ 19.x(越新越好)
  • 插件版本需与 RabbitMQ 版本匹配(注意版本兼容

docker 搭建 RabbitMQ

三、安装延时队列插件

插件名称:

rabbitmq_delayed_message_exchange

1️⃣ 下载插件

你可以从 github 或 RabbitMQ 官网下载预编译的 .ez 文件。

插件发布地址:

  • https://www.rabbitmq.com/community-plugins.html

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

  • GitHub 地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

进入web端后左上角有显示当前安装的RabbitMQ版本

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

需要保证RabbitMQ插件的大版本与RabbitMQ保持一致,否则会启动失败

例如(RabbitMQ 4.0.*):

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v4.0.7/rabbitmq_delayed_message_exchange-v4.0.7.ez

2️⃣ 复制插件到插件目录

将插件拷贝到容器内plugins目录下

docker cp /root/rabbitmq_delayed_message_exchange-v4.0.7.ez rabbitmq:/plugins

.ez 文件复制进去:

// 进入容器 我这里使用容器名字 也可以用容器id进入
docker exec -it rabbitmq /bin/bash
// 移动到plugins目录下
cd plugins
// 查看是否上传成功
ls

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

3️⃣ 启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

输出类似:

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

检查插件是否存在于镜像中
容器内执行:

rabbitmq-plugins list

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

这表明:

✅ 插件 rabbitmq_delayed_message_exchange 已经启用并正在运行。

4️⃣ 重启 RabbitMQ 容器(建议)

docker restart rabbitmq

5️⃣ 验证插件是否启用成功

容器启动成功之后,登录RabbitMQ的管理界面(ip:15672 访问web界面),找到ExchangesTab页。点击Add a new exchange,在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

6️⃣ 配置容器重启时自动加载插件

为了确保 RabbitMQ 容器每次重启时自动启用指定插件,需要在容器内 /etc/rabbitmq/enabled_plugins 文件中配置插件列表。

进入正在运行的容器:

docker exec -it <容器名称或ID> /bin/bash

编辑或创建插件配置文件 /etc/rabbitmq/enabled_plugins,写入如下内容:

[rabbitmq_management, rabbitmq_delayed_message_exchange].

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

保存文件并退出容器。

重新启动容器,确认插件自动启用:

docker restart <容器名称或ID>

通过 RabbitMQ 管理界面或命令行确认插件状态:

docker exec <容器名称或ID> rabbitmq-plugins list

备注:

  • 确保挂载的配置目录(包含 enabled_plugins 文件)权限正确,容器内 RabbitMQ 进程能正常读取。
  • enabled_plugins 文件必须以 Erlang 列表格式并以句点 . 结尾。
  • 也可通过环境变量或启动命令方式启用插件,但写入 enabled_plugins 文件更持久、可靠。

四、使用延时队列功能(Java 示例)

延时消息的核心是:通过一个特殊的 Exchange 类型 x-delayed-message + 自定义 header 设置延迟时间。

创建延时交换机与队列

/**
     * 声明延迟队列(Queue 本身无 TTL,延迟由消息属性 x-delay 控制)
     * 注意:需要安装 RabbitMQ 的延迟消息插件(rabbitmq_delayed_message_exchange)
     */
    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable(getRetry().getQueue()).build();
    }
    /**
     * 声明延迟交换机:必须使用插件类型 x-delayed-message,并声明延迟基础类型为 direct
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(
                getRetry().getExchange(),      // 名称
                "x-delayed-message",           // 类型(延迟消息插件)
                true,                          // durable
                false,                         // auto-delete
                args                           // 参数:声明 delayed 类型为 direct
        );
    }
    /**
     * 绑定延迟队列到延迟交换机
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(delayExchange())
                .with(getRetry().getRoutingKey())
                .noargs();
    }

发送延时消息

/**
     * 默认延迟 30 分钟(1800000 毫秒)
     */
    public void sendDelayedTask(String message) {
        this.sendDelayedTask(message, 30 * 60 * 1000);
    }
    /**
     * 发送延迟消息
     */
    public void sendDelayedTask(String message, long delayMilli编程s) {
        MessChina编程ageProperties properties = new MessageProperties();
        properties.setContentType(MessageProperties.CONTENT_TYPE_jsON);
        properties.setHeader("x-delay", delayMillis); // 设置延迟时间(单位:毫秒)
        Message amqpMessage = new Message(message.getBytes(StandardCharsets.UTF_8), properties);
        rabbitTemplate.send(
                rabbitMqProperties.getConfig().getRetry().getExchange(),
                rabbitMqProperties.getConfig().getRetry().getRoutingKey(),
                amqpMessage
        );
        log.info("延迟任务已发送,延迟 {} s 后投递: {}", delayMillis/1000, message);
    }

五、注意事项

  • 插件启用后,RabbitMQ 会新增一种交换机类型 x-delayevwIvHnhd-message
  • 不兼容 RabbitMQ 自带的 TTL + DLX 延迟实现方案
  • 插件仅支持设置 发送时延迟,不支持基于队列统一延迟
  • 插件不支持所有 AMQP 客户端(需显式支持 header 设置的客户端)

六、Web 管理界面配置示例

你也可以通过 RabbitMQ 的 Web 控制台:

  1. 创建交换机类型为 x-delayed-message
  2. 设置参数 x-delayed-type = direct
  3. 绑定队列
  4. 消息发布时设置 header:x-delay: 10000(单位 ms)

七、延时队列 VS RabbitMQ 原生 TTL 实现

方案支持粒度精准控制依赖 DLX灵活性复杂度
原生 TTL + 死信队列队列级 / 消息级一般中等中等
延时插件 rabbitmq_delayed_message_exchange消息级

✅ 八、常见问题 FAQ

1. 插件安装后没有效果?

  • 检查是否重启 RabbitMQ
  • 检查交换机类型是否正确设置为 x-delayed-message
  • 检查消息是否设置了 header:x-delay

2. 插件是否与集群兼容?

支持,只需确保每个节点都安装启用了插件

总结

通过安装 rabbitmq_delayed_message_exchange 插件,RabbitMQ 拥有了真正意义上的“消息级”延迟投递功能,无需依赖死信队列和 TTL,极大简化了延时场景的开发与维护工作。

到此这篇关于RabbitChina编程MQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)的文章就介绍到这了,更多相关RabbitMQ 延时队列插件内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!

这篇关于RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1155847

相关文章

Java中流式并行操作parallelStream的原理和使用方法

《Java中流式并行操作parallelStream的原理和使用方法》本文详细介绍了Java中的并行流(parallelStream)的原理、正确使用方法以及在实际业务中的应用案例,并指出在使用并行流... 目录Java中流式并行操作parallelStream0. 问题的产生1. 什么是parallelS

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

Linux join命令的使用及说明

《Linuxjoin命令的使用及说明》`join`命令用于在Linux中按字段将两个文件进行连接,类似于SQL的JOIN,它需要两个文件按用于匹配的字段排序,并且第一个文件的换行符必须是LF,`jo... 目录一. 基本语法二. 数据准备三. 指定文件的连接key四.-a输出指定文件的所有行五.-o指定输出

Linux jq命令的使用解读

《Linuxjq命令的使用解读》jq是一个强大的命令行工具,用于处理JSON数据,它可以用来查看、过滤、修改、格式化JSON数据,通过使用各种选项和过滤器,可以实现复杂的JSON处理任务... 目录一. 简介二. 选项2.1.2.2-c2.3-r2.4-R三. 字段提取3.1 普通字段3.2 数组字段四.

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

MyBatis常用XML语法详解

《MyBatis常用XML语法详解》文章介绍了MyBatis常用XML语法,包括结果映射、查询语句、插入语句、更新语句、删除语句、动态SQL标签以及ehcache.xml文件的使用,感兴趣的朋友跟随小... 目录1、定义结果映射2、查询语句3、插入语句4、更新语句5、删除语句6、动态 SQL 标签7、ehc

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV