RabbitMQ 之 死信队列

2024-05-27 22:28
文章标签 队列 rabbitmq 死信

本文主要是介绍RabbitMQ 之 死信队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

​编辑一、死信的概念

二、死信的来源

三、死信实战

1、代码架构图

2、消息 TTL 过期

(1)消费者

(2)生产者

(3)结果展示​编辑

 3、队列达到最大长度

(1)消费者

(2)生产者

(3)结果展示

4、消息被拒

(1)消费者

(2)生产者

(3)结果展示


一、死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理
解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息
消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效


二、死信的来源

1、消息 TTL 过期
2、队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3、消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.


三、死信实战

1、代码架构图

生产者正常情况下走的是普通的交换机,这个交换机的类型是 direct ,它和普通队列之间的关系是一个叫 "zhangsan" 的路由 key, 正常情况下会被 C1 消费。

但是发生了上面所说的三种情况中的一种,成为了死信,然后被转换到死信交换机中,这个死信交换机也是 direct 类型,它们之间的 routingKey 是 "lisi",然后就进入了死信队列,死信队列由  C2 消费。


2、消息 TTL 过期

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}

// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}
}

(3)结果展示


 3、队列达到最大长度

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示


4、消息被拒

(1)消费者

// 死信队列 实战
// 消费者 1
public class Comsumer01 {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";// 死信交换机名称public static final String DEAD_EXCHANGE = "dead_exchange";// 普通队列名称public static final String NORMAL_QUEUE = "normal_queue";// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 交换机的声明channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通队列的声明Map<String, Object> arguments = new HashMap<>();// 过期时间//arguments.put("x-message-ttl",100000);// 正常队列设置死信交换机arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);// 设置死信 RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");// 设置正常队列的长度的限制// arguments.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);// 死信队列的声明channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通的交换机与普通队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");// 绑定死信的交换机与死信的队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{String msg = new String(message.getBody(),"UTF-8");if (msg.equals("info5")){System.out.println("Consumer01 接收的消息是: " + msg + ": 此消息是被 C1 拒绝的");// 拒绝,且不放囧普通队列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer01 接收的消息是: " + new String(message.getBody(),"UTF-8"));   channel.basicAck(message.getEnvelope().getDeliveryTag(),false);            }};CancelCallback cancelCallback = consumerTag->{};// 开启手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);}
}
// 死信队列 实战
// 消费者 2
public class Comsumer02 {// 死信队列名称public static final String DEAD_QUEUE = "dead_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("等待接收消息.....");DeliverCallback deliverCallback = ( consumerTag, message) ->{System.out.println("Consumer02 接收的消息是: " + new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback = consumerTag->{};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);}
}

(2)生产者

// 死信队列  生产者代码
public class Producer {// 普通交换机名称public static final String NORMAL_EXCHANGE = "normal_exchange";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();// 死信消息 设置 TTL 的时间/*AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();*/for (int i = 1; i < 11; i++) {String message = "info" + i;channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());}}
}

(3)结果展示

 

这篇关于RabbitMQ 之 死信队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1008763

相关文章

SpringBoot集成redisson实现延时队列教程

《SpringBoot集成redisson实现延时队列教程》文章介绍了使用Redisson实现延迟队列的完整步骤,包括依赖导入、Redis配置、工具类封装、业务枚举定义、执行器实现、Bean创建、消费... 目录1、先给项目导入Redisson依赖2、配置redis3、创建 RedissonConfig 配

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

spring AMQP代码生成rabbitmq的exchange and queue教程

《springAMQP代码生成rabbitmq的exchangeandqueue教程》使用SpringAMQP代码直接创建RabbitMQexchange和queue,并确保绑定关系自动成立,简... 目录spring AMQP代码生成rabbitmq的exchange and 编程queue执行结果总结s

RabbitMQ消费端单线程与多线程案例讲解

《RabbitMQ消费端单线程与多线程案例讲解》文章解析RabbitMQ消费端单线程与多线程处理机制,说明concurrency控制消费者数量,max-concurrency控制最大线程数,prefe... 目录 一、基础概念详细解释:举个例子:✅ 单消费者 + 单线程消费❌ 单消费者 + 多线程消费❌ 多

RabbitMQ消息总线方式刷新配置服务全过程

《RabbitMQ消息总线方式刷新配置服务全过程》SpringCloudBus通过消息总线与MQ实现微服务配置统一刷新,结合GitWebhooks自动触发更新,避免手动重启,提升效率与可靠性,适用于配... 目录前言介绍环境准备代码示例测试验证总结前言介绍在微服务架构中,为了更方便的向微服务实例广播消息,

在Spring Boot中集成RabbitMQ的实战记录

《在SpringBoot中集成RabbitMQ的实战记录》本文介绍SpringBoot集成RabbitMQ的步骤,涵盖配置连接、消息发送与接收,并对比两种定义Exchange与队列的方式:手动声明(... 目录前言准备工作1. 安装 RabbitMQ2. 消息发送者(Producer)配置1. 创建 Spr

Java中常见队列举例详解(非线程安全)

《Java中常见队列举例详解(非线程安全)》队列用于模拟队列这种数据结构,队列通常是指先进先出的容器,:本文主要介绍Java中常见队列(非线程安全)的相关资料,文中通过代码介绍的非常详细,需要的朋... 目录一.队列定义 二.常见接口 三.常见实现类3.1 ArrayDeque3.1.1 实现原理3.1.2

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

golang实现延迟队列(delay queue)的两种实现

《golang实现延迟队列(delayqueue)的两种实现》本文主要介绍了golang实现延迟队列(delayqueue)的两种实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录1 延迟队列:邮件提醒、订单自动取消2 实现2.1 simplChina编程e简单版:go自带的time