RabiitMQ延迟队列(死信交换机)

2024-02-07 09:12

本文主要是介绍RabiitMQ延迟队列(死信交换机),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Dead Letter Exchange(死信交换机)

        在MQ中,当消息成为死信(Dead message 死掉的信息)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而 在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况

  1. 队列消息长度到达限制
  2. 消费者拒签消息,并且不把消息重新放入原队列
  3. 消息到达存活时间未被消费

有些队列的消息成为死信后,(比如过期了或者队列满了)这些死信一般情况下是会被 RabbitMQ 清理的。但是你可以配置某个交换机为此队列的死信交换机,该队列的消息成为死信后会被重新发送到此 DLX 。至于怎么处理这个DLX中的死信就是看具体的业务场景了,DLX 中的信息可以被路由到新的队列。

  • 生产者 

    /*** 普通交换机绑定普通交换机** @return*/@Beanpublic Queue queueA() {//信息配置Map<String, Object> map = new HashMap<>();//message在该队列queue的存活时间最大为15秒map.put("x-message-ttl", 15000);//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)map.put("x-dead-letter-exchange", "exchangeB");//x-dead-letter-routing-key参数是给这个DLX指定路由键map.put("x-dead-letter-routing-key", "queueB");return new Queue("queueA", true, false, false, map);}@Beanpublic DirectExchange exchangeA() {return new DirectExchange("exchangeA");}@Beanpublic Binding bindingA() {return BindingBuilder.bind(queueA()).to(exchangeA()).with("queueA");}/*** 死信交换机绑定死信交换机** @return*/@Beanpublic Queue queueB() {return new Queue("queueB");}@Beanpublic DirectExchange exchangeB() {return new DirectExchange("exchangeB");}@Beanpublic Binding bindingB() {return BindingBuilder.bind(queueB()).to(exchangeB()).with("queueB");}
  •  模拟发送请求
    @RequestMapping("/send6")public String sendSix() throws JsonProcessingException {rabbitTemplate.convertAndSend("exchangeA", "queueA", "检查订单是否过期");return "🫶";}

这时我发送请求到队列queueA,并设置了15秒的延迟,将超时的信息调用到死信交换机中。在这里我是没开启消费者所有没有消费者去处理该请求的,信息在queueA队列等待15秒后将会转到死信交换机queueB队列进行处理:

延迟队列

  • 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。经典的应用场景是下单减库存。 

        根据以上结论,在rabbitmq中消费者只要接到信息就会自动确认进行处理。所以在上面并没有开启消费者,当请求時效后(如订单未支付,定时30分钟自动取消功能)我们不应该再让它正常处理,而把该请求放到死信交换机中安排对应的处理,所以我们需要打消费者自动处理请求改成手动。 

  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

  • 消息确认模式有:

  1. AcknowledgeMode.NONE:自动确认
  2. AcknowledgeMode.AUTO:根据情况确认
  3. AcknowledgeMode.MANUAL:手动确认

确认消息(局部方法处理消息)

  • 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

消费者添加手动确认消息配置配置 :

spring:rabbitmq:listener:simple:acknowledge-mode: manua
  • 消费者接受消息: 
package com.ycxw.consumer.demos;import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author 云村小威* @create 2024-01-25 15:36*/
@Component
public class DLXReceiver {@RabbitListener(queues = {"queueA"})@RabbitHandlerpublic void handlerA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {System.out.println("已接受到队列queueA传递过来的消息:" + msg);channel.basicReject(tag, false);// 拒接消息,如果为true则拒绝后又从新回到队列被接受(循环),除非消息过期。//channel.basicAck(tag, true); 确认消息()一次性全接受,如果为false则接受一次}/*** 接受死信消息** @param msg*/@RabbitListener(queues = {"queueB"})@RabbitHandlerpublic void handlerB(String msg) {/*** ...接受到信息,去数据库处理*/System.out.println("已接受到队列queueB传递过来的消息:" + msg);}
}

 第一次进入普通队列别拒绝后,转到死信队列中处理...

需要注意的 basicAck 方法需要传递两个参数

  • deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

  • multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

这篇关于RabiitMQ延迟队列(死信交换机)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/687239

相关文章

MyBatis延迟加载与多级缓存全解析

《MyBatis延迟加载与多级缓存全解析》文章介绍MyBatis的延迟加载与多级缓存机制,延迟加载按需加载关联数据提升性能,一级缓存会话级默认开启,二级缓存工厂级支持跨会话共享,增删改操作会清空对应缓... 目录MyBATis延迟加载策略一对多示例一对多示例MyBatis框架的缓存一级缓存二级缓存MyBat

Java发送SNMP至交换机获取交换机状态实现方式

《Java发送SNMP至交换机获取交换机状态实现方式》文章介绍使用SNMP4J库(2.7.0)通过RCF1213-MIB协议获取交换机单/多路状态,需开启SNMP支持,重点对比SNMPv1、v2c、v... 目录交换机协议SNMP库获取交换机单路状态获取交换机多路状态总结交换机协议这里使用的交换机协议为常

如何正确识别一台POE交换机的好坏? 选购可靠的POE交换机注意事项

《如何正确识别一台POE交换机的好坏?选购可靠的POE交换机注意事项》POE技术已经历多年发展,广泛应用于安防监控和无线覆盖等领域,需求量大,但质量参差不齐,市场上POE交换机的品牌繁多,如何正确识... 目录生产标识1. 必须包含的信息2. 劣质设备的常见问题供电标准1. 正规的 POE 标准2. 劣质设

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

一文带你迅速搞懂路由器/交换机/光猫三者概念区别

《一文带你迅速搞懂路由器/交换机/光猫三者概念区别》讨论网络设备时,常提及路由器、交换机及光猫等词汇,日常生活、工作中,这些设备至关重要,居家上网、企业内部沟通乃至互联网冲浪皆无法脱离其影响力,本文将... 当谈论网络设备时,我们常常会听到路由器、交换机和光猫这几个名词。它们是构建现代网络基础设施的关键组成

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

golang实现延迟队列(delay queue)的两种实现

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录1 延迟队列:邮件提醒、订单自动取消2 实现2.1 simplChina编程e简单版:go自带的time

Spring框架中@Lazy延迟加载原理和使用详解

《Spring框架中@Lazy延迟加载原理和使用详解》:本文主要介绍Spring框架中@Lazy延迟加载原理和使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、@Lazy延迟加载原理1.延迟加载原理1.1 @Lazy三种配置方法1.2 @Component