浅谈如何自我实现一个消息队列服务器(5)—— 网络通信设计

2024-04-23 20:28

本文主要是介绍浅谈如何自我实现一个消息队列服务器(5)—— 网络通信设计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 一、前情回顾
  • 二、自定义应用层协议
    • 2.1、约定 自定义应用层协议 的协议格式
    • 2.2、约定 type 值
    • 2.4、约定 length
    • 2.5、约定 payload
  • 三、编写 请求类 及 响应类
  • 四、编写 公共参数类 及 公共返回值类
  • 四、编写不同 API 下对应的不同 参数类

一、前情回顾

在前面我们已经实现了使用虚拟主机来将 交换机、队列、绑定、消息 管理起来,并且在不同的虚拟主机里,其所含数据都是互不干扰的。那么由于我们设计的 mq 并不是一个单机程序,而是一个客户端-服务器程序,因此对于客户端-服务器程序来说,是需要使用网络通信来进行数据传输的,前面我们已经约定了基于 TCP 协议来进行客户端-服务器的通信,但是由于 TCP 协议仅是一个传输层协议,即 TCP 协议并不关心载荷部分(payload),但是因为客户端传入给服务器的数据是要作为请求的载荷部分来进行传入的,因此此时我们就需要基于 TCP 自定义应用层协议,通过应用层协议来实现将客户端发送的数据作为请求的载荷部分传输给服务器,以完成我们已有的功能。

一般我们常接触的应用层协议有:HTTP、JSON…但这些应用层协议属于文本协议,只能传输文本格式的数据,无法传输二进制格式数据。对于我们的 mq 来说,我们传输的是二进制数据(message,message 是二进制格式数据)。如果真的想使用文本格式的数据来传输二进制格式数据,可以使用 base64 编码将文本数据转码成二进制数据,但是这样的过程繁琐、低效,所以还是不考虑了。

当然也有一些现成的二进制方式,譬如说第三方库提供的 protobuffer、Hessian…,可以传输二进制数据。但是因为前面学习了网络通信、计算机原理的课程之后,还是希望在本次项目中,通过自定义一个应用层协议完成项目,以锻炼自己的所学知识,并加深印象。

二、自定义应用层协议

2.1、约定 自定义应用层协议 的协议格式

既然是进行网络通信,那必然既会有 请求,也会有 响应。此处我为了简化项目代码,便将请求、响应 的协议格式设成一致。

请求格式:
在这里插入图片描述

响应格式:
在这里插入图片描述
接下来来简述一下请求、响应里的3部分,分别是什么意思。

type:描述当前 请求 和 响应 分别是干什么的。

其实对于当前我们的项目 mq 来说,客户端(生产者、消费者) 与 服务器 之间要进行的操作就是:客户端需要通过发送请求 来 触发服务器的虚拟主机里的9个核心API的某个具体的API,到底是哪个API,根据 tyoe 的值来确定,实现远程调用,服务器收到客户端的请求之后,会根据客户端的请求类型,返回对应的响应。

远程调用:我们期望客户端能够通过网络去远程调用服务器的虚拟主机里的9个核心API。因此此处 type 就是在描述当前的 请求/响应 调用的是哪个 API。

2.2、约定 type 值

我们约定 type值 对应的 API:
0x1: 创建channel
0x2: 关闭channel
0x3:exchangeDeclare() 创建交换机
0x4:exchangeDelete() 删除交换机
0x5:queueDeclare() 创建队列
0x6:queueDelete() 删除队列
0x7:queueBind() 创建绑定
0x8:queueUnbind() 删除绑定
0x9:basicPublish() 发布消息
0xa:basicConsume() 订阅消息
0xb:basicAck() 确认应答
0xc:subscribeReturns() 消费者订阅消息成功后,服务器收到来自生产者的消息后,会将消息推送给消费者,让消费者来消费消息

TCP 是有连接的,后续我们会定义一个 Connection 类 表示 一个TCP连接。由于 一个 TCP连接 的创建与销毁所需成本较高(TCP 连接需要 3次握手 才能完成,TCP断开连接 需要进行 4次挥手,才能完成),因此我们为了节约成本、开销,决定复用TCP,复用TCP的方式即:在逻辑上定义 channel(信道) 概念,一个 TCP连接 中可以有多个 channel,每个 channel 进行通信时互不干扰,相互独立。

2.4、约定 length

length:描述当前 payload 的长度。

2.5、约定 payload

payload 是变长的,其里边的内容会根据当前是 请求 还是 响应,当前的 type 值是什么,而有不同。具体多少个字节由 length 描述。

譬如:当 type = 0x3 ,且这是一个 请求,那么此时 payload 里的内容就相当于 exchangeDeclare() 这个 API 中的参数进行序列化之后的内容。

譬如:当 type = 0x3 ,但这是一个 响应,那么此时的 payload 里的内容就相当于 exchangeDeclare() 这个 API 其返回结果序列化之后的内容。

其实网络通信的目的就是让客户端得以通过网络远程调用服务器端的方法。那么此时客户端需要通过 请求报文/响应报文 告知 服务器:1、我客户端想调用服务器的哪个方法(在报文的 type 部分,我们已经约定了方法们对应的值,如0x1、0x2、0x3…)2、远程调用这个方法之后,客户端就需要传对应方法所需的参数过去给服务器(在报文中,我们使用 payload 来存放客户端传给服务器的参数(这些参数是被序列化之后的))。3、远程调用之后,服务器给客户端的 响应 报文中,payload 携带方法的返回值。

三、编写 请求类 及 响应类

在上边我们已经基于 TCP 协议自定义应用层协议了,那么此时就可以开始编写代码了。我们需要新建两个类,分别是:请求类和响应类,用于发起请求以及返回响应。我在项目的 common 包下 创建一个 Request 类,表示一个网络通信中的请求对象。再创建一个 Response 类,表示一个网络通信中的响应对象。

public class Request {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type = type;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload = payload;}
}

四、编写 公共参数类 及 公共返回值类

可能有同学疑惑了?为什么要编写这两个类呢?前面我们已经讨论过了,请求报文 和 响应报文 中的 payload 的内容,其实就是远程调用的方法 的参数 或 返回值 序列化后的内容。但网络通信是复杂的,同时我们此时为了复用TCP,在一次连接中允许定义多个channel(一次TCP连接中,可以使用channe表明一个连接,channel可以有多个,同时她们之间互不干扰,通信也是相互独立,且创建、销毁的开销较小。我们无需使用一次TCP连接进行通信之后立马销毁,而是一次TCP连接长久不销毁,由里面的 channel 来进行多次的连接)。那么此时就会出现一个问题了:一个客户端连接服务器,借助 channel 可以发起多个请求,那必然会有多个响应返回,那么此时我要怎么知道当前哪个请求对应哪个响应?并且每次进行通信的 channel 的身份标识也需要被记录,不然一个TCP连接可以通过 channel 发起多个通信,此时怎么知道哪些请求、响应是对应哪个 channel。

因此定义一个公共参数类——BasicArguments ,来记录方法的公共参数,每个方法(API)都会有不同的参数,但是公共参数类里的参数,是必须都要含有的。因此后续每个不同方法所含有的参数类,都要继承这个公共参数类。

public class BasicArguments implements Serializable {
//    表示请求/响应的身份标识,可以把一次完整的请求、响应对上。通过 UUID 生成唯一标识的 ridprotected String rid;
//    表示 此次通信使用的 channel 的身份标识protected String channelId;public String getRid() {return rid;}public void setRid(String rid) {this.rid = rid;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}
}

公共返回值类 BasicReturns 也与公共参数类一致,不过由于远程调用的方法们都是 boolean 类型,因此比公共参数类多了一个 boolean 的成员变量罢了,用来表示当前远程调用方法的返回值。

这里还需要注意一个点,由于 公共参数类 与 公共返回值类 都是需要给后续不同远程调用方法们创建的不同参数类继承,因此类里的参数需是 protected 修饰,以便子类能够获取。

四、编写不同 API 下对应的不同 参数类

远程调用即调用服务器里定义的那些API,那么我们再回顾一下有哪些API。
1、创建交换机(exchangeDeclare)
2、销毁交换机(exchangeDelete)
3、创建队列(queueDeclare)
4、销毁队列(queueDelete)
5、创建绑定(queueBind)
6、解除绑定(queueUnbind)
7、发布消息(basicPublish)
8、订阅消息(basicConsume)
9、确认消息(basicAck)

那么我们就基于上述的 9 个 API,创建出 9 个 不同的 参数类。分别是 ExchangeDeclareArguments、ExchangeDeleteArguments、QueueDeclareArguments、QueueDeleteArguments、QueueBindArguments、QueueUnbindArguments、BasicPublishArguments、BasicConsumeArguments、BasicAckArguments 。这些类 都是 公共参数类 的子类。

还有一个类是 SubscribeReturns,由于消费者订阅消息成功之后,服务器收到了来自生产者生产的消息,就会将消息源源不断地推送给消费者,那么此时也需要定义一个 API,把服务器源源不断的推送给消费者的这些消息 作为 服务器 返回给 客户端的响应,这个方法 属于 公共返回值类 BasicReturns 的子类。

注意了:不管是 公共参数类,还是 公共返回值类 ,还是 不同的参数类,返回值类,都需要 实现 Serializable 接口,以便后续进行 序列化。

public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private Map<String,Object> arguments;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public ExchangeType getExchangeType() {return exchangeType;}public void setExchangeType(ExchangeType exchangeType) {this.exchangeType = exchangeType;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}public Map<String, Object> getArguments() {return arguments;}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}

那么写到这里,我们已经可以明确,当我们的客户端发起一个请求时,该请求期望远程调用服务器的 exchangeDeclare() API,此时我们的 请求报文、响应报文 长什么样呢?

在这里插入图片描述

也就是说,当客户端远程调用 服务器 中的 API 时,(譬如说 exchangeDeclare(),那么 服务器中的 exchangeDeclare() API 其所需参数,就需要客户端通过 网络通信 传输过来的请求报文中的 payload部分携带其所需参数)。

在这里插入图片描述

在这里插入图片描述
上述我们定义了 请求类 Request,响应类 Response ,公共参数类 BasicArguments,公共返回值类 BasicReturns ,不同的参数类:ExchangeDeclareArguments、ExchangeDeleteArguments、QueueDeclareArguments、QueueDeleteArguments、QueueBindArguments、QueueUnbindArguments、BasicPublishArguments、BasicConsumeArguments、BasicAckArguments。不同的 返回值类:SubscribeReturns。

定义这些类,都是为了接下来的网络通信做准备,所以这些类,至关重要。

上面定义的 BasicConsumeArguments 类,由于API basicConsume() 其参数之一是回调方法 Consumer consumer ,回调方法是无法通过网络通信传输的,因此 BasicConsumeArguments 类 就无需具有 Comsumer 这样的成员变量。同时,客户端发起一个请求远程调用 basicConsume() API 进行订阅消息时,也无需将回调方法 Comsumer 作为请求的payload部分传给服务器,这是因为:1、回调方法 本就无法通过网络通信传输。2、回调方法的作用就是客户端针对订阅后的消息进行处理。不同的客户端对消息的处理都是不一样的,比如说 客户端A 收到消息后需要将消息打印至控制台,客户端B 收到消息后需要将消息保存起来,客户端C 收到消息后需要将消息进行加密…都不一样,因此此时客户端无需将自己处理消息的一套业务逻辑的回调方法作为参数之一跟着请求传给服务器。服务器中指定一个固定模板的回调方法即可,这个回调方法的作用就是在收到生产者发来的消息后,将消息推送/返回给响应消费者,消费者中又有自己一套业务逻辑的回调方法来处理/消费消息。

这篇关于浅谈如何自我实现一个消息队列服务器(5)—— 网络通信设计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/929803

相关文章

Go语言开发实现查询IP信息的MCP服务器

《Go语言开发实现查询IP信息的MCP服务器》随着MCP的快速普及和广泛应用,MCP服务器也层出不穷,本文将详细介绍如何在Go语言中使用go-mcp库来开发一个查询IP信息的MCP... 目录前言mcp-ip-geo 服务器目录结构说明查询 IP 信息功能实现工具实现工具管理查询单个 IP 信息工具的实现服

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

springboot上传zip包并解压至服务器nginx目录方式

《springboot上传zip包并解压至服务器nginx目录方式》:本文主要介绍springboot上传zip包并解压至服务器nginx目录方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录springboot上传zip包并解压至服务器nginx目录1.首先需要引入zip相关jar包2.然

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

SpringKafka错误处理(重试机制与死信队列)

《SpringKafka错误处理(重试机制与死信队列)》SpringKafka提供了全面的错误处理机制,通过灵活的重试策略和死信队列处理,下面就来介绍一下,具有一定的参考价值,感兴趣的可以了解一下... 目录引言一、Spring Kafka错误处理基础二、配置重试机制三、死信队列实现四、特定异常的处理策略五

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

浅谈配置MMCV环境,解决报错,版本不匹配问题

《浅谈配置MMCV环境,解决报错,版本不匹配问题》:本文主要介绍浅谈配置MMCV环境,解决报错,版本不匹配问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录配置MMCV环境,解决报错,版本不匹配错误示例正确示例总结配置MMCV环境,解决报错,版本不匹配在col

基于Python打造一个可视化FTP服务器

《基于Python打造一个可视化FTP服务器》在日常办公和团队协作中,文件共享是一个不可或缺的需求,所以本文将使用Python+Tkinter+pyftpdlib开发一款可视化FTP服务器,有需要的小... 目录1. 概述2. 功能介绍3. 如何使用4. 代码解析5. 运行效果6.相关源码7. 总结与展望1

使用Python开发一个简单的本地图片服务器

《使用Python开发一个简单的本地图片服务器》本文介绍了如何结合wxPython构建的图形用户界面GUI和Python内建的Web服务器功能,在本地网络中搭建一个私人的,即开即用的网页相册,文中的示... 目录项目目标核心技术栈代码深度解析完整代码工作流程主要功能与优势潜在改进与思考运行结果总结你是否曾经