2024.2.27 模拟实现 RabbitMQ —— 网络通信设计(客户端)

2024-02-26 19:20

本文主要是介绍2024.2.27 模拟实现 RabbitMQ —— 网络通信设计(客户端),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

需求分析

RabbitMQ 客户端设定

ConnectionFactory(连接工厂)

Connection(连接)

Channel(通道)

针对 客户端 和 服务器 单元测试


需求分析

RabbitMQ 客户端设定

  • 一个客户端可以有多个模块
  • 每个模块均可以和 Broker Server 之间建立 "逻辑上的连接"(channel)
  • 这几个模块的 channel 彼此之间是相互不影响的
  • 同时这几个 channel 复用了同一个 TCP 连接
  • 此处我们将仿照 RabbitMQ 客户端设定

ConnectionFactory(连接工厂)

  • 这个类持有服务器的地址
  • 该类用于创建 Connection 对象

具体代码编写:

import lombok.Getter;
import lombok.Setter;import java.io.IOException;@Getter
@Setter
public class ConnectionFactory {
//    broker server 的 ip 地址private String host;
//    broker server 的端口号private int port;public Connection newConnection() throws IOException {Connection connection = new Connection(host,port);return connection;}//    访问 broker server 的哪个虚拟主机
//    下列几个属性暂时先不搞了
//    private String virtualHostName;
//    private String username;
//    private String password;
}

Connection(连接)

  • 这个类表示一个 TCP 连接,持有 Socket 对象
  • 该类用于写入请求/读取响应,管理多个 Channel 对象

具体代码编写:

  • 编写成员变量
    private Socket socket = null;
//    需要管理多个 channel 使用一个 hash 表把若干个 channel 组织起来private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataOutputStream dataOutputStream;private DataInputStream dataInputStream;//用来处理 0xc 的回调,这里开销可能会很大,不希望把 扫描线程 阻塞住,因此使用 线程池 来处理private ExecutorService callbackPool = null;
  • 编写构造方法
  • 此处不仅需要初始化成员变量,还需创建一个扫描线程,不停的从 socket 中读取响应数据,并将读取到的响应交给 dispatchResponse 方法执行
 public Connection(String host, int port) throws IOException {socket = new Socket(host,port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//        创建一个 扫描线程,由这个线程负责不停的从 socket 中取响应数据 把这个响应数据再交给对应的 channel 负责处理Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}}catch (SocketException e) {
//                连接正常断开,此时这个异常直接忽略System.out.println("[Connection] 连接正常断开!");}catch (IOException | ClassNotFoundException | MqException e) {System.out.println("[Connection] 连接异常断开!");e.printStackTrace();}});t.start();}
  • 编写 dispatchResponse 方法
  • 使用该方法来区分,当前响应是一个针对控制请求的响应,还是服务器推送过来的消息
  • 如果是服务器推送过来的消息,type = 0xc,也就需要执行回调,通过线程池来执行
  • 如果只是一个普通的响应,就将该结果放到 channel 的 哈希表中
  • 随后 channel 的 putReturns 方法会唤醒所有阻塞等待的线程,让这些线程从 哈希表中拿与自己 rid 相等的返回结果
//    使用这个方法来分别处理,当前的响应是一个针对控制请求的响应,还是服务器推送的消息private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if(response.getType() == 0xc) {
//            服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
//            根据 channelId 找到对应的 channel 对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId());}
//            执行该 channel 对象内部的回调
//            此处我们直接将回调方法交给线程池来执行,而不是用扫描线程来执行callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(),subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});}else {
//            当前响应是针对刚才控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
//            把这个结果放到对应的 channel 的 hash 表中Channel channel = channelMap.get(basicReturns.getChannelId());if(channel == null) {throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId());}channel.putReturns(basicReturns);}}
  • 编写 发送请求 与 读取响应 的方法
//    发送请求public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();System.out.println("[Connection] 发送请求! type = " + request.getType() + ",length = " + request.getLength());}//    读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if(n != payload.length) {throw new IOException("读取的响应数据不完整!");}response.setPayload(payload);System.out.println("[Connection] 收到响应! type = " + response.getType() + ",length = " + response.getLength());return response;}
  • 编写创建 channel 的方法

注意:

  • 我们的代码中使用了多次 UUID 
  • message 的 id,就是用 UUID 当时加了个 M- 前缀
  • 现在 channel 的 id 也是使用 UUID 此时加个 C- 前缀
  • rid 也使用 UUID 来生成,加个前缀  R-
//    通过这个方法,在 Connection 中能够创建出一个 Channelpublic Channel createChannel() throws IOException, InterruptedException {String channelId = "C-" + UUID.randomUUID().toString();Channel channel = new Channel(channelId,this);
//        把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中channelMap.put(channelId,channel);
//        同时也需要把 创建 channel 的这个消息也告诉服务器boolean ok = channel.createChannel();if(!ok) {
//            服务器这里创建失败了!!整个这次创建 channel 操作不顺利!
//            把刚才已经加入 hash 表的键值对,再删了channelMap.remove(channelId);return null;}return channel;}
  • 编写释放 Connection 相关资源的方法
    public void close() {
//        关闭 Connection 释放上述资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (IOException e) {e.printStackTrace();}}

Channel(通道)

  • 这个类表示一个逻辑上的连接

  • 该类用于提供一系列的方法,去和服务器提供的核心 API 相对应

  • 客户端提供的这些方法,其方法内部就是发送了一个特定的请求

具体代码编写:

  • 编写成员变量  与 构造方法
    private String channelId;
//    当前这个 channel 属于哪个连接的private Connection connection;
//    用来存储后续客户端收到的服务器的响应private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
//    如果当前 Channel 订阅了某个队列,就需要在此处记录下对应回调是啥,当该队列的消息返回回来的时候,调用回调
//    此处约定一个 Channel 中只能有一个回调private Consumer consumer = null;public Channel(String channelId,Connection connection) {this.channelId = channelId;this.connection = connection;}
  • 实现 type = 0x1,即创建 channel
  • 构造请求发给服务器,随后阻塞等待,唤醒后从 basicReturnsMap 中尝试获取响应结果
  • 其余 type (0xc 除外,因为 0xc 只有响应没有请求)类型的请求与 0x1 大差不差,对着所需参数,构造即可
//    在这个方法中,和服务器进行交互,告知服务器,此处客户端创建了新的 channel 了public Boolean createChannel() throws IOException, InterruptedException {
//        对于创建 Channel 来说,BasicArguments basicArguments = new BasicArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);//        构造出完整请求之后,就可以发送这个请求了connection.writeRequest(request);
//        等待服务器的响应BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}private String generateRid() {return "R-" + UUID.randomUUID().toString();}//    期望使用这个方法来阻塞等待服务器的响应private BasicReturns waitResult(String rid) throws InterruptedException {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {
//            如果查询结果为 null,说明响应还没回来
//            此时就需要阻塞等待synchronized (this) {wait();}}
//        读取成功之后,还需要把这个消息从 哈希表中给删除掉basicReturnsMap.remove(rid);System.out.println("[Channel] 获取到服务器响应!rid = " + rid);return basicReturns;}public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this) {
//            当前也不知道有多少个线程在等待上述的这个响应
//            把所有的等待的线程都唤醒notifyAll();}}
  • 特别注意 type = 0xa ,即 订阅消息

  • 其次值得注意的是 consumerTag 使用 channelId 来表示
//    订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException, InterruptedException {
//        先设置回调if(this.consumer != null) {throw new MqException("该 channel 已经设置过消费消息的回调了,不能重复设置!");}this.consumer = consumer;BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);
//        此处 ConsumerTag 也使用 channelId 来表示basicConsumeArguments.setConsumerTag(channelId);basicConsumeArguments.setQueueName(queueName);basicConsumeArguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(basicConsumeArguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());return basicReturns.isOk();}

针对 客户端 和 服务器 单元测试

  • 编写测试用例代码是十分重要的!
package com.example.demo;import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.BrokerServer;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;import java.io.File;
import java.io.IOException;public class MqClientTests {private BrokerServer brokerServer = null;private ConnectionFactory connectionFactory = null;private Thread t = null;@BeforeEachpublic void setUp() throws IOException {
//        1、先启动服务器DemoApplication.context = SpringApplication.run(DemoApplication.class);brokerServer = new BrokerServer(9090);t = new Thread(() -> {
//        这个 start 方法会进入一个死循环,使用一个新的线程来运行 start 即可!try {brokerServer.start();} catch (IOException e) {e.printStackTrace();}});t.start();//        2、配置 ConnectionFactoryconnectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);}@AfterEachpublic void tearDown() throws IOException {
//        停止服务器brokerServer.stop();
//        t.join();DemoApplication.context.close();//        删除必要的文件File file = new File("./data");FileUtils.deleteDirectory(file);connectionFactory = null;}@Testpublic void testConnection() throws IOException {Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);}@Testpublic void testChannel() throws IOException, InterruptedException {Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);}@Testpublic void testExchange() throws IOException, InterruptedException {Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok = channel.exchangeDelete("testExchange");Assertions.assertTrue(ok);//        此处稳妥起见,把该关闭的要进行关闭channel.close();connection.close();}@Testpublic void testQueue() throws IOException, InterruptedException{Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.queueDelete("testQueue");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testBinding() throws IOException, InterruptedException{Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);ok = channel.queueBind("testQueue","testExchange","testBindingKey");Assertions.assertTrue(ok);ok = channel.queueUnbind("testQueue","testExchange");Assertions.assertTrue(ok);channel.close();connection.close();}@Testpublic void testMessage() throws IOException, InterruptedException, MqException {Connection connection = connectionFactory.newConnection();Assertions.assertNotNull(connection);Channel channel = connection.createChannel();Assertions.assertNotNull(channel);boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok = channel.queueDeclare("testQueue",true,false,false,null);Assertions.assertTrue(ok);byte[] requestBody = "hello".getBytes();ok = channel.basicPublish("testExchange","testQueue",null,requestBody);Assertions.assertTrue(ok);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumeTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);Assertions.assertArrayEquals(requestBody,body);System.out.println("[消费数据] 结束!");}});Assertions.assertTrue(ok);Thread.sleep(500);channel.close();connection.close();}
}

这篇关于2024.2.27 模拟实现 RabbitMQ —— 网络通信设计(客户端)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/749895

相关文章

C#之List集合去重复对象的实现方法

《C#之List集合去重复对象的实现方法》:本文主要介绍C#之List集合去重复对象的实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录C# List集合去重复对象方法1、测试数据2、测试数据3、知识点补充总结C# List集合去重复对象方法1、测试数据

Linux实现线程同步的多种方式汇总

《Linux实现线程同步的多种方式汇总》本文详细介绍了Linux下线程同步的多种方法,包括互斥锁、自旋锁、信号量以及它们的使用示例,通过这些同步机制,可以解决线程安全问题,防止资源竞争导致的错误,示例... 目录什么是线程同步?一、互斥锁(单人洗手间规则)适用场景:特点:二、条件变量(咖啡厅取餐系统)工作流

SpringBoot读取ZooKeeper(ZK)属性的方法实现

《SpringBoot读取ZooKeeper(ZK)属性的方法实现》本文主要介绍了SpringBoot读取ZooKeeper(ZK)属性的方法实现,强调使用@ConfigurationProperti... 目录1. 在配置文件中定义 ZK 属性application.propertiesapplicati

Java Multimap实现类与操作的具体示例

《JavaMultimap实现类与操作的具体示例》Multimap出现在Google的Guava库中,它为Java提供了更加灵活的集合操作,:本文主要介绍JavaMultimap实现类与操作的... 目录一、Multimap 概述Multimap 主要特点:二、Multimap 实现类1. ListMult

C#实现将Office文档(Word/Excel/PDF/PPT)转为Markdown格式

《C#实现将Office文档(Word/Excel/PDF/PPT)转为Markdown格式》Markdown凭借简洁的语法、优良的可读性,以及对版本控制系统的高度兼容性,逐渐成为最受欢迎的文档格式... 目录为什么要将文档转换为 Markdown 格式使用工具将 Word 文档转换为 Markdown(.

MyBatis设计SQL返回布尔值(Boolean)的常见方法

《MyBatis设计SQL返回布尔值(Boolean)的常见方法》这篇文章主要为大家详细介绍了MyBatis设计SQL返回布尔值(Boolean)的几种常见方法,文中的示例代码讲解详细,感兴趣的小伙伴... 目录方案一:使用COUNT查询存在性(推荐)方案二:条件表达式直接返回布尔方案三:存在性检查(EXI

Java反射实现多属性去重与分组功能

《Java反射实现多属性去重与分组功能》在Java开发中,​​List是一种非常常用的数据结构,通常我们会遇到这样的问题:如何处理​​List​​​中的相同字段?无论是去重还是分组,合理的操作可以提高... 目录一、开发环境与基础组件准备1.环境配置:2. 代码结构说明:二、基础反射工具:BeanUtils

使用Python实现base64字符串与图片互转的详细步骤

《使用Python实现base64字符串与图片互转的详细步骤》要将一个Base64编码的字符串转换为图片文件并保存下来,可以使用Python的base64模块来实现,这一过程包括解码Base64字符串... 目录1. 图片编码为 Base64 字符串2. Base64 字符串解码为图片文件3. 示例使用注意

使用Python实现获取屏幕像素颜色值

《使用Python实现获取屏幕像素颜色值》这篇文章主要为大家详细介绍了如何使用Python实现获取屏幕像素颜色值,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 一、一个小工具,按住F10键,颜色值会跟着显示。完整代码import tkinter as tkimport pyau

在Java中将XLS转换为XLSX的实现方案

《在Java中将XLS转换为XLSX的实现方案》在本文中,我们将探讨传统ExcelXLS格式与现代XLSX格式的结构差异,并为Java开发者提供转换方案,通过了解底层原理、性能优势及实用工具,您将掌握... 目录为什么升级XLS到XLSX值得投入?实际转换过程解析推荐技术方案对比Apache POI实现编程