RabbitMQ 教程译文(二) + 学习

2024-05-01 00:58
文章标签 学习 教程 rabbitmq 译文

本文主要是介绍RabbitMQ 教程译文(二) + 学习,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址
以下图片,除了特殊声明的,其他均来自官网教程

工作队列
在这里插入图片描述
在第一篇教程中,我们完成了从一个队列中发送和接收信息的程序。在本篇教程中,我们会创建一个工作队列Work Queue,我们会通过这个队列向多个节点发送比较耗时的任务。

工作队列的核心思想就是避免立即处理比较耗时的操作,然后阻塞等待处理结果。我们会将任务封装成一个信息存储在队列中,然后延后处理任务。一个工作进程会在后台弹出队列中的任务,并完成该任务。当你运行多个工作进程的时候,任务就会分发给他们。

这个概念特别适合web应用,因为web应用不可能再很短的请求窗口中处理完成复杂的任务。

准备
在之前的教程中,我们发送一条“Hello World”信息。现在我们需要发送一条表示复杂任务的信息。我们没有一个真实环境下的复杂任务,比如调整图片大小、渲染PDF文件,所以我们通过*Thread.sleep()*方法表示任务的复杂程度。我们会在发送信息的字符串中加入若干个“点”,“点”的多少就表示任务的复杂度。每一个“点”就表示一秒钟的工作,比如“Hello…”表示一个需要花费三秒的任务。

我们会稍微修改下之前的“Send.java”代码,以便可以通过命令行发送特定的信息。这个程序将会把我们的任务放到队列中,所以我们命名它为“NewTask.java”。

String message = String.join(" ", argv);channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

我们原来的“Recv.java”也需要修改,以便可以根据信息中的“点”来模拟工作时间。这个程序会处理信息,所以我们叫它“Worker.java”。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

我们的模拟工作代码

private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}
}

循环调度
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.
任务队列的一个优势是很容易完成并行工作。如果我们任务堆积的越来越多,那我们只需要增加工作节点就可以了,规模很好控制。

首先,我们一次运行两个工作节点,它们都会从队列中接收到信息,但是实际情况呢?

打开两个控制台,分别运行两个工作节点C1 C2,在第三个控制台运行NewTask,发送信息,观察C1 C2的控制台。我们发现信息是依次交替的发送给两个工作节点。

默认情况下,RabbiMQ会依次交替的发送给所有的消费者,平均情况下,每个消费者接收到的信息数量是一样的。这种发送信息的方式叫做 循环发送round-robin

信息应答
处理一个任务会花费一定的时间。你可能想知道如果一个消费者在处理一个比较耗时的任务时死掉的话要怎么办?在我们现有代码的情况下,一旦RabbitMQ发送信息给消费者,它就会马上标记这条信息然后删除信息。在这种情况下,如果你kill一个正在处理消息的消费者,那么它正在处理的信息也会丢失。该消费者接收到的,还没有处理的信息也会丢失。

但是我们不希望丢失任何任务,我们希望当一个节点死掉的时候可以把任务发送给其他节点。

为了保证信息不回丢失,RabbitMQ提供了信息应答机制,一个ack(信息应答)就是消费者发送给RabbitMQ,通知其某个信息已经被接收或者处理,然后RabbitMQ就可以放心的删除信息了。

如果一个消费者没有个ack就挂掉了,那么RabbitMQ会认为某个信息是没有完全被处理的,所以会将该信息重新入队列。如果当前正好有其他消费者,那么就会立刻发送该条信息给消费者。这样就可以保证当消费者偶尔挂掉也不会丢失信息。

不会有任何信息超时;RabbitMQ会重新分发信息当消费者挂掉。如果处理信息需要花费很长时间也是没有关系的。

手动信息应答Manual message acknowledgments默认是开启的,前面的例子我们通过设置autoAck = true关闭了手动信息应答,现在我们设置这个标识为false,然后在我们完成任务的时候发送一个合适的应答信息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

使用这段代码就可以保证当你杀掉消费者时也不会丢失掉任何信息,在消费者挂掉后,没有收到应答的信息会被重新分发。

应答信息发送使用的通道必须和接收信息使用的通道一致,否则会报通道协议相关的异常,详情见doc guide on confirmations

忘记应答
忘记发送应答信息是一种很容易发生的错误,但是后果却是严重的,虽然没有应答的信息将会重新发送,但是RabbitMQ会使用越来越多的内存来存储没有应答的信息。
为了定位这种问题,我们可以使用rabbitmqctl命令来打印未应答信息域messages_unacknowledged

信息持久化
我们已经知道,当消费者挂掉怎么保证信息不丢失。但是当RabbitMQ挂掉还是会丢失信息。

当RabbitMQ挂掉,它会丢失队列和信息,除非我们不让它这么做。有两个条件可以保证信息不丢失,它们就是队列和信息,只有保证它们持久化才可以保证信息不丢失。

首先,我们需要保证RabbitMQ不会丢失队列,为了保证队列不丢失,我们需要声明队列为持久的。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然上述代码是正确的,但是它在我们的代码下是无效的,这是因为我们已经声明了一个不持久的“hello”的队列,RabbitMQ不允许对已经存在的队列进行不同参数的重新定义,任何这样的操作RabbitMQ会返回错误信息。但是这里有一个快速的解决方案,重新声明一个不同名称的队列。

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

生产者和消费者中的队列声明都需要做相应修改。

至此,我们可以保证当RabbitMQ重启时不会丢失队列。现在我们需要保证我们的信息也是持久化的,只需要设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

关于信息持久化
向上述中的信息持久化操作并不保证信息一定不会丢失,虽然上面的设置已经告诉RabbitMQ要存储信息到硬盘,但是还存在很短的时间,RabbitMQ收到信息但是还没有存储到硬盘,所以上述的持久化不是一定的,但是它已经足够我们的简单任务队列了,如果你希望一个更有保证的持久化设置,那么你可以使用 publisher confirms.

公平分发
你可能注意到了现在的分发策略还没有向我们希望的那样工作。比如,某个场景下有两个消费者,当所有的奇数编号的信息都很复杂,偶数编号的信息都很简单,那么一个消费者就会非常忙,而另一个则非常轻松。然而RabbitMQ并不知道这些,它还会继续按照当前策略继续发送信息。

这种情况的发生是因为当信息进入队列时RabbitMQ就会分发信息。它并不会检查消费者还没有应答的信息数量。RabbitMQ只是盲目的将第n条信息发送给第n个消费者。
在这里插入图片描述
为了打破这种情况,我们使用设置了prefetchCount = 1basicQos方法。这样就会通知RabbitMQ不要在同一时间给同一个消费者多条信息,也就是说,不要把信息发送给还没有处理完信息的消费者。这条信息会发送给空闲消费者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列长度
当所有消费者都处于忙碌状态,那么你的队列可能会被充满,所以你可能需要监控队列情况,可能增加消费者或者指定其他策略。

合体!!!
下面是“NewTask.java”和“Worker.java”的完整代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = String.join(" ", argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

打完收工 ~~~

这篇关于RabbitMQ 教程译文(二) + 学习的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/950351

相关文章

springboot使用Scheduling实现动态增删启停定时任务教程

《springboot使用Scheduling实现动态增删启停定时任务教程》:本文主要介绍springboot使用Scheduling实现动态增删启停定时任务教程,具有很好的参考价值,希望对大家有... 目录1、配置定时任务需要的线程池2、创建ScheduledFuture的包装类3、注册定时任务,增加、删

Java学习手册之Filter和Listener使用方法

《Java学习手册之Filter和Listener使用方法》:本文主要介绍Java学习手册之Filter和Listener使用方法的相关资料,Filter是一种拦截器,可以在请求到达Servl... 目录一、Filter(过滤器)1. Filter 的工作原理2. Filter 的配置与使用二、Listen

如何为Yarn配置国内源的详细教程

《如何为Yarn配置国内源的详细教程》在使用Yarn进行项目开发时,由于网络原因,直接使用官方源可能会导致下载速度慢或连接失败,配置国内源可以显著提高包的下载速度和稳定性,本文将详细介绍如何为Yarn... 目录一、查询当前使用的镜像源二、设置国内源1. 设置为淘宝镜像源2. 设置为其他国内源三、还原为官方

Maven的使用和配置国内源的保姆级教程

《Maven的使用和配置国内源的保姆级教程》Maven是⼀个项目管理工具,基于POM(ProjectObjectModel,项目对象模型)的概念,Maven可以通过一小段描述信息来管理项目的构建,报告... 目录1. 什么是Maven?2.创建⼀个Maven项目3.Maven 核心功能4.使用Maven H

IDEA自动生成注释模板的配置教程

《IDEA自动生成注释模板的配置教程》本文介绍了如何在IntelliJIDEA中配置类和方法的注释模板,包括自动生成项目名称、包名、日期和时间等内容,以及如何定制参数和返回值的注释格式,需要的朋友可以... 目录项目场景配置方法类注释模板定义类开头的注释步骤类注释效果方法注释模板定义方法开头的注释步骤方法注

Python虚拟环境终极(含PyCharm的使用教程)

《Python虚拟环境终极(含PyCharm的使用教程)》:本文主要介绍Python虚拟环境终极(含PyCharm的使用教程),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录一、为什么需要虚拟环境?二、虚拟环境创建方式对比三、命令行创建虚拟环境(venv)3.1 基础命令3

使用Node.js制作图片上传服务的详细教程

《使用Node.js制作图片上传服务的详细教程》在现代Web应用开发中,图片上传是一项常见且重要的功能,借助Node.js强大的生态系统,我们可以轻松搭建高效的图片上传服务,本文将深入探讨如何使用No... 目录准备工作搭建 Express 服务器配置 multer 进行图片上传处理图片上传请求完整代码示例

python连接本地SQL server详细图文教程

《python连接本地SQLserver详细图文教程》在数据分析领域,经常需要从数据库中获取数据进行分析和处理,下面:本文主要介绍python连接本地SQLserver的相关资料,文中通过代码... 目录一.设置本地账号1.新建用户2.开启双重验证3,开启TCP/IP本地服务二js.python连接实例1.

Python 安装和配置flask, flask_cors的图文教程

《Python安装和配置flask,flask_cors的图文教程》:本文主要介绍Python安装和配置flask,flask_cors的图文教程,本文通过图文并茂的形式给大家介绍的非常详细,... 目录一.python安装:二,配置环境变量,三:检查Python安装和环境变量,四:安装flask和flas

Spring Security基于数据库的ABAC属性权限模型实战开发教程

《SpringSecurity基于数据库的ABAC属性权限模型实战开发教程》:本文主要介绍SpringSecurity基于数据库的ABAC属性权限模型实战开发教程,本文给大家介绍的非常详细,对大... 目录1. 前言2. 权限决策依据RBACABAC综合对比3. 数据库表结构说明4. 实战开始5. MyBA