RabbitMQ 之 死信队列

2024-05-27 22:28
文章标签 队列 rabbitmq 死信

本文主要是介绍RabbitMQ 之 死信队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

​编辑一、死信的概念

二、死信的来源

三、死信实战

1、代码架构图

2、消息 TTL 过期

(1)消费者

(2)生产者

(3)结果展示​编辑

 3、队列达到最大长度

(1)消费者

(2)生产者

(3)结果展示

4、消息被拒

(1)消费者

(2)生产者

(3)结果展示


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效


二、死信的来源

1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.


三、死信实战

1、代码架构图

生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。

但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由  C2 消费。


2、消息 TTL 过期

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}

// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}
}

(3)结果展示


 3、队列达到最大长度

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示


4、消息被拒

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制// arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer01 接收的消息是: " + msg + ": 此消息是被 C1 拒绝的");// 拒绝,且不放囧普通队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));   channel.basicAck(message.getEnvelope().getDeliveryTag(),false);            }};CancelCallback cancelCallback = consumerTag->{};// 开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示

 

这篇关于RabbitMQ 之 死信队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1008763

相关文章

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

golang实现延迟队列(delay queue)的两种实现

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录1 延迟队列:邮件提醒、订单自动取消2 实现2.1 simplChina编程e简单版:go自带的time

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整