Day10_08_消息队列之RabbitMQ消息可靠性传输 消息确认机制 死信队列详解及代码实现

本文主要是介绍Day10_08_消息队列之RabbitMQ消息可靠性传输 消息确认机制 死信队列详解及代码实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RabbitMQ消息可靠性传输 消息确认机制 死信队列详解及代码实现

一. 消息可靠传递的重要性

我们在项目中使用RabbitMQ时,可能会遇到这样的问题:假如在一个订单系统中,用户付款成功,此时我们往RabbitMQ消息队列中发送一条消息,然后期望消息消费者修改订单状态,但是最终实际订单状态却并没有被成功修改.遇到这种问题我们排查的思路如下:

  • 1️⃣.消息是否被成功的发送到消息队列?

  • 2️⃣.消息是否有丢失的情况?

  • 3️⃣.消息是否被成功的消费了?

在生产环境中是不允许出现消息发送/消费错误的情况的,因为这可能会给企业带来巨大的损失.本文将介绍RabbitMQ如何保证消息的可靠性(生产者保证消息可靠投递,消费者保证消息可靠消费,RabbitMQ持久化).

二. 保证消息可靠传递的手段

  • 1️⃣.设置交换机、队列和消息都为持久化;

  • 2️⃣.生产者消息确认机制;

  • 3️⃣.消费者消息确认机制;

  • 4️⃣.死信队列.

三. 设置交换机、消息队列和消息持久化

在生产过程中,难免会发生服务器宕机的事情,RabbitMQ也不例外.可能由于某种特殊情况下的异常而导致RabbitMQ宕机从而重启,那么这个时候对于消息队列里的数据,包括交换机、队列以及队列中存在的消息的恢复就显得尤为重要了.RabbitMQ本身带有持久化机制,包括交换机、队列以及消息的持久化.持久化的主要机制就是将信息写入磁盘,当RabbtiMQ服务宕机重启后,从磁盘中读取存入的持久化信息,恢复数据(当然凡事都不是100%的,只能尽最大程度的保证消息不会丢失).

持久化: 保证服务器重启的时候消息不丢失,重点解决服务器的异常崩溃而导致的消息丢失问题.但是,将所有的消息都设置为持久化,会严重影响RabbitMQ的性能,写入硬盘的速度比写入内存的速度慢的不只一点点.对于可靠性要求不是那么高的消息可以不持久化以提高整体的吞吐率.在选择是否要将消息进行持久化时,需要在可靠性和吞吐量之间做一个权衡.

对于某些应用场景,如大流量的订单交易系统,为了不影响性能,我们可以不设置持久化.但是我们会定时扫描数据库中的未发送成功的消息,进行重试发送.

1. 交换机的持久化

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel =connection.createChannel();#该交换机默认没有持久化
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

使用这种方法声明的交换机,默认不是持久化的,在服务器重启之后,交换机会消失.我们在管理台的Exchange页签下查看交换机,可以看到使用上述方法声明的交换机,Features一列是空的,即没有任何附加属性.

我们换用另一种方法声明交换机:

try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection=factory.newConnection();Channel channel = connection.createChannel();#该交换机实现了持久化channel.exchangeDeclare(EXCHANGE_NAME,"fanout",true);
} catch (IOException e) {e.printStackTrace();
} catch (TimeoutException e) {e.printStackTrace();
}

查看一下方法的说明:

    /*** Actively declare a non-autodelete exchange with no extra arguments* @see com.rabbitmq.client.AMQP.Exchange.Declare* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk* @param exchange the name of the exchange* @param type the exchange type* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)* @throws java.io.IOException if an error is encountered* @return a declaration-confirm method to indicate the exchange was successfully declared*/Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

我们可以看到第三个参数durable,如果为true时则表示要做持久化,当服务重启时,交换机依然存在.所以使用该方法声明的交换机是下面这个样子的(做测试的时候,需要先在管理台删掉原来的同名交换机).D表示durable,鼠标放在上边会显示为true.

2. 队列的持久化

与交换机的持久化相同,队列的持久化也是通过durable参数实现的,默认生成的随机队列不是持久化的.前面示例中声明的带有我们自定义名字的队列都是持久化的.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
#第个参数用于进行持久化设置
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

看一下方法的定义:

/*** Declare a queue* @see com.rabbitmq.client.AMQP.Queue.Declare* @see com.rabbitmq.client.AMQP.Queue.DeclareOk* @param queue the name of the queue* @param durable true if we are declaring a durable queue (the queue will survive a server restart)* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* @param arguments other properties (construction arguments) for the queue* @return a declaration-confirm method to indicate the queue was successfully declared* @throws java.io.IOException if an error is encountered*/Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

第二个参数跟交换机方法的参数一样,true表示做持久化.当RabbitMQ服务重启时,队列依然存在.
这里说一下后边的三个参数:

  • exclusive是排他队列:如果一个队列被声明为排他队列,那么这个队列只能被第一次声明他的连接所见,并在连接断开的时候自动删除.这里有三点需要说明:

    1️⃣.同一个连接的不同channel,是可以访问同一连接下创建的排他队列的;

    2️⃣.排他队列只能被声明一次,其他连接不允许声明同名的排他队列;

    3️⃣.即使排他队列是持久化的,当连接断开或者客户端退出时,排他队列依然会被删除.

  • autoDelete是自动删除:为true时,当没有任何消费者订阅该队列时,队列会被自动删除;

  • arguments:其它参数.

3. 消息的持久化

消息的持久化是指当消息从交换机发送到队列之后,被消费者消费之前,服务器突然宕机重启,消息仍然存在.消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无意义.

通过如下代码设置消息的持久化:

channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

其中MessageProperties.PERSISTENT_TEXT_PLAIN是设置持久化的参数.

我们查看basicPublish方法的定义.

/*** Publish a message* @see com.rabbitmq.client.AMQP.Basic.Publish* @param exchange the exchange to publish the message to* @param routingKey the routing key* @param props other properties for the message - routing headers etc* @param body the message body* @throws java.io.IOException if an error is encountered*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

在看下BasicProperties的类型

public static class BasicProperties {private String contentType;private String contentEncoding;private Map<String,Object> headers;#deliveryMode是设置消息持久化的参数private Integer deliveryMode;private Integer priority;private String correlationId;private String replyTo;private String expiration;private String messageId;private Date timestamp;private String type;private String userId;private String appId;private String clusterId;

其中deliveryMode是设置消息持久化的参数,等于1表示不设置持久化,等于2设置持久化.PERSISTENT_TEXT_PLAIN是进行实例化的一个deliveryMode=2的常量对象,便于编程.

public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2,0,null, null,null,null,null,null, null,null,null);

设置了队列的持久化和消息的持久化之后,当服务器宕机重启,存在队列中未发送的消息会依然存在.

注意:

持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,只有在内存吃紧的时候才会从内存中清除.

非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间.

以上就是关于RabbitMQ中持久化的一些内容,但是并不会严格的100%保证信息不会丢失.

四. 消息确认机制

1. 消息确认种类

RabbitMQ的消息确认有两种:

消息发送确认: 这种是用来确认生产者将消息发送到交换机,交换机传递到队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换机,二是确认是否到达队列.

消费接收确认: 这种是确认消费者是否成功消费了队列中的消息.

2. 消息发送确认

2.1 ConfirmCallback

通过实现ConfirmCallBack接口,消息发送到交换机Exchange后触发该回调.

使用该功能需要开启确认,spring-boot中配置如下:

spring.rabbitmq.publisher-confirms = true

2.2 ReturnCallback

通过实现ReturnCallback接口,如果消息从交换机发送到对应的队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发该回调).

使用该功能需要开启确认,spring-boot中配置如下:

spring.rabbitmq.publisher-returns = true

3. 消息接收确认

3.1 消息接收的确认模式

  • AcknowledgeMode.NONE: 不确认

  • AcknowledgeMode.AUTO: 自动确认

  • AcknowledgeMode.MANUAL: 手动确认

spring-boot中配置方法:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

3.2 手动确认

3.2.1 成功确认

void basicAck(long deliveryTag, boolean multiple) throws IOException;#deliveryTag:该消息的index;#multiple: 是否批量. true: 将一次性ack所有小于deliveryTag的消息.#消费者成功处理后,调用
#channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)
#方法对消息进行确认.

3.2.2 失败确认

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;#deliveryTag:该消息的index.#multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息.#requeue:被拒绝的消息是否重新进入队列.
void basicReject(long deliveryTag, boolean requeue) throws IOException;#deliveryTag:该消息的index.
#requeue: 被拒绝的是否重新入队列.

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息.

五. 生产者的消息发送确认机制

1. 消息发送确认机制概述

当消息发送出去之后,我们如何知道消息有没有正确到达exchange呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了.

为解决这个问题,主要有如下两种方案:

  • 通过事务机制实现;

  • 通过生产者消息确认机制(publisher confirm)实现.

但是使用事务机制实现会严重降低RabbitMQ的消息吞吐量,我们采用一种轻量级的方案---生产者消息确认机制.

2. 什么是消息发送确认机制?

简而言之,就是 生产者发送的消息一旦被投递到所有匹配的队列之后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地.

如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出.

Mandatory参数:当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息.

六. 消费者的消息接收确认机制

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者的消息确认机制(message acknowledgement).采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题.因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息.

七. 死信队列

1. 死信队列与私信概述

DLX,Dead Letter Exchange 的缩写,又被称为死信邮箱、死信交换机.DLX就是一个普通的交换机,和一般的交换机没有任何区别.

当一个消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,RabbitMQ会自动发送).

2. 什么样的消息会变成死信?

  • 1️⃣.消息被拒绝(basic.reject或basic.nack)并且requeue=false;

  • 2️⃣.消息TTL过期;

  • 3️⃣.队列达到最大长度(队列满了,无法再添加数据到mq中).

3. 死信交换机应用场景

在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因.

4. 如何使用死信交换机?

定义业务(普通)队列的时候指定参数:

  • x-dead-letter-exchange: 用来设置死信后发送的交换机
  • x-dead-letter-routing-key: 用来设置死信的routingKey
@Bean
public Queue helloQueue() {//将普通队列绑定到私信交换机上Map<String, Object> args = new HashMap<>(2);args.put(DEAD_LETTER_EXCHANGE_KEY, deadExchangeName);args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);Queue queue = new Queue(queueName, true, false, false, args);return queue;
}

八. 消息确认的实现

创建一个SpringBoot项目,在该项目中创建一个新的模块,项目结构如图:

注意:本案例是把消息发送和消息接收的实现写在了同一个项目模块中了,没有用两个模块实现.

2. 添加相关依赖

在模块的pom.xml中添加如下依赖包.

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

3. 开启生产者消息发送确认机制

# 开启发送确认
spring.rabbitmq.publisher-confirms=true# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

4. 开启消费者消息接收确认机制

# 开启ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual

5. 完整配置文件

spring:application:name: rabbitmq-reliabilityrabbitmq:host: localhostport: 5672username: guestpassword: guest# 开启发送确认publisher-confirms: true# 开启发送失败退回publisher-returns: true# 开启ACKlistener:simple:acknowledge-mode: manual

6. 创建RabbitMQ配置类

@Configuration
public class RabbitConfig {
​public final static String queueName = "hello_queue";
​/*** 死信队列:*/public final static String deadQueueName = "dead_queue";public final static String deadRoutingKey = "dead_routing_key";public final static String deadExchangeName = "dead_exchange";
​/*** 死信队列 交换机标识符*/public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";/*** 死信队列交换机绑定键标识符*/public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";​@Beanpublic Queue helloQueue() {//将普通队列绑定到私信交换机上Map<String, Object> args = new HashMap<>(2);args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);Queue queue = new Queue(queueName, true, false, false, args);return queue;}​/*** 死信队列:*/@Beanpublic Queue deadQueue() {Queue queue = new Queue(deadQueueName, true);return queue;}​@Beanpublic DirectExchange deadExchange() {return new DirectExchange(deadExchangeName);}​@Beanpublic Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);}
​
}

注释: hello_queue就配置了死信交换机、死信队列.

7. 生产者发送消息的核心代码

@Component
public class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
​@Autowiredprivate RabbitTemplate rabbitTemplate;​public void send(String exchange, String routingKey) {String context = "你好现在是 " + new Date();System.out.println("send content = " + context);this.rabbitTemplate.setMandatory(true);this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);this.rabbitTemplate.convertAndSend(exchange, routingKey, context);}​/*** 确认生产者是否把消息成功的发送到了交换机;* 确认后回调:* @param correlationData* @param ack* @param cause*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {System.out.println("send ack fail, cause = " + cause);} else {System.out.println("send ack success");}}​/*** 交换机中的消息是否被成功的发送到队列.* 失败后return回调:** @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);}
}

8. 消费者接收消息的核心代码

@Component
@RabbitListener(queues = RabbitConfig.queueName)
public class HelloReceiver {
​@RabbitHandlerpublic void process(String hello, Channel channel, Message message) throws IOException {try {Thread.sleep(2000);System.out.println("睡眠2s");} catch (InterruptedException e) {e.printStackTrace();}try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("receiver success = " + hello);} catch (Exception e) {e.printStackTrace();//丢弃这条消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);System.out.println("receiver fail");}}
}

9. 测试生产者消息确认功能

分为4种场景来测试.创建一个Controller,创建几个接口方法进行测试.

9.1 exchange,queue都正确, confirm被回调,ack=true

@RequestMapping("/send1")
@ResponseBody
public String send1() {helloSender.send(null, RabbitConfig.queueName);return "success";
}

9.2 exchange 错误,queue 正确,confirm被回调,ack=false

@RequestMapping("/send2")
@ResponseBody
public String send2() {helloSender.send("fail-exchange", RabbitConfig.queueName);return "success";
}

9.3 exchange 正确,queue错误,confirm被回调,ack=true; return被回调 replyText:NO_ROUTE

@RequestMapping("/send3")
@ResponseBody
public String send3() {helloSender.send(null, "fail-queue");return "success";
}

9.4 exchange 错误,queue 错误, confirm被回调,ack=false

@RequestMapping("/send4")
@ResponseBody
public String send4() {helloSender.send("fail-exchange", "fail-queue");return "success";
}

10. 测试消费者消息确认功能

10.1 当添加这行代码的时候:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

测试结果: 消息被正常消费,消息从队列中删除.

10.2 当注释掉这行代码的时候:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

测试结果: 消息会被重复消费,一直保留在队列当中.

11. 测试死信队列

当执行这行代码的时候:

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

消息会被加入到死信队列中:

九. 拓展

除了我们上面讲的基本可靠性保证外,其实还有很多性能优化方案、可靠性保证方案:集群监控、流控、镜像队列、HAProxy+Keeplived高可靠负载均衡.

 

这篇关于Day10_08_消息队列之RabbitMQ消息可靠性传输 消息确认机制 死信队列详解及代码实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/774272

相关文章

一文详解如何在idea中快速搭建一个Spring Boot项目

《一文详解如何在idea中快速搭建一个SpringBoot项目》IntelliJIDEA作为Java开发者的‌首选IDE‌,深度集成SpringBoot支持,可一键生成项目骨架、智能配置依赖,这篇文... 目录前言1、创建项目名称2、勾选需要的依赖3、在setting中检查maven4、编写数据源5、开启热

C++中零拷贝的多种实现方式

《C++中零拷贝的多种实现方式》本文主要介绍了C++中零拷贝的实现示例,旨在在减少数据在内存中的不必要复制,从而提高程序性能、降低内存使用并减少CPU消耗,零拷贝技术通过多种方式实现,下面就来了解一下... 目录一、C++中零拷贝技术的核心概念二、std::string_view 简介三、std::stri

Python常用命令提示符使用方法详解

《Python常用命令提示符使用方法详解》在学习python的过程中,我们需要用到命令提示符(CMD)进行环境的配置,:本文主要介绍Python常用命令提示符使用方法的相关资料,文中通过代码介绍的... 目录一、python环境基础命令【Windows】1、检查Python是否安装2、 查看Python的安

C++高效内存池实现减少动态分配开销的解决方案

《C++高效内存池实现减少动态分配开销的解决方案》C++动态内存分配存在系统调用开销、碎片化和锁竞争等性能问题,内存池通过预分配、分块管理和缓存复用解决这些问题,下面就来了解一下... 目录一、C++内存分配的性能挑战二、内存池技术的核心原理三、主流内存池实现:TCMalloc与Jemalloc1. TCM

OpenCV实现实时颜色检测的示例

《OpenCV实现实时颜色检测的示例》本文主要介绍了OpenCV实现实时颜色检测的示例,通过HSV色彩空间转换和色调范围判断实现红黄绿蓝颜色检测,包含视频捕捉、区域标记、颜色分析等功能,具有一定的参考... 目录一、引言二、系统概述三、代码解析1. 导入库2. 颜色识别函数3. 主程序循环四、HSV色彩空间

HTML5 搜索框Search Box详解

《HTML5搜索框SearchBox详解》HTML5的搜索框是一个强大的工具,能够有效提升用户体验,通过结合自动补全功能和适当的样式,可以创建出既美观又实用的搜索界面,这篇文章给大家介绍HTML5... html5 搜索框(Search Box)详解搜索框是一个用于输入查询内容的控件,通常用于网站或应用程

Python实例题之pygame开发打飞机游戏实例代码

《Python实例题之pygame开发打飞机游戏实例代码》对于python的学习者,能够写出一个飞机大战的程序代码,是不是感觉到非常的开心,:本文主要介绍Python实例题之pygame开发打飞机... 目录题目pygame-aircraft-game使用 Pygame 开发的打飞机游戏脚本代码解释初始化部

Python实现精准提取 PDF中的文本,表格与图片

《Python实现精准提取PDF中的文本,表格与图片》在实际的系统开发中,处理PDF文件不仅限于读取整页文本,还有提取文档中的表格数据,图片或特定区域的内容,下面我们来看看如何使用Python实... 目录安装 python 库提取 PDF 文本内容:获取整页文本与指定区域内容获取页面上的所有文本内容获取

基于Python实现一个Windows Tree命令工具

《基于Python实现一个WindowsTree命令工具》今天想要在Windows平台的CMD命令终端窗口中使用像Linux下的tree命令,打印一下目录结构层级树,然而还真有tree命令,但是发现... 目录引言实现代码使用说明可用选项示例用法功能特点添加到环境变量方法一:创建批处理文件并添加到PATH1

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.