RabbitMQ工作模式中的RPC通信模式详解

2025-06-03 16:50

本文主要是介绍RabbitMQ工作模式中的RPC通信模式详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《RabbitMQ工作模式中的RPC通信模式详解》在RabbitMQ中,RPC模式通过消息队列实现远程调用功能,这篇文章给大家介绍RabbitMQ工作模式之RPC通信模式,感兴趣的朋友一起看看吧...

RPC通信模式

概述

在RabbitMQ中,RPC模式通过消息队列实现远程调用功能。客户端(生产者)发送消息到消费队列,服务端(消费者)进行消息消费并执行相应的程序,然后将结果发送到回调队列供客户端使用。这是一种双向的生产消费模式,其中客户端既是生产者又是消费者,服务端则专注于处理消息并生成响应。

RabbitMQ工作模式中的RPC通信模式详解

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程.

工作流程

RabbitMQ工作模式中的RPC通信模式详解

1.客户端发送请求:

客户端连接到RabbitMQ服务器
客户端声明一个用于发送RPC请求的队列(通常是固定的,如rpc_queue)。
客户端创建一个临时的回调队列,并在发送请求时,将回调队列的名称作为消息属性(reply_to)发送给交换机
客户端为每个请求生成一个唯一的correlation_id,并将其作为消息属性发送,以便在接收响应时能够匹配请求与响应。

2.交换机路由请求:

交换机接收到RPC请求后,根据路由键将请求路由到服务端监听的队列。

3.服务端处理请求:

服务端(消费者)从队列中接收请求。
服务端处理请求,并生成响应。
服务端将响应发送到客户端指定的回调队列,并在消息属性中设置相同的correlation_id。

4.客户端接收响应:

客户端监听其回调队列以接收响应。
当接收到响应时,客户端检查correlation_id以确定响应是否与之前的请求匹配。
如果匹配,客户端处理响应;如果不匹配,客户端可能丢弃该响应。

特点

1.解耦:客户端和服务端之间不需要直接通信,降低了系统间的耦合度。
2.灵活性:支持多种语言和平台之间的远程调用。
3.可扩展性:通过增加服务端(消费者)的数量,可以轻松扩展RPC服务。
4.性能开销:由于涉及到网络传输和消息队列的处理,RPC调用的性能通常低于本地调用。
5.复杂性:需要处理消息队列的可靠性、持久性、消息确认等复杂问题。
6.安全性:远程调用可能面临更多的安全风险,如消息篡改、中间人攻击等。

应用场景

RabbitMQ的RPC通信模式适用于需要远程调用服务的场景,如分布式系统中的服务调用、微服务架构中的服务通信等。通过RabbitMQ的消息队列机制,可以实现跨系统、跨语言的远程调用,提高系统的灵活性和可扩展性。

代码案例

引入依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-cliChina编程ent</artifactId>
    <version>5.21.0</version>
</dependency>

常量类

public class Constants {
    public static final String HOST = "47.98.109.138";
    public static final int PORT = 5672;
    public static final String USER_NAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "aaa";
    //rpc 模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
}

编写客户端代码

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import Java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayblockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
 * rpc 客户端
 * 1. 发送请求
 * 2. 接收响应
 */
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPasswojavascriptrd(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
      js  final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);
    }
}

编写服务端代码

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * RPC server
 * 1. 接收请求
 * 2. 发送响应
 */
public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 接收请求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body,"UTF-8");
                System.out.println("接收到请求:"+ request);
                String response = "针对request:"+ request +编程", 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Constants.RPC_REphpQUEST_QUEUE, false, consumer);
    }
}

运行程序(先运行客户端,再运行服务端)

可以在管理界面看到其中一个队列中有1条消息

RabbitMQ工作模式中的RPC通信模式详解

RabbitMQ工作模式中的RPC通信模式详解

RabbitMQ工作模式中的RPC通信模式详解

我们可以看到,服务端接收到了消息并给客户端发送了响应,与预期符合。

到此这篇关于RabbitMQ工作模式之RPC通信模式的文章就介绍到这了,更多相关RabbitMQ RPC通信模式内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!

这篇关于RabbitMQ工作模式中的RPC通信模式详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1154889

相关文章

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

MySQL8 密码强度评估与配置详解

《MySQL8密码强度评估与配置详解》MySQL8默认启用密码强度插件,实施MEDIUM策略(长度8、含数字/字母/特殊字符),支持动态调整与配置文件设置,推荐使用STRONG策略并定期更新密码以提... 目录一、mysql 8 密码强度评估机制1.核心插件:validate_password2.密码策略级

从入门到精通详解Python虚拟环境完全指南

《从入门到精通详解Python虚拟环境完全指南》Python虚拟环境是一个独立的Python运行环境,它允许你为不同的项目创建隔离的Python环境,下面小编就来和大家详细介绍一下吧... 目录什么是python虚拟环境一、使用venv创建和管理虚拟环境1.1 创建虚拟环境1.2 激活虚拟环境1.3 验证虚

详解python pycharm与cmd中制表符不一样

《详解pythonpycharm与cmd中制表符不一样》本文主要介绍了pythonpycharm与cmd中制表符不一样,这个问题通常是因为PyCharm和命令行(CMD)使用的制表符(tab)的宽... 这个问题通常是因为PyCharm和命令行(CMD)使用的制表符(tab)的宽度不同导致的。在PyChar

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队