本文主要是介绍浅谈如何自我实现一个消息队列服务器(5)—— 网络通信设计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 一、前情回顾
- 二、自定义应用层协议
- 2.1、约定 自定义应用层协议 的协议格式
- 2.2、约定 type 值
- 2.4、约定 length
- 2.5、约定 payload
- 三、编写 请求类 及 响应类
- 四、编写 公共参数类 及 公共返回值类
- 四、编写不同 API 下对应的不同 参数类
一、前情回顾
在前面我们已经实现了使用虚拟主机来将 交换机、队列、绑定、消息 管理起来,并且在不同的虚拟主机里,其所含数据都是互不干扰的。那么由于我们设计的 mq 并不是一个单机程序,而是一个客户端-服务器程序,因此对于客户端-服务器程序来说,是需要使用网络通信来进行数据传输的,前面我们已经约定了基于 TCP 协议来进行客户端-服务器的通信,但是由于 TCP 协议仅是一个传输层协议,即 TCP 协议并不关心载荷部分(payload),但是因为客户端传入给服务器的数据是要作为请求的载荷部分来进行传入的,因此此时我们就需要基于 TCP 自定义应用层协议,通过应用层协议来实现将客户端发送的数据作为请求的载荷部分传输给服务器,以完成我们已有的功能。
一般我们常接触的应用层协议有:HTTP、JSON…但这些应用层协议属于文本协议,只能传输文本格式的数据,无法传输二进制格式数据。对于我们的 mq 来说,我们传输的是二进制数据(message,message 是二进制格式数据)。如果真的想使用文本格式的数据来传输二进制格式数据,可以使用 base64 编码将文本数据转码成二进制数据,但是这样的过程繁琐、低效,所以还是不考虑了。
当然也有一些现成的二进制方式,譬如说第三方库提供的 protobuffer、Hessian…,可以传输二进制数据。但是因为前面学习了网络通信、计算机原理的课程之后,还是希望在本次项目中,通过自定义一个应用层协议完成项目,以锻炼自己的所学知识,并加深印象。
二、自定义应用层协议
2.1、约定 自定义应用层协议 的协议格式
既然是进行网络通信,那必然既会有 请求,也会有 响应。此处我为了简化项目代码,便将请求、响应 的协议格式设成一致。
请求格式:
响应格式:
接下来来简述一下请求、响应里的3部分,分别是什么意思。
type:描述当前 请求 和 响应 分别是干什么的。
其实对于当前我们的项目 mq 来说,客户端(生产者、消费者) 与 服务器 之间要进行的操作就是:客户端需要通过发送请求 来 触发服务器的虚拟主机里的9个核心API的某个具体的API,到底是哪个API,根据 tyoe 的值来确定,实现远程调用,服务器收到客户端的请求之后,会根据客户端的请求类型,返回对应的响应。
远程调用:我们期望客户端能够通过网络去远程调用服务器的虚拟主机里的9个核心API。因此此处 type 就是在描述当前的 请求/响应 调用的是哪个 API。
2.2、约定 type 值
我们约定 type值 对应的 API:
0x1: 创建channel
0x2: 关闭channel
0x3:exchangeDeclare() 创建交换机
0x4:exchangeDelete() 删除交换机
0x5:queueDeclare() 创建队列
0x6:queueDelete() 删除队列
0x7:queueBind() 创建绑定
0x8:queueUnbind() 删除绑定
0x9:basicPublish() 发布消息
0xa:basicConsume() 订阅消息
0xb:basicAck() 确认应答
0xc:subscribeReturns() 消费者订阅消息成功后,服务器收到来自生产者的消息后,会将消息推送给消费者,让消费者来消费消息
TCP 是有连接的,后续我们会定义一个 Connection 类 表示 一个TCP连接。由于 一个 TCP连接 的创建与销毁所需成本较高(TCP 连接需要 3次握手 才能完成,TCP断开连接 需要进行 4次挥手,才能完成),因此我们为了节约成本、开销,决定复用TCP,复用TCP的方式即:在逻辑上定义 channel(信道) 概念,一个 TCP连接 中可以有多个 channel,每个 channel 进行通信时互不干扰,相互独立。
2.4、约定 length
length:描述当前 payload 的长度。
2.5、约定 payload
payload 是变长的,其里边的内容会根据当前是 请求 还是 响应,当前的 type 值是什么,而有不同。具体多少个字节由 length 描述。
譬如:当 type = 0x3 ,且这是一个 请求,那么此时 payload 里的内容就相当于 exchangeDeclare() 这个 API 中的参数进行序列化之后的内容。
譬如:当 type = 0x3 ,但这是一个 响应,那么此时的 payload 里的内容就相当于 exchangeDeclare() 这个 API 其返回结果序列化之后的内容。
其实网络通信的目的就是让客户端得以通过网络远程调用服务器端的方法。那么此时客户端需要通过 请求报文/响应报文 告知 服务器:1、我客户端想调用服务器的哪个方法(在报文的 type 部分,我们已经约定了方法们对应的值,如0x1、0x2、0x3…)2、远程调用这个方法之后,客户端就需要传对应方法所需的参数过去给服务器(在报文中,我们使用 payload 来存放客户端传给服务器的参数(这些参数是被序列化之后的))。3、远程调用之后,服务器给客户端的 响应 报文中,payload 携带方法的返回值。
三、编写 请求类 及 响应类
在上边我们已经基于 TCP 协议自定义应用层协议了,那么此时就可以开始编写代码了。我们需要新建两个类,分别是:请求类和响应类,用于发起请求以及返回响应。我在项目的 common 包下 创建一个 Request 类,表示一个网络通信中的请求对象。再创建一个 Response 类,表示一个网络通信中的响应对象。
public class Request {private int type;private int length;private byte[] payload;public int getType() {return type;}public void setType(int type) {this.type = type;}public int getLength() {return length;}public void setLength(int length) {this.length = length;}public byte[] getPayload() {return payload;}public void setPayload(byte[] payload) {this.payload = payload;}
}
四、编写 公共参数类 及 公共返回值类
可能有同学疑惑了?为什么要编写这两个类呢?前面我们已经讨论过了,请求报文 和 响应报文 中的 payload 的内容,其实就是远程调用的方法 的参数 或 返回值 序列化后的内容。但网络通信是复杂的,同时我们此时为了复用TCP,在一次连接中允许定义多个channel(一次TCP连接中,可以使用channe表明一个连接,channel可以有多个,同时她们之间互不干扰,通信也是相互独立,且创建、销毁的开销较小。我们无需使用一次TCP连接进行通信之后立马销毁,而是一次TCP连接长久不销毁,由里面的 channel 来进行多次的连接)。那么此时就会出现一个问题了:一个客户端连接服务器,借助 channel 可以发起多个请求,那必然会有多个响应返回,那么此时我要怎么知道当前哪个请求对应哪个响应?并且每次进行通信的 channel 的身份标识也需要被记录,不然一个TCP连接可以通过 channel 发起多个通信,此时怎么知道哪些请求、响应是对应哪个 channel。
因此定义一个公共参数类——BasicArguments ,来记录方法的公共参数,每个方法(API)都会有不同的参数,但是公共参数类里的参数,是必须都要含有的。因此后续每个不同方法所含有的参数类,都要继承这个公共参数类。
public class BasicArguments implements Serializable {
// 表示请求/响应的身份标识,可以把一次完整的请求、响应对上。通过 UUID 生成唯一标识的 ridprotected String rid;
// 表示 此次通信使用的 channel 的身份标识protected String channelId;public String getRid() {return rid;}public void setRid(String rid) {this.rid = rid;}public String getChannelId() {return channelId;}public void setChannelId(String channelId) {this.channelId = channelId;}
}
公共返回值类 BasicReturns 也与公共参数类一致,不过由于远程调用的方法们都是 boolean 类型,因此比公共参数类多了一个 boolean 的成员变量罢了,用来表示当前远程调用方法的返回值。
这里还需要注意一个点,由于 公共参数类 与 公共返回值类 都是需要给后续不同远程调用方法们创建的不同参数类继承,因此类里的参数需是 protected 修饰,以便子类能够获取。
四、编写不同 API 下对应的不同 参数类
远程调用即调用服务器里定义的那些API,那么我们再回顾一下有哪些API。
1、创建交换机(exchangeDeclare)
2、销毁交换机(exchangeDelete)
3、创建队列(queueDeclare)
4、销毁队列(queueDelete)
5、创建绑定(queueBind)
6、解除绑定(queueUnbind)
7、发布消息(basicPublish)
8、订阅消息(basicConsume)
9、确认消息(basicAck)
那么我们就基于上述的 9 个 API,创建出 9 个 不同的 参数类。分别是 ExchangeDeclareArguments、ExchangeDeleteArguments、QueueDeclareArguments、QueueDeleteArguments、QueueBindArguments、QueueUnbindArguments、BasicPublishArguments、BasicConsumeArguments、BasicAckArguments 。这些类 都是 公共参数类 的子类。
还有一个类是 SubscribeReturns,由于消费者订阅消息成功之后,服务器收到了来自生产者生产的消息,就会将消息源源不断地推送给消费者,那么此时也需要定义一个 API,把服务器源源不断的推送给消费者的这些消息 作为 服务器 返回给 客户端的响应,这个方法 属于 公共返回值类 BasicReturns 的子类。
注意了:不管是 公共参数类,还是 公共返回值类 ,还是 不同的参数类,返回值类,都需要 实现 Serializable 接口,以便后续进行 序列化。
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private Map<String,Object> arguments;public String getExchangeName() {return exchangeName;}public void setExchangeName(String exchangeName) {this.exchangeName = exchangeName;}public ExchangeType getExchangeType() {return exchangeType;}public void setExchangeType(ExchangeType exchangeType) {this.exchangeType = exchangeType;}public boolean isDurable() {return durable;}public void setDurable(boolean durable) {this.durable = durable;}public boolean isAutoDelete() {return autoDelete;}public void setAutoDelete(boolean autoDelete) {this.autoDelete = autoDelete;}public Map<String, Object> getArguments() {return arguments;}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}
}
那么写到这里,我们已经可以明确,当我们的客户端发起一个请求时,该请求期望远程调用服务器的 exchangeDeclare() API,此时我们的 请求报文、响应报文 长什么样呢?
也就是说,当客户端远程调用 服务器 中的 API 时,(譬如说 exchangeDeclare(),那么 服务器中的 exchangeDeclare() API 其所需参数,就需要客户端通过 网络通信 传输过来的请求报文中的 payload部分携带其所需参数)。
上述我们定义了 请求类 Request,响应类 Response ,公共参数类 BasicArguments,公共返回值类 BasicReturns ,不同的参数类:ExchangeDeclareArguments、ExchangeDeleteArguments、QueueDeclareArguments、QueueDeleteArguments、QueueBindArguments、QueueUnbindArguments、BasicPublishArguments、BasicConsumeArguments、BasicAckArguments。不同的 返回值类:SubscribeReturns。
定义这些类,都是为了接下来的网络通信做准备,所以这些类,至关重要。
上面定义的 BasicConsumeArguments 类,由于API basicConsume() 其参数之一是回调方法 Consumer consumer ,回调方法是无法通过网络通信传输的,因此 BasicConsumeArguments 类 就无需具有 Comsumer 这样的成员变量。同时,客户端发起一个请求远程调用 basicConsume() API 进行订阅消息时,也无需将回调方法 Comsumer 作为请求的payload部分传给服务器,这是因为:1、回调方法 本就无法通过网络通信传输。2、回调方法的作用就是客户端针对订阅后的消息进行处理。不同的客户端对消息的处理都是不一样的,比如说 客户端A 收到消息后需要将消息打印至控制台,客户端B 收到消息后需要将消息保存起来,客户端C 收到消息后需要将消息进行加密…都不一样,因此此时客户端无需将自己处理消息的一套业务逻辑的回调方法作为参数之一跟着请求传给服务器。服务器中指定一个固定模板的回调方法即可,这个回调方法的作用就是在收到生产者发来的消息后,将消息推送/返回给响应消费者,消费者中又有自己一套业务逻辑的回调方法来处理/消费消息。
这篇关于浅谈如何自我实现一个消息队列服务器(5)—— 网络通信设计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!