尚品汇-MQ模块搭建测试、消息不丢失(重)(四十三)

2024-09-01 15:36

本文主要是介绍尚品汇-MQ模块搭建测试、消息不丢失(重)(四十三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录:

(1)消息不丢失

(2)消息确认

(3)消息确认业务封装

 (4)封装发送端消息确认

(5)封装消息发送

(6)发送确认消息测试

(7)消息发送失败,设置重发机制

(1)消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:

1,生产者不丢数据

2,MQ服务器不丢数据

3,消费者不丢数据

保证消息不丢失有两种实现方式:

  1. 开启事务模式
  2. 消息确认模式

说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

(2)消息确认

消息持久化

如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化

Exchange

声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)

Queue

声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)

message

发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:

1、将queue的持久化标识durable设置为true,则代表是一个持久的队列

2、发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认

有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认

有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?

要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

(3)消息确认业务封装

搭建rabbit-util模块

由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可

搭建方式如:common-util,导入常量类 MqConst

package com.atguigu.gmall.constant;public class MqConst {/*** 消息补偿*/public static final String MQ_KEY_PREFIX = "mq:list";public static final int RETRY_COUNT = 3;/*** 商品上下架*/public static final String EXCHANGE_DIRECT_GOODS = "exchange.direct.goods";public static final String ROUTING_GOODS_UPPER = "goods.upper";public static final String ROUTING_GOODS_LOWER = "goods.lower";//队列public static final String QUEUE_GOODS_UPPER  = "queue.goods.upper";public static final String QUEUE_GOODS_LOWER  = "queue.goods.lower";/*** 取消订单,发送延迟队列*/public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;public static final String ROUTING_ORDER_CANCEL = "order.create";//延迟取消订单队列public static final String QUEUE_ORDER_CANCEL  = "queue.order.cancel";//取消订单 延迟时间 单位:秒 真实业务public static final int DELAY_TIME  = 24*60*60;//  测试取消订单// public static final int DELAY_TIME  = 3;/*** 订单支付*/public static final String EXCHANGE_DIRECT_PAYMENT_PAY = "exchange.direct.payment.pay";public static final String ROUTING_PAYMENT_PAY = "payment.pay";//队列public static final String QUEUE_PAYMENT_PAY  = "queue.payment.pay";/*** 减库存*/public static final String EXCHANGE_DIRECT_WARE_STOCK = "exchange.direct.ware.stock";public static final String ROUTING_WARE_STOCK = "ware.stock";//队列public static final String QUEUE_WARE_STOCK  = "queue.ware.stock";/*** 减库存成功,更新订单状态*/public static final String EXCHANGE_DIRECT_WARE_ORDER = "exchange.direct.ware.order";public static final String ROUTING_WARE_ORDER = "ware.order";//队列public static final String QUEUE_WARE_ORDER  = "queue.ware.order";/*** 关闭交易*/public static final String EXCHANGE_DIRECT_PAYMENT_CLOSE = "exchange.direct.payment.close";public static final String ROUTING_PAYMENT_CLOSE = "payment.close";//队列public static final String QUEUE_PAYMENT_CLOSE  = "queue.payment.close";/*** 定时任务*/public static final String EXCHANGE_DIRECT_TASK = "exchange.direct.task";public static final String ROUTING_TASK_1 = "seckill.task.1";//队列public static final String QUEUE_TASK_1  = "queue.task.1";/*** 秒杀*/public static final String EXCHANGE_DIRECT_SECKILL_USER = "exchange.direct.seckill.user";public static final String ROUTING_SECKILL_USER = "seckill.user";//队列public static final String QUEUE_SECKILL_USER  = "queue.seckill.user";/*** 定时任务*/public static final String ROUTING_TASK_18 = "seckill.task.18";//队列public static final String QUEUE_TASK_18  = "queue.task.18";}

pom.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>common</artifactId><groupId>com.atguigu.gmall</groupId><version>1.0</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rabbit-util</artifactId><dependencies><!--rabbitmq消息队列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--rabbitmq 协议--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency></dependencies></project>

service-mq引入rabbit-util模块依赖

<!--rabbitmq消息--><dependency><groupId>com.atguigu.gmall</groupId><artifactId>rabbit-util</artifactId><version>1.0</version></dependency>

 (4)封装发送端消息确认

在rabbit-util 中添加类

package com.atguigu.gmall.common.config;/*** @Description 消息发送确认* <p>* ConfirmCallback  只确认消息是否正确到达 Exchange 中* ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行* <p>* 1. 如果消息没有到exchange,则confirm回调,ack=false* 2. 如果消息到达exchange,则confirm回调,ack=true* 3. exchange到queue成功,则不回调return* 4. exchange到queue失败,则回调return* */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送成功:" + JSON.toJSONString(correlationData));} else {log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化对象输出System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}}

(5)封装消息发送

在rabbit-util 中添加类RabbitService

package com.atguigu.gmall.common.service;@Service
public class RabbitService {@Autowiredprivate RabbitTemplate rabbitTemplate;/***  发送消息* @param exchange 交换机* @param routingKey 路由键* @param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}

(6)发送确认消息测试

在service-mq编写测试代码

消息发送端

package com.atguigu.gmall.mq.controller;@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitService rabbitService;/*** 消息发送*///http://localhost:8282/mq/sendConfirm@GetMapping("sendConfirm")public Result sendConfirm() {rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");return Result.ok();}
}

消息接收端

在service-mq 中编写

这里交换机队列绑定:有两种方式一种是配置,一种是注解

我们这里使用注解进行创建绑定

@SneakyThrows 是Lombk的异常 处理注解

package com.atguigu.gmall.mq.receiver;@Component
public class ConfirmReceiver {@SneakyThrows
@RabbitListener(bindings=@QueueBinding(value = @Queue(value = "queue.confirm",autoDelete = "false"),exchange = @Exchange(value = "exchange.confirm",autoDelete = "false"),key = {"routing.confirm"}))
public void process(Message message, Channel channel){System.out.println("RabbitListener:"+new String(message.getBody()));// 参数一:消息的唯一标识,参数二:是否批量确认 false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

测试:http://localhost:8282/mq/sendConfirm

 加入了确认机制后,成功后:会有提示

把交换机写错,发送到交换机会失败

 

 

把路由key写错,交换机到队列消息失败:


 

(7)消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制

在rabbit-util 模块中添加依赖

<!-- redis -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- spring2.X集成redis所需common-pool2-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

自定义一个实体类来接收消息

@Data
public class GmallCorrelationData extends CorrelationData {//  消息主体private Object message;//  交换机private String exchange;//  路由键private String routingKey;//  重试次数private int retryCount = 0;//  消息类型  是否是延迟消息private boolean isDelay = false;//  延迟时间private int delayTime = 10;
}

 

修改发送方法

//  封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){//  将发送的消息 赋值到 自定义的实体类GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  声明一个correlationId的变量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);//  发送消息的时候,将这个gmallCorrelationData 对象放入缓存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法
//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);//  默认返回truereturn true;
}

发送失败调用重发方法  MQProducerAckConfig 配置类中修改 

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {//  ack = true 说明消息正确发送到了交换机if (ack){System.out.println("哥们你来了.");log.info("消息发送到了交换机");}else {//  消息没有到交换机log.info("消息没发送到交换机");//  调用重试发送方法this.retrySendMsg(correlationData);}
}@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + code);System.out.println("描述:" + codeText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);//  获取这个CorrelationData对象的Id  spring_returned_message_correlationString correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//  因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);//  消息没有到队列的时候,则会调用重试发送方法GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);//  调用方法  gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.this.retrySendMsg(gmallCorrelationData);
}/*** 重试发送方法* @param correlationData   父类对象  它下面还有个子类对象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {//  数据类型转换  统一转换为子类处理GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;//  获取到重试次数 初始值 0int retryCount = gmallCorrelationData.getRetryCount();//  判断if (retryCount>=3){//  不需要重试了log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));} else {//  变量更新retryCount+=1;//  重新赋值重试次数 第一次重试 0->1 1->2 2->3gmallCorrelationData.setRetryCount(retryCount);System.out.println("重试次数:\t"+retryCount);//  更新缓存中的数据this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}

把路由写错,交换机到队列消息失败:

  把交换机写错:发送到交换机失败

 

总结

为了防止消息不丢失 ,从以下三方面考虑:

1,生产者不丢数据

2,MQ服务器不丢数据

3,消费者不丢数据

   

生产者:

MQ服务器:

RabbitMQ重启之后消息不丢失

消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号

开启确认回调(成功到达交换机发送ack标识,没有达到重发)、失败回调(设置重发机制)

为了防止消费者处理消息丢失,我们开启消费者消息消费确认回调,消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

这篇关于尚品汇-MQ模块搭建测试、消息不丢失(重)(四十三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1127343

相关文章

如何搭建并配置HTTPD文件服务及访问权限控制

《如何搭建并配置HTTPD文件服务及访问权限控制》:本文主要介绍如何搭建并配置HTTPD文件服务及访问权限控制的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、安装HTTPD服务二、HTTPD服务目录结构三、配置修改四、服务启动五、基于用户访问权限控制六、

pytest+allure环境搭建+自动化实践过程

《pytest+allure环境搭建+自动化实践过程》:本文主要介绍pytest+allure环境搭建+自动化实践过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、pytest下载安装1.1、安装pytest1.2、检测是否安装成功二、allure下载安装2.

使用vscode搭建pywebview集成vue项目实践

《使用vscode搭建pywebview集成vue项目实践》:本文主要介绍使用vscode搭建pywebview集成vue项目实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录环境准备项目源码下载项目说明调试与生成可执行文件核心代码说明总结本节我们使用pythonpywebv

MySQL启动报错:InnoDB表空间丢失问题及解决方法

《MySQL启动报错:InnoDB表空间丢失问题及解决方法》在启动MySQL时,遇到了InnoDB:Tablespace5975wasnotfound,该错误表明MySQL在启动过程中无法找到指定的s... 目录mysql 启动报错:InnoDB 表空间丢失问题及解决方法错误分析解决方案1. 启用 inno

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

python多线程并发测试过程

《python多线程并发测试过程》:本文主要介绍python多线程并发测试过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、并发与并行?二、同步与异步的概念?三、线程与进程的区别?需求1:多线程执行不同任务需求2:多线程执行相同任务总结一、并发与并行?1、

Windows Server 2025 搭建NPS-Radius服务器的步骤

《WindowsServer2025搭建NPS-Radius服务器的步骤》本文主要介绍了通过微软的NPS角色实现一个Radius服务器,身份验证和证书使用微软ADCS、ADDS,具有一定的参考价... 目录简介示意图什么是 802.1X?核心作用802.1X的组成角色工作流程简述802.1X常见应用802.

电脑提示Winmm.dll缺失怎么办? Winmm.dll文件丢失的多种修复技巧

《电脑提示Winmm.dll缺失怎么办?Winmm.dll文件丢失的多种修复技巧》有时电脑会出现无法启动程序,因为计算机中丢失winmm.dll的情况,其实,winmm.dll丢失是一个比较常见的问... 在大部分情况下出现我们运行或安装软件,游戏出现提示丢失某些DLL文件或OCX文件的原因可能是原始安装包

无法启动此程序因为计算机丢失api-ms-win-core-path-l1-1-0.dll修复方案

《无法启动此程序因为计算机丢失api-ms-win-core-path-l1-1-0.dll修复方案》:本文主要介绍了无法启动此程序,详细内容请阅读本文,希望能对你有所帮助... 在计算机使用过程中,我们经常会遇到一些错误提示,其中之一就是"api-ms-win-core-path-l1-1-0.dll丢失

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,