RabbitMQ 实验消费原始队列消息, 拒绝(reject)投递死信交换机过程

本文主要是介绍RabbitMQ 实验消费原始队列消息, 拒绝(reject)投递死信交换机过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

如果你想通过 RabbitMQ 的死信队列功能实现消费者拒绝消息投递到死信交换机的行为,你可以按照以下步骤操作:

  1. 创建原始队列,并将其绑定到一个交换机上:
export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
rabbitmqadmin declare queue name=my_queue durable=true
rabbitmqadmin declare exchange name=my_exchange type=direct
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 创建死信交换机和死信队列,并将死信队列绑定到死信交换机:
rabbitmqadmin declare exchange name=my_dlx type=direct
rabbitmqadmin declare queue name=my_dlq durable=true
rabbitmqadmin declare binding source=my_dlx destination=my_dlq routing_key=dlx_routing_key
  1. 设置原始队列的参数,包括死信交换机和死信路由键:
rabbitmqadmin declare queue name=my_queue durable=true arguments='{"x-dead-letter-exchange":"my_dlx", "x-dead-letter-routing-key":"dlx_routing_key"}'
  1. 编写消费者代码,在消费者代码中拒绝投递消息到死信交换机:
//Consumer.ts
import * as amqp from 'amqplib';async function main() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const queue = 'my_queue';await channel.consume(queue, async (msg) => {if (msg !== null) {try {await new Promise(resolve => {setTimeout(()=>{resolve(true)},3000)})// Process the messageconsole.log("Received message:", msg.content.toString());// Simulate rejection of message delivery to dead letter exchangethrow new Error("Failed to process message, rejecting delivery to dead letter exchange");} catch (error) {// Reject the message delivery to dead letter exchangechannel.reject(msg, false);console.error("Error processing message:", error.message);}}});console.log('Waiting for messages...');
}main().catch(console.error);

在这里插入图片描述

在这个示例中,当消费者处理消息时发生错误时,它会将消息的投递拒绝到死信交换机。这样,这些被拒绝的消息将被重新路由到死信队列 my_dlq 中。

记得根据你的具体需求修改队列、交换机和消费者的配置,确保它们符合你的预期行为。

rabbitmqadmin purge queue name=my_dlq #删除队列中的所有消息

//Producer.ts 
import * as amqp from 'amqplib';async function sendMsg() {try {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel(); // 创建通道const delayQueue = 'my_queue'; // 延迟队列名称// 每秒发送一条消息for (let i = 0; i < 100; i++) {await new Promise(resolve => {setTimeout(() => {resolve(true)}, 1000)})const message = 'Hello RabbitMQ!';channel.sendToQueue(delayQueue, Buffer.from(message));console.log('消息已发送:', message);}} catch (error) {console.error('错误:', error);}
}sendMsg()

这篇关于RabbitMQ 实验消费原始队列消息, 拒绝(reject)投递死信交换机过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/858124

相关文章

ShardingProxy读写分离之原理、配置与实践过程

《ShardingProxy读写分离之原理、配置与实践过程》ShardingProxy是ApacheShardingSphere的数据库中间件,通过三层架构实现读写分离,解决高并发场景下数据库性能瓶... 目录一、ShardingProxy技术定位与读写分离核心价值1.1 技术定位1.2 读写分离核心价值二

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

Java Kafka消费者实现过程

《JavaKafka消费者实现过程》Kafka消费者通过KafkaConsumer类实现,核心机制包括偏移量管理、消费者组协调、批量拉取消息及多线程处理,手动提交offset确保数据可靠性,自动提交... 目录基础KafkaConsumer类分析关键代码与核心算法2.1 订阅与分区分配2.2 拉取消息2.3

AOP编程的基本概念与idea编辑器的配合体验过程

《AOP编程的基本概念与idea编辑器的配合体验过程》文章简要介绍了AOP基础概念,包括Before/Around通知、PointCut切入点、Advice通知体、JoinPoint连接点等,说明它们... 目录BeforeAroundAdvise — 通知PointCut — 切入点Acpect — 切面

spring AMQP代码生成rabbitmq的exchange and queue教程

《springAMQP代码生成rabbitmq的exchangeandqueue教程》使用SpringAMQP代码直接创建RabbitMQexchange和queue,并确保绑定关系自动成立,简... 目录spring AMQP代码生成rabbitmq的exchange and 编程queue执行结果总结s

C++ STL-string类底层实现过程

《C++STL-string类底层实现过程》本文实现了一个简易的string类,涵盖动态数组存储、深拷贝机制、迭代器支持、容量调整、字符串修改、运算符重载等功能,模拟标准string核心特性,重点强... 目录实现框架一、默认成员函数1.默认构造函数2.构造函数3.拷贝构造函数(重点)4.赋值运算符重载函数

聊聊springboot中如何自定义消息转换器

《聊聊springboot中如何自定义消息转换器》SpringBoot通过HttpMessageConverter处理HTTP数据转换,支持多种媒体类型,接下来通过本文给大家介绍springboot中... 目录核心接口springboot默认提供的转换器如何自定义消息转换器Spring Boot 中的消息

MySQ中出现幻读问题的解决过程

《MySQ中出现幻读问题的解决过程》文章解析MySQLInnoDB通过MVCC与间隙锁机制在可重复读隔离级别下解决幻读,确保事务一致性,同时指出性能影响及乐观锁等替代方案,帮助开发者优化数据库应用... 目录一、幻读的准确定义与核心特征幻读 vs 不可重复读二、mysql隔离级别深度解析各隔离级别的实现差异

Nginx添加内置模块过程

《Nginx添加内置模块过程》文章指导如何检查并添加Nginx的with-http_gzip_static模块:确认该模块未默认安装后,需下载同版本源码重新编译,备份替换原有二进制文件,最后重启服务验... 目录1、查看Nginx已编辑的模块2、Nginx官网查看内置模块3、停止Nginx服务4、Nginx