RabbitMQ文档翻译七(JAVA).发送者确认

2024-08-26 09:08

本文主要是介绍RabbitMQ文档翻译七(JAVA).发送者确认,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

发布者确认(Publisher confirms)是一个RabbitMQ扩展,用于实现可靠的消息发布。当在通道上启用发布者确认时,客户端发布的消息将由代理异步确认,这意味着它们已在服务器端得到处理。

概览

在本教程中,我们将使用publisher confirms来确保发布的消息已安全到达代理。我们将介绍几种使用publisher confirms的策略,并解释它们的优缺点。

在通道上启用发布者确认

发布者确认是AMQP 0.9.1协议的RabbitMQ扩展,因此默认情况下不会启用它们。使用confirmSelect方法在通道级别启用发布者确认:

Channel channel = connection.createChannel();
channel.confirmSelect();

必须在您希望使用publisher confirms的每个通道上调用此方法。确认应该只启用一次,而不是对发布的每个消息启用一次。

策略1:单独发布消息

让我们从使用confirms发布消息的最简单方法开始,即发布消息并同步等待消息的确认:

while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);// uses a 5 second timeoutchannel.waitForConfirmsOrDie(5_000);
}

在前面的示例中,我们像往常一样发布消息,并使用Channel#waitForConfirmsOrDie(long)方法等待消息的确认。消息确认后,该方法立即返回。如果消息在超时时间内没有得到确认,或者消息是nack-ed(这意味着代理由于某种原因无法处理它),那么该方法将抛出一个异常。异常的处理通常包括记录错误消息和/或重试发送消息。

不同的客户端库有不同的方法来同步处理发布者确认,所以一定要仔细阅读所使用的客户端的文档。

这种技术非常简单,但也有一个主要的缺点:它大大减慢了发布速度,因为消息的确认会阻止所有后续消息的发布。这种方法不会提供每秒超过几百条已发布消息的吞吐量。然而,对于某些应用来说,这已经足够好了。

发布者确认是异步的吗?
我们在开始时提到代理异步确认已发布的消息,但在第一个示例中,代码将同步等待,直到消息被确认。客户端实际上异步接收确认,并相应地解除对waitForConfirmsOrDie的调用的阻塞。可以把waitForConfirmsOrDie看作一个同步助手,它在幕后依赖异步通知。

策略2:批量发布消息

为了改进前面的示例,我们可以发布一批消息并等待整个批被确认。以下示例使用一批100:

int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {byte[] body = ...;BasicProperties properties = ...;channel.basicPublish(exchange, queue, properties, body);outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}
}
if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);
}

与等待单个消息的确认相比,等待一批消息被确认大大提高了吞吐量(对于远程RabbitMQ节点,可以达到20-30倍)。一个缺点是,我们不知道在失败的情况下到底出了什么问题,所以我们可能必须在内存中保留一整批数据,以便记录某些有意义的内容或重新发布消息。而且这个解决方案仍然是同步的,所以它阻止了消息的发布。

策略3:异步处理发布者确认

代理以异步方式确认已发布的消息,只需在客户端上注册一个回调即可收到这些确认的通知:

Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {// code when message is confirmed
}, (sequenceNumber, multiple) -> {// code when message is nack-ed
});

有两个回调:一个用于确认消息,一个用于nack-ed消息(代理可以认为丢失的消息)。每个回调有2个参数:

  • sequenceNumber:标识已确认或不正确的消息的编号。我们将很快看到如何将它与发布的消息关联起来。
  • multiple:这是一个布尔值。如果为false,则只有一条消息被confirmed/nack-ed;如果为true,则序列号较低或相等的所有消息都被确认/nack-ed。

在发布之前,可以使用Channel#getNextPublishSeqNo()获取序列号:

int sequenceNumber = channel.getNextPublishSeqNo());
ch.basicPublish(exchange, queue, properties, body);

将消息与序列号关联的一种简单方法是使用映射。假设我们想要发布字符串,因为它们很容易变成一个字节数组进行发布。下面是一个代码示例,它使用映射将发布序列号与消息的字符串正文相关联:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// ... code for confirm callbacks will come later
String body = "...";
outstandingConfirms.put(channel.getNextPublishSeqNo(), body);
channel.basicPublish(exchange, queue, properties, body.getBytes());

发布代码现在使用映射跟踪出站消息。当确认到达时,我们需要清理此map,并在消息没有被确认时记录警告:

ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {if (multiple) {ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);confirmed.clear();} else {outstandingConfirms.remove(sequenceNumber);}
};channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {String body = outstandingConfirms.get(sequenceNumber);System.err.format("Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",body, sequenceNumber, multiple);cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code

上一个示例包含一个回调函数,当确认到达时,该回调函数将清理映射。这个回调可以处理一个或多个确认。此回调用于确认到达时(作为Channel#addConfirmListener的第一个参数)。未确认消息的回调将检索消息正文并发出警告。然后,它重新使用前面的回调来清除未完成确认的映射(无论消息是确认的还是未确认的,都必须删除映射中相应的条目)

如何跟踪未完成的确认?
我们的样本使用ConcurrentNavigableMap跟踪未完成的确认。这种数据结构很方便,有几个原因。它允许轻松地将序列号与消息(无论消息数据是什么)相关联,并容易地将条目清理到给定的序列id(以处理多个确认/未确认)。最后,它支持并发访问,因为confirm回调是在客户端库拥有的线程中调用的,它应该与发布线程保持不同。
与复杂的映射实现相比,还有其他跟踪未完成确认的方法,例如使用简单的并发哈希映射和变量来跟踪发布序列的下限,但它们通常涉及的内容更多,不属于教程。

综上所述,异步处理发布者确认通常需要以下步骤:

  • 提供一种将发布序列号与消息关联的方法。
  • 在通道上注册一个确认侦听器,以便在发布者确认/未确认到达时得到通知,以执行适当的操作,如记录或重新发布未确认消息。在这一步中,序列号与消息的关联关系可能还需要进行一些清理。
  • 在发布消息之前跟踪发布序列号。

重新发布未确认消息?
从关联的回调中重新发布未确认消息是很有诱惑力的,但这应该避免,因为confirm回调是在I/O线程中调度的,其中通道不应该执行操作。一个更好的解决方案是将消息排队到由发布线程轮询的内存队列中。像ConcurrentLinkedQueue这样的类是在confirm回调和发布线程之间传输消息的一个很好的候选类。

总结

在某些应用程序中,确保已发布的消息已发送给代理程序是至关重要的。发布者确认是一个RabbitMQ特性,有助于满足这一要求。发布者确认本质上是异步的,但也可以同步处理它们。没有确定的方法来实现publisher-confirms,这通常归结为应用程序和整个系统中的约束。典型的技术有:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,为一个批同步等待确认:简单、合理的吞吐量,但很难判断出什么时候出了问题。
  • 异步处理:最佳的性能和资源利用率,在发生错误时能很好地控制,但正确地调用实现有难度(原文:best performance and use of resources, good control in case of error, but can be involved to implement correctly.)。

把它们放在一起

这个PublisherConfirms.java类包含我们讨论的技术代码。我们可以编译它,按原样执行,然后看看它们各自的执行情况:

javac -cp $CP PublisherConfirms.java
java -cp $CP PublisherConfirms

输出如下所示:

Published 50,000 messages individually in 5,549 ms
Published 50,000 messages in batch in 2,331 ms
Published 50,000 messages and handled confirms asynchronously in 4,054 ms

如果客户端和服务器位于同一台计算机上,则计算机上的输出应该类似。单独发布消息的性能不如预期,但是异步处理的结果与批量发布相比有点令人失望。

发布确认它非常依赖于网络,因此我们最好尝试使用远程节点,这更现实,因为客户机和服务器通常不在同一台机器上生产。PublisherConfirms.java可以轻松更改为使用非本地节点:

static Connection createConnection() throws Exception {ConnectionFactory cf = new ConnectionFactory();cf.setHost("remote-host");cf.setUsername("remote-user");cf.setPassword("remote-password");return cf.newConnection();
}

重新编译类,再次执行,然后等待结果:

Published 50,000 messages individually in 231,541 ms
Published 50,000 messages in batch in 7,232 ms
Published 50,000 messages and handled confirms asynchronously in 6,332 ms

我们看到现在单条发送的表现非常糟糕。但是在客户机和服务器之间的网络中,批量发布和异步处理现在表现得类似,但是异步处理稍有一点小优势。

请记住,批量发布的实现很简单,但是在发布者确认失败的情况下,不容易知道哪些消息发送给代理失败。实现异步处理发布者确认需要耗费更多的时间,但在发布消息为未确认时能对要执行的操作提供更细的粒度和的更好的控制。

这篇关于RabbitMQ文档翻译七(JAVA).发送者确认的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1108141

相关文章

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

Android学习总结之Java和kotlin区别超详细分析

《Android学习总结之Java和kotlin区别超详细分析》Java和Kotlin都是用于Android开发的编程语言,它们各自具有独特的特点和优势,:本文主要介绍Android学习总结之Ja... 目录一、空安全机制真题 1:Kotlin 如何解决 Java 的 NullPointerExceptio

java中BigDecimal里面的subtract函数介绍及实现方法

《java中BigDecimal里面的subtract函数介绍及实现方法》在Java中实现减法操作需要根据数据类型选择不同方法,主要分为数值型减法和字符串减法两种场景,本文给大家介绍java中BigD... 目录Java中BigDecimal里面的subtract函数的意思?一、数值型减法(高精度计算)1.

Java空指针异常NullPointerException的原因与解决方案

《Java空指针异常NullPointerException的原因与解决方案》在Java开发中,NullPointerException(空指针异常)是最常见的运行时异常之一,通常发生在程序尝试访问或... 目录一、空指针异常产生的原因1. 变量未初始化2. 对象引用被显式置为null3. 方法返回null

一文彻底搞懂Java 中的 SPI 是什么

《一文彻底搞懂Java中的SPI是什么》:本文主要介绍Java中的SPI是什么,本篇文章将通过经典题目、实战解析和面试官视角,帮助你从容应对“SPI”相关问题,赢得技术面试的加分项,需要的朋... 目录一、面试主题概述二、高频面试题汇总三、重点题目详解✅ 面试题1:Java 的 SPI 是什么?如何实现一个

Spring中管理bean对象的方式(专业级说明)

《Spring中管理bean对象的方式(专业级说明)》在Spring框架中,Bean的管理是核心功能,主要通过IoC(控制反转)容器实现,下面给大家介绍Spring中管理bean对象的方式,感兴趣的朋... 目录1.Bean的声明与注册1.1 基于XML配置1.2 基于注解(主流方式)1.3 基于Java

RabbitMQ工作模式中的RPC通信模式详解

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧... 目录RPC通信模式概述工作流程代码案例引入依赖常量类编写客户端代码编写服务端代码RPC通信模式概述在R

SpringCloud中的@FeignClient注解使用详解

《SpringCloud中的@FeignClient注解使用详解》在SpringCloud中使用Feign进行服务间的调用时,通常会使用@FeignClient注解来标记Feign客户端接口,这篇文章... 在Spring Cloud中使用Feign进行服务间的调用时,通常会使用@FeignClient注解

Java Spring 中的监听器Listener详解与实战教程

《JavaSpring中的监听器Listener详解与实战教程》Spring提供了多种监听器机制,可以用于监听应用生命周期、会话生命周期和请求处理过程中的事件,:本文主要介绍JavaSprin... 目录一、监听器的作用1.1 应用生命周期管理1.2 会话管理1.3 请求处理监控二、创建监听器2.1 Ser

JVisualVM之Java性能监控与调优利器详解

《JVisualVM之Java性能监控与调优利器详解》本文将详细介绍JVisualVM的使用方法,并结合实际案例展示如何利用它进行性能调优,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全... 目录1. JVisualVM简介2. JVisualVM的安装与启动2.1 启动JVisualVM2