RabbitMQ——死信队列和延迟队列

2024-03-18 05:36
文章标签 队列 rabbitmq 延迟 死信

本文主要是介绍RabbitMQ——死信队列和延迟队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • RabbitMQ——死信队列和延迟队列
    • 1、死信队列
    • 2、基于插件的延迟队列
      • 2.1、安装延迟队列插件
      • 2.2、代码实例

RabbitMQ——死信队列和延迟队列

1、死信队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

死信的来源

在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

  • 消息被拒绝(basic.rejectbasic.nack)且不重新入队(requeue 参数为 false)。
  • 消息过期(TTL,Time-To-Live)。
  • 队列长度超过限制,无法再添加数据到mq中。

生产者

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.weipch.util.RabbitMqUtils;/*** @Author 方唐镜* @Create 2024-03-03 14:08* @Description*/
public class Produce {private static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//模拟消息过期 10s//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 0; i < 10; i++) {String message = "hello world" + i;channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());}}
}

消费者

正常队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer01 {private static final String NORMAL_EXCHANGE = "normal_exchange";private static final String DEAD_EXCHANGE = "dead_exchange";private static final String NORMAL_QUEUE = "normal_queue";private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//声明死信交换机和队列channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//绑定channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");//声明普通交换机和队列channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//死信配制 指定死信交换机和死信路由键Map<String, Object> map = new HashMap<>();map.put("x-dead-letter-exchange", DEAD_EXCHANGE);map.put("x-dead-letter-routing-key", "dead-routing-key");//最大长度//map.put("x-max-length", 6);channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);if (message.contains("5")){System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");//拒绝消息并把消息丢入死信队列channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}else {System.out.println("Consumer01接收消息:" + message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});}
}

死信队列:

package com.weipch.rabbitmq.dlq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.weipch.util.RabbitMqUtils;import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;/*** @Author 方唐镜* @Create 2024-03-03 13:50* @Description*/
public class Consumer02 {private static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.basicConsume(DEAD_QUEUE, true,(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),(consumerTag, e) -> {});}
}

生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

2、基于插件的延迟队列

延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。

使用场景

  1. 定时任务执行: 在需要定时执行任务的应用中,可以使用延迟队列来实现。将任务消息发送到延迟队列,设置消息的过期时间为任务执行的时间,当消息过期时,消费者即可执行相应的任务。
  2. 消息重试机制: 当某个操作失败时,可以将操作消息发送到延迟队列,并设置合适的重试时间。在消息重试的过程中,如果操作成功,消息将正常被消费;如果一直失败,可以选择在一定时间后放弃重试,将消息发送到死信队列或进行其他处理。
  3. 订单超时处理: 在电商等场景中,对于长时间未支付的订单,可以将订单消息发送到延迟队列,并设置订单的过期时间。当订单过期时,系统可以取消订单、释放库存等操作。
  4. 限流与流控: 通过使用延迟队列,可以实现消息的有序处理和限流,确保系统在高峰期不会因为瞬时大量请求而过载。
  5. 系统通知与提醒: 在需要发送系统通知或提醒的场景中,可以使用延迟队列来实现消息的定时推送。
  6. 缓解数据库压力: 对于一些需要定期清理的数据,可以使用延迟队列来触发数据清理操作,减轻数据库压力。

2.1、安装延迟队列插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

以docker方式安装

1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录

docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins

插件版本和rabbitmq版本一致

2、进入容器查看插件

在这里插入图片描述

3、启动插件

root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重启容器

docker restart 7c8726620871

5、安装成功

在这里插入图片描述

2.2、代码实例

配置类

package springbootrabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;@Configuration
public class DelayedQueueConfig {//    队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//    交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//    routingKeypublic static final String DELAYED_ROUTING_KEY = "delayed.routingKey";//    声明队列@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//    声明交换机 基于插件的交换机@Beanpublic CustomExchange delayedExchange() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");/** 1.交换机名称* 2.交换机类型* 3.是否需要持久化* 4.是否需要自动删除* 5.其他参数* */return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);}//    绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {//            发送消息的时候 延迟时长msg.getMessageProperties().setDelay(delayTime);return msg;});
}

消费者

@Slf4j
@Component
public class DelayedQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);}
}

这篇关于RabbitMQ——死信队列和延迟队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/821332

相关文章

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

spring AMQP代码生成rabbitmq的exchange and queue教程

《springAMQP代码生成rabbitmq的exchangeandqueue教程》使用SpringAMQP代码直接创建RabbitMQexchange和queue,并确保绑定关系自动成立,简... 目录spring AMQP代码生成rabbitmq的exchange and 编程queue执行结果总结s

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

golang实现延迟队列(delay queue)的两种实现

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录1 延迟队列:邮件提醒、订单自动取消2 实现2.1 simplChina编程e简单版:go自带的time

Spring框架中@Lazy延迟加载原理和使用详解

《Spring框架中@Lazy延迟加载原理和使用详解》:本文主要介绍Spring框架中@Lazy延迟加载原理和使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、@Lazy延迟加载原理1.延迟加载原理1.1 @Lazy三种配置方法1.2 @Component