typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

本文主要是介绍typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

  1. 添加用户
rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

  1. 分配权限
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

  1. 可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

在这里插入图片描述

在这里插入图片描述
标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');// 创建Channel
const channel = await connection.createChannel();// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: 'my_dead_letter_exchange'
});// 消费消息
channel.consume(queueName, (msg) => {// 处理消息if (msg) {// 处理失败,拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败,拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败,拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

npm install amqplib --save
npm install @types/amqplib --save-dev

在这里插入图片描述

总结

在这里插入图片描述

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
  1. 查看 RabbitMQ 服务器信息
rabbitmqadmin status
  1. 列出所有交换机
rabbitmqadmin list exchanges
  1. 列出所有队列
rabbitmqadmin list queues
  1. 创建一个交换机
rabbitmqadmin declare exchange name=my_exchange type=direct
  1. 创建一个队列
rabbitmqadmin declare queue name=my_queue
  1. 绑定队列到交换机
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 发送消息到指定交换机
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
  1. 获取队列消息
rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const exchange = 'routing_exchange';// 定义 dlx-exchangeconst dlxExchangeName = 'dlx-exchange';// 声明交换机await channel.assertExchange(exchange, 'direct', {durable: true});await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失const dlxqueueBindings= [{dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',},{dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings = [{queue: '3_second_queue', routingKey: 'fast', arguments: {'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: '8_second_queue', routingKey: 'slow', arguments: {'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列,并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i = 0; i < 10; i++) {await new Promise((resolve) => {setTimeout(() => {resolve(1)}, 1000)})const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log('当前中国时间:', chinaTime);// 发送消息到交换机,并设置不同的路由键await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);}// 关闭连接setTimeout(async () => {await channel.close();await connection.close();}, 10000); //10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-3_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-8_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);

这篇关于typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/856145

相关文章

C++中unordered_set哈希集合的实现

《C++中unordered_set哈希集合的实现》std::unordered_set是C++标准库中的无序关联容器,基于哈希表实现,具有元素唯一性和无序性特点,本文就来详细的介绍一下unorder... 目录一、概述二、头文件与命名空间三、常用方法与示例1. 构造与析构2. 迭代器与遍历3. 容量相关4

C++中悬垂引用(Dangling Reference) 的实现

《C++中悬垂引用(DanglingReference)的实现》C++中的悬垂引用指引用绑定的对象被销毁后引用仍存在的情况,会导致访问无效内存,下面就来详细的介绍一下产生的原因以及如何避免,感兴趣... 目录悬垂引用的产生原因1. 引用绑定到局部变量,变量超出作用域后销毁2. 引用绑定到动态分配的对象,对象

SpringBoot基于注解实现数据库字段回填的完整方案

《SpringBoot基于注解实现数据库字段回填的完整方案》这篇文章主要为大家详细介绍了SpringBoot如何基于注解实现数据库字段回填的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解... 目录数据库表pom.XMLRelationFieldRelationFieldMapping基础的一些代

Java HashMap的底层实现原理深度解析

《JavaHashMap的底层实现原理深度解析》HashMap基于数组+链表+红黑树结构,通过哈希算法和扩容机制优化性能,负载因子与树化阈值平衡效率,是Java开发必备的高效数据结构,本文给大家介绍... 目录一、概述:HashMap的宏观结构二、核心数据结构解析1. 数组(桶数组)2. 链表节点(Node

Java AOP面向切面编程的概念和实现方式

《JavaAOP面向切面编程的概念和实现方式》AOP是面向切面编程,通过动态代理将横切关注点(如日志、事务)与核心业务逻辑分离,提升代码复用性和可维护性,本文给大家介绍JavaAOP面向切面编程的概... 目录一、AOP 是什么?二、AOP 的核心概念与实现方式核心概念实现方式三、Spring AOP 的关

Python实现字典转字符串的五种方法

《Python实现字典转字符串的五种方法》本文介绍了在Python中如何将字典数据结构转换为字符串格式的多种方法,首先可以通过内置的str()函数进行简单转换;其次利用ison.dumps()函数能够... 目录1、使用json模块的dumps方法:2、使用str方法:3、使用循环和字符串拼接:4、使用字符

MyBatis延迟加载与多级缓存全解析

《MyBatis延迟加载与多级缓存全解析》文章介绍MyBatis的延迟加载与多级缓存机制,延迟加载按需加载关联数据提升性能,一级缓存会话级默认开启,二级缓存工厂级支持跨会话共享,增删改操作会清空对应缓... 目录MyBATis延迟加载策略一对多示例一对多示例MyBatis框架的缓存一级缓存二级缓存MyBat

Linux下利用select实现串口数据读取过程

《Linux下利用select实现串口数据读取过程》文章介绍Linux中使用select、poll或epoll实现串口数据读取,通过I/O多路复用机制在数据到达时触发读取,避免持续轮询,示例代码展示设... 目录示例代码(使用select实现)代码解释总结在 linux 系统里,我们可以借助 select、

Linux挂载linux/Windows共享目录实现方式

《Linux挂载linux/Windows共享目录实现方式》:本文主要介绍Linux挂载linux/Windows共享目录实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录文件共享协议linux环境作为服务端(NFS)在服务器端安装 NFS创建要共享的目录修改 NFS 配

通过React实现页面的无限滚动效果

《通过React实现页面的无限滚动效果》今天我们来聊聊无限滚动这个现代Web开发中不可或缺的技术,无论你是刷微博、逛知乎还是看脚本,无限滚动都已经渗透到我们日常的浏览体验中,那么,如何优雅地实现它呢?... 目录1. 早期的解决方案2. 交叉观察者:IntersectionObserver2.1 Inter