RabbitMQ学习笔记:实现简单的RPC功能

2024-08-27 15:48

本文主要是介绍RabbitMQ学习笔记:实现简单的RPC功能,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

环境

window10
虚拟机、secureCRT
Intellij IDEA

RPC

Remote Procedure Call。

远程过程调用:调用的程序或者函数,并不在本地,而是在远程计算机中。这个时候不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

要利用RabbitMQ来实现RPC功能是很简单的;客户端发送消息,服务端回复响应的消息。为了接收响应的消息,我们需要在请求中发送一个回调队列。可以使用默认的队列。

String message = "rabbitmq rpc";
// 匿名回调队列
String callbackQueueName = channel.queueDeclare().getQueue();
// 设置回调队列
AMQP.BasicProperties props = 
new AMQP.BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());

流程图:
在这里插入图片描述

步骤

① 客户端创建一个匿名队列作为回调队列。
② 客户端为RPC请求设置2个属性:replayTo用来告知RPC服务端回复请求时的目的队列(也就是上面的匿名队列)即回调队列;。correlationId用来标记请求的唯一性。
③ 请求发送到rpc_queue队列中。
④ RPC服务端监听rpc_queue队列中的请求,当请求来到时,服务端会处理并且把带有结果的消息发送给客户端。接收的队列就是replyTo设定的回调队列。
⑤ 客户端监听回调队列,当有消息时,检查correlationId属性,如果与请求匹配,那就是结果了。

这里贴出官网的代码:

server端:

import com.rabbitmq.client.*;public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";private static int fib(int n) {if (n == 0) return 0;if (n == 1) return 1;return fib(n - 1) + fib(n - 2);}public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个队列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);// 清空队列里的内容channel.queuePurge(RPC_QUEUE_NAME);// 设置客户端最多接收未被确认ack的消息的个数channel.basicQos(1);System.out.println(" [x] Awaiting RPC requests");Object monitor = new Object();DeliverCallback deliverCallback = (consumerTag, delivery) -> {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {String message = new String(delivery.getBody(), "UTF-8");int n = Integer.parseInt(message);System.out.println(" [.] fib(" + message + ")");response += fib(n);} catch (RuntimeException e) {System.out.println(" [.] " + e.toString());} finally {channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}
}

client端:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;public class RPCClient implements AutoCloseable {private Connection connection;private Channel channel;private String requestQueueName = "rpc_queue";public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");connection = factory.newConnection();channel = connection.createChannel();}public static void main(String[] argv) {try (RPCClient fibonacciRpc = new RPCClient()) {for (int i = 0; i < 32; i++) {String i_str = Integer.toString(i);System.out.println(" [x] Requesting fib(" + i_str + ")");String response = fibonacciRpc.call(i_str);System.out.println(" [.] Got '" + response + "'");}} catch (IOException | TimeoutException | InterruptedException e) {e.printStackTrace();}}public String call(String message) throws IOException, InterruptedException {final String corrId = UUID.randomUUID().toString();String replyQueueName = channel.queueDeclare().getQueue();AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();// 空字符就是使用默认交换器 direct类型的。// 这个默认交换器有个特性:// 每个队列都会自动和这个默认交换器进行绑定,绑定的路由键就是队列名// 这个是rabbitmq自动完成的。// 这也就可以解释:我们没有将回调队列和默认交换器进行绑定,// 但是下面这段代码却可以将消息发送出去。channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);// 监听匿名队列,消费消息String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {if (delivery.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(delivery.getBody(), "UTF-8"));}}, consumerTag -> {});String result = response.take();// 取消订阅channel.basicCancel(ctag);return result;}public void close() throws IOException {connection.close();}
}

注意的地方:

① 客户端发送消息使用的是默认交换器;默认交换器有一个特性,所有的队列都会对它进行绑定,绑定的路由键就是队列名。
② 服务端消费消息后,需要返回结果的话,需要将消息发送到回调队列,并且也要将correlationId带回去。

总结

① 默认交换器有个特性:所有的队列都会对它进行绑定,绑定的路由键就是队列名。
② 取消订阅使用channel.basicCancel(ctag);其中ctag是消费者标签;channel.basicConsume()返回的就是消费者标签。
③ 实现RPC关键点:

  • 对replayTo属性指定回调函数
  • 指定请求唯一表示:correlationId;

参考地址:

《RabbitMQ》实战指南

这篇关于RabbitMQ学习笔记:实现简单的RPC功能的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1112100

相关文章

C#借助Spire.XLS for .NET实现在Excel中添加文档属性

《C#借助Spire.XLSfor.NET实现在Excel中添加文档属性》在日常的数据处理和项目管理中,Excel文档扮演着举足轻重的角色,本文将深入探讨如何在C#中借助强大的第三方库Spire.... 目录为什么需要程序化添加Excel文档属性使用Spire.XLS for .NET库实现文档属性管理Sp

Python+FFmpeg实现视频自动化处理的完整指南

《Python+FFmpeg实现视频自动化处理的完整指南》本文总结了一套在Python中使用subprocess.run调用FFmpeg进行视频自动化处理的解决方案,涵盖了跨平台硬件加速、中间素材处理... 目录一、 跨平台硬件加速:统一接口设计1. 核心映射逻辑2. python 实现代码二、 中间素材处

Java数组动态扩容的实现示例

《Java数组动态扩容的实现示例》本文主要介绍了Java数组动态扩容的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1 问题2 方法3 结语1 问题实现动态的给数组添加元素效果,实现对数组扩容,原始数组使用静态分配

Python实现快速扫描目标主机的开放端口和服务

《Python实现快速扫描目标主机的开放端口和服务》这篇文章主要为大家详细介绍了如何使用Python编写一个功能强大的端口扫描器脚本,实现快速扫描目标主机的开放端口和服务,感兴趣的小伙伴可以了解下... 目录功能介绍场景应用1. 网络安全审计2. 系统管理维护3. 网络故障排查4. 合规性检查报错处理1.

Python轻松实现Word到Markdown的转换

《Python轻松实现Word到Markdown的转换》在文档管理、内容发布等场景中,将Word转换为Markdown格式是常见需求,本文将介绍如何使用FreeSpire.DocforPython实现... 目录一、工具简介二、核心转换实现1. 基础单文件转换2. 批量转换Word文件三、工具特性分析优点局

Springboot3统一返回类设计全过程(从问题到实现)

《Springboot3统一返回类设计全过程(从问题到实现)》文章介绍了如何在SpringBoot3中设计一个统一返回类,以实现前后端接口返回格式的一致性,该类包含状态码、描述信息、业务数据和时间戳,... 目录Spring Boot 3 统一返回类设计:从问题到实现一、核心需求:统一返回类要解决什么问题?

Java使用Spire.Doc for Java实现Word自动化插入图片

《Java使用Spire.DocforJava实现Word自动化插入图片》在日常工作中,Word文档是不可或缺的工具,而图片作为信息传达的重要载体,其在文档中的插入与布局显得尤为关键,下面我们就来... 目录1. Spire.Doc for Java库介绍与安装2. 使用特定的环绕方式插入图片3. 在指定位

Java使用Spire.Barcode for Java实现条形码生成与识别

《Java使用Spire.BarcodeforJava实现条形码生成与识别》在现代商业和技术领域,条形码无处不在,本教程将引导您深入了解如何在您的Java项目中利用Spire.Barcodefor... 目录1. Spire.Barcode for Java 简介与环境配置2. 使用 Spire.Barco

Java利用Spire.Doc for Java实现在模板的基础上创建Word文档

《Java利用Spire.DocforJava实现在模板的基础上创建Word文档》在日常开发中,我们经常需要根据特定数据动态生成Word文档,本文将深入探讨如何利用强大的Java库Spire.Do... 目录1. Spire.Doc for Java 库介绍与安装特点与优势Maven 依赖配置2. 通过替换

Android使用java实现网络连通性检查详解

《Android使用java实现网络连通性检查详解》这篇文章主要为大家详细介绍了Android使用java实现网络连通性检查的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录NetCheck.Java(可直接拷贝)使用示例(Activity/Fragment 内)权限要求