【Rust投稿】从零实现消息中间件(4)-SERVER.CLIENT

2024-06-23 00:32

本文主要是介绍【Rust投稿】从零实现消息中间件(4)-SERVER.CLIENT,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

这部分主要说的是服务器端对于来自client连接的数据的处理. 主要功能包括

  1. 接收消息

  2. 收到sub消息,就记录到全局列表中

  3. 收到pub消息,就发送给相关订阅的client

  4. 出错,删除订阅,关闭连接

数据结构定义

Client中除了cid以外,其他两项都使用了Mutex进行保护,上一篇讲到过,凡是多线程读写的都需要Arc<Mutex>保护.

  • srv: 主要还是pub sub的时候都需要访问全局的sublist.

  • msg_sender: 之所以用Mutex保护是因为除了client自己要发送消息,当其他client pub消息的时候也要通过这个ClientMessageSender发送消息
    ClientMessageSender在我们这个版本中则非常简单,就是一个TcpStream的writer.
    rust #[derive(Debug)] pub struct Client<T: SubListTrait> { pub srv: Arc<Mutex<ServerState<T>>>, pub cid: u64, pub msg_sender: Arc<Mutex<ClientMessageSender>>, } #[derive(Debug)] pub struct ClientMessageSender { writer: WriteHalf<TcpStream>, }

代码实现

process_connection

  • 创建Client以及可以共享使用的ClientMessageSender

  • 启动client_task

    impl<T: SubListTrait + Send + 'static> Client<T> {pub fn process_connection(cid: u64,srv: Arc<Mutex<ServerState<T>>>,conn: TcpStream,
    ) -> Arc<Mutex<ClientMessageSender>> {let (reader, writer) = tokio::io::split(conn);let msg_sender = Arc::new(Mutex::new(ClientMessageSender::new(writer)));let c = Client {srv: srv,cid,msg_sender: msg_sender.clone(),};tokio::spawn(Client::client_task(c, reader));msg_sender
    }...
    }
    

client_task

主要功能:

  • 读取,解析消息

  • 分发消息给相应的处理函数

    • process_error

    • process_sub

    • process_pub

这个其实就是一个tcp连接的主循环,说到这里我想把tokio::spawn 和 go语言中的go关键字做一个类比.
在go中TcpServer接收到一个连接以后,紧接着就是单独起一个goroutine来处理.类似于go client.processConnection(),而到了Rust中基本上可以等价为

tokio::spawn(async move{Client::process_connection();
});

当然Rust重要复杂很多,涉及到所有权,生命周期等一系列问题.

 async fn client_task(self, mut reader: ReadHalf<TcpStream>) {let mut buf = [0; 1024];let mut parser = Parser::new();let mut subs = HashMap::new();loop {let r = reader.read(&mut buf[..]).await;if r.is_err() {let e = r.unwrap_err();self.process_error(e, subs).await;return;}let r = r.unwrap();let n = r;if n == 0 {self.process_error(NError::new(ERROR_CONNECTION_CLOSED), subs).await;return;}let mut buf = &buf[0..n];loop {let r = parser.parse(&buf[..]);if r.is_err() {self.process_error(r.unwrap_err(), subs).await;return;}let (result, left) = r.unwrap();match result {ParseResult::NoMsg => {break;}ParseResult::Sub(ref sub) => {if let Err(e) = self.process_sub(sub, &mut subs).await {self.process_error(e, subs).await;return;}}ParseResult::Pub(ref pub_arg) => {if let Err(e) = self.process_pub(pub_arg).await {self.process_error(e, subs).await;return;}}}if left == buf.len() {break;}buf = &buf[left..];}}}

从整个代码中也可以看出client_task的主要工作就是接受消息,并处理.

process_error

  1. 删除所有订阅

  2. 关闭连接
    rust async fn process_error<E: Error>(&self, err: E, subs: HashMap<String, ArcSubscription>) { println!("client {} process err {:?}", self.cid, err); { let mut sublist = &mut self.srv.lock().await.sublist; for (_, sub) in subs { sublist.remove(sub); } } let r = self.msg_sender.lock().await.writer.shutdown().await; if r.is_err() { println!("shutdown err {:?}", r.unwrap_err()); } }

process_sub

对于收到的sub则是

  1. 全局订阅列表中保存一份

  2. 本地连接保存一份,这样连接断开的时候好删除
    为了避免内存分配,我们的SubArg里面使用的还是Parer缓冲区中的内存,当我们需要在连接之外访问这些信息的时候,我们就必须单独保存一份了,这里我们用的是sub.subject.to_string()来分配一个新的内存.
    rust async fn process_sub( &self, sub: &SubArg<'_>, subs: &mut HashMap<String, ArcSubscription>, ) -> crate::error::Result<()> { let sub = Subscription { subject: sub.subject.to_string(), queue: sub.queue.map(|q| q.to_string()), sid: sub.sid.to_string(), msg_sender: self.msg_sender.clone(), }; let sub = Arc::new(sub); subs.insert(sub.subject.clone(), sub.clone()); let sublist = &mut self.srv.lock().await.sublist; sublist.insert(sub); Ok(()) }

process_pub

收到pub消息,

  1. 查找所有的订阅

  2. 将消息逐一转发给他们
    转发的过程中要稍微麻烦一点,因为考虑到设计中的负载均衡问题,qsubs则是从同一个queue中随机选择一个来推送消息.
    rust async fn process_pub(&self, pub_arg: &PubArg<'_>) -> crate::error::Result<()> { let sub_result = { let sub_list = &mut self.srv.lock().await.sublist; sub_list.match_subject(pub_arg.subject)? }; for sub in sub_result.subs.iter() { self.send_message(sub.as_ref(), pub_arg) .await .map_err(|e| NError::new(ERROR_CONNECTION_CLOSED))?; } //qsubs 要考虑负载均衡问题 let mut rng = rand::rngs::StdRng::from_entropy(); for qsubs in sub_result.qsubs.iter() { let n = rng.next_u32(); let n = n as usize % qsubs.len(); let sub = qsubs.get(n).unwrap(); self.send_message(sub.as_ref(), pub_arg) .await .map_err(|e| NError::new(ERROR_CONNECTION_CLOSED))?; } Ok(()) }

send_message

就是拼装消息格式
因为是第一个版本,也是展示关键api的使用,里面用到了大量的await,实际上没有必要.
实际项目中,肯定会使用缓冲区来做.

///消息格式
///```
/// MSG <subject> <sid> <size>\r\n
/// <message>\r\n
/// ```
async fn send_message(&self, sub: &Subscription, pub_arg: &PubArg<'_>) -> std::io::Result<()> {let writer = &mut sub.msg_sender.lock().await.writer;writer.write("MSG ".as_bytes()).await?;writer.write(sub.subject.as_bytes()).await?;writer.write(" ".as_bytes()).await?;writer.write(sub.sid.as_bytes()).await?;writer.write(" ".as_bytes()).await?;writer.write(pub_arg.size_buf.as_bytes()).await?;writer.write("\r\n".as_bytes()).await?;writer.write(pub_arg.msg).await?;writer.write("\r\n".as_bytes()).await?;Ok(())
}

其他

相关代码都在我的github rnats 欢迎围观

https://github.com/nkbai/learnrustbynats

这篇关于【Rust投稿】从零实现消息中间件(4)-SERVER.CLIENT的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1085827

相关文章

Spring Boot配置和使用两个数据源的实现步骤

《SpringBoot配置和使用两个数据源的实现步骤》本文详解SpringBoot配置双数据源方法,包含配置文件设置、Bean创建、事务管理器配置及@Qualifier注解使用,强调主数据源标记、代... 目录Spring Boot配置和使用两个数据源技术背景实现步骤1. 配置数据源信息2. 创建数据源Be

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

linux批量替换文件内容的实现方式

《linux批量替换文件内容的实现方式》本文总结了Linux中批量替换文件内容的几种方法,包括使用sed替换文件夹内所有文件、单个文件内容及逐行字符串,强调使用反引号和绝对路径,并分享个人经验供参考... 目录一、linux批量替换文件内容 二、替换文件内所有匹配的字符串 三、替换每一行中全部str1为st

SpringBoot集成MyBatis实现SQL拦截器的实战指南

《SpringBoot集成MyBatis实现SQL拦截器的实战指南》这篇文章主要为大家详细介绍了SpringBoot集成MyBatis实现SQL拦截器的相关知识,文中的示例代码讲解详细,有需要的小伙伴... 目录一、为什么需要SQL拦截器?二、MyBATis拦截器基础2.1 核心接口:Interceptor

SpringBoot集成EasyPoi实现Excel模板导出成PDF文件

《SpringBoot集成EasyPoi实现Excel模板导出成PDF文件》在日常工作中,我们经常需要将数据导出成Excel表格或PDF文件,本文将介绍如何在SpringBoot项目中集成EasyPo... 目录前言摘要简介源代码解析应用场景案例优缺点分析类代码方法介绍测试用例小结前言在日常工作中,我们经

基于Python实现简易视频剪辑工具

《基于Python实现简易视频剪辑工具》这篇文章主要为大家详细介绍了如何用Python打造一个功能完备的简易视频剪辑工具,包括视频文件导入与格式转换,基础剪辑操作,音频处理等功能,感兴趣的小伙伴可以了... 目录一、技术选型与环境搭建二、核心功能模块实现1. 视频基础操作2. 音频处理3. 特效与转场三、高

Python实现中文文本处理与分析程序的示例详解

《Python实现中文文本处理与分析程序的示例详解》在当今信息爆炸的时代,文本数据的处理与分析成为了数据科学领域的重要课题,本文将使用Python开发一款基于Python的中文文本处理与分析程序,希望... 目录一、程序概述二、主要功能解析2.1 文件操作2.2 基础分析2.3 高级分析2.4 可视化2.5

Java实现预览与打印功能详解

《Java实现预览与打印功能详解》在Java中,打印功能主要依赖java.awt.print包,该包提供了与打印相关的一些关键类,比如PrinterJob和PageFormat,它们构成... 目录Java 打印系统概述打印预览与设置使用 PageFormat 和 PrinterJob 类设置页面格式与纸张

使用Go实现文件复制的完整流程

《使用Go实现文件复制的完整流程》本案例将实现一个实用的文件操作工具:将一个文件的内容完整复制到另一个文件中,这是文件处理中的常见任务,比如配置文件备份、日志迁移、用户上传文件转存等,文中通过代码示例... 目录案例说明涉及China编程知识点示例代码代码解析示例运行练习扩展小结案例说明我们将通过标准库 os

Python实现终端清屏的几种方式详解

《Python实现终端清屏的几种方式详解》在使用Python进行终端交互式编程时,我们经常需要清空当前终端屏幕的内容,本文为大家整理了几种常见的实现方法,有需要的小伙伴可以参考下... 目录方法一:使用 `os` 模块调用系统命令方法二:使用 `subprocess` 模块执行命令方法三:打印多个换行符模拟