IM项目:进阶版即时通讯项目---文件存储和消息转发

2024-08-26 16:44

本文主要是介绍IM项目:进阶版即时通讯项目---文件存储和消息转发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 文件传输服务
    • 基本功能
    • 模块划分
    • 流程图
    • 实现逻辑
    • 代码实现
  • 消息转发
    • 功能设计
    • 模块划分
    • 获取转发目标和消息处理
    • 代码实现

文件传输服务

基本功能

  1. 文件的上传
  2. 文件的下载

模块划分

  1. 基于gflags进行参数和配置文件的解析
  2. 基于spdlog进行日志输出
  3. 基于etcd进行服务注册
  4. 基于brpc进行RPC服务器远程调用
  5. 基于文件流操作进行读写的封装

流程图

在这里插入图片描述

实现逻辑

  1. 单个文件上传
  • 获取文件的元数据
  • 分配文件的ID
  • 以文件ID为文件名打开文件,写入数据
  • 组织响应返回
  1. 单个文件下载
  • 从请求中获取文件ID
  • 打开文件,获取大小,读取数据
  • 组织响应返回
  1. 多个文件上传

这个就是循环一下

  1. 多个文件下载

这个就是循环一下

代码实现

/*** @file file_server.hpp* @brief 文件传输服务,和语音传输服务基本相同* @author zhaobohan (zhaobohan_free@163.com)*/
#include <brpc/server.h>
#include <butil/logging.h>#include "../../common/etcd.hpp"     // 服务注册模块封装
#include "../../common/logger.hpp"   // 日志模块封装
#include "../../common/utils.hpp"
// #include "base.pb.h"
// #include "file.pb.h"#include "../build/base.pb.h"
#include "../build/file.pb.h"namespace im 
{// 对于文件服务的封装
class FileServiceImpl : public im::FileService 
{
public:FileServiceImpl(const std::string &storage_path): _storage_path(storage_path){umask(0);mkdir(storage_path.c_str(), 0775);if (_storage_path.back() != '/') _storage_path.push_back('/');}~FileServiceImpl(){}void GetSingleFile(google::protobuf::RpcController* controller,const ::im::GetSingleFileReq* request,::im::GetSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 取出请求中的文件ID(起始就是文件名)std::string fid = request->file_id();std::string filename = _storage_path + fid;// 2. 将文件ID作为文件名,读取文件数据std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_data()->set_file_id(fid);response->mutable_file_data()->set_file_content(body);}void GetMultiFile(google::protobuf::RpcController* controller,const ::im::GetMultiFileReq* request,::im::GetMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 循环取出请求中的文件ID,读取文件数据进行填充for (int i = 0; i < request->file_id_list_size(); i++) {std::string fid = request->file_id_list(i);std::string filename = _storage_path + fid;std::string body;bool ret = readFile(filename, body);if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 读取文件数据失败!", request->request_id());return;}FileDownloadData data;data.set_file_id(fid);data.set_file_content(body);response->mutable_file_data()->insert({fid, data});}response->set_success(true);}void PutSingleFile(google::protobuf::RpcController* controller,const ::im::PutSingleFileReq* request,::im::PutSingleFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());// 1. 为文件生成一个唯一uudi作为文件名 以及 文件IDstd::string fid = uuid();std::string filename = _storage_path + fid;// 2. 取出请求中的文件数据,进行文件数据写入bool ret = writeFile(filename, request->file_data().file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}// 3. 组织响应response->set_success(true);response->mutable_file_info()->set_file_id(fid);response->mutable_file_info()->set_file_size(request->file_data().file_size());response->mutable_file_info()->set_file_name(request->file_data().file_name());}void PutMultiFile(google::protobuf::RpcController* controller,const ::im::PutMultiFileReq* request,::im::PutMultiFileRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done);response->set_request_id(request->request_id());for (int i = 0; i < request->file_data_size(); i++) {std::string fid = uuid();std::string filename = _storage_path + fid;bool ret = writeFile(filename, request->file_data(i).file_content());if (ret == false) {response->set_success(false);response->set_errmsg("读取文件数据失败!");LOG_ERROR("{} 写入文件数据失败!", request->request_id());return;}im::FileMessageInfo *info  = response->add_file_info();info->set_file_id(fid);info->set_file_size(request->file_data(i).file_size());info->set_file_name(request->file_data(i).file_name());}response->set_success(true);}private:std::string _storage_path;
};class FileServer 
{
public:using ptr = std::shared_ptr<FileServer>;FileServer(const Registry::ptr &reg_client,const std::shared_ptr<brpc::Server> &server):_reg_client(reg_client),_rpc_server(server){}~FileServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};class FileServerBuilder 
{
public:// 用于构造服务注册客户端对象void make_reg_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads, const std::string &path = "./data/") {_rpc_server = std::make_shared<brpc::Server>();FileServiceImpl *file_service = new FileServiceImpl(path);int ret = _rpc_server->AddService(file_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}FileServer::ptr build() {if (!_reg_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}FileServer::ptr server = std::make_shared<FileServer>(_reg_client, _rpc_server);return server;}private:Registry::ptr _reg_client;std::shared_ptr<brpc::Server> _rpc_server;
};
}

消息转发

功能设计

消息转发子服务,主要是涉及到对于一条消息内容,组织消息的ID,以及其他元数据,然后传递给网关服务器,应该给谁进行发送

转发的目标一般是以聊天会话为基础进行传输的,通过会话就可以找到对应的聊天成员,作为一个转发的目标,并且还要把对应的数据存放在消息队列当中,消息队列收到消息后就会进行对应的回调处理,其实回调就是把数据存放在MySQL,elasticsearch,或者是文件存储系统中,这些内容我在后面的内容也都会涉及到,这里考虑到篇幅原因就不多多进行分析了

模块划分

  1. 基于gflags进行参数和配置文件解析
  2. 基于spdlog进行日志输出
  3. 基于etcd框架进行服务注册
  4. 基于ODB进行数据库对象操作
  5. 基于brpc进行RPC服务器搭建和远程调用
  6. 基于MQ将消息发布到消息队列并且进行对应的存储

下面来聊一下在这当中比较重要的接口

获取转发目标和消息处理

基本流程大概为

  1. 从请求中取出消息内容,会话ID,用户ID
  2. 根据用户ID,从用户管理子服务获取用户信息
  3. 根据消息内容进行填充消息结构,比如分配消息ID,填充发送者信息,填充消息产生时间
  4. 把消息传送给消息队列,进行持久化存储
  5. 从数据库获取目标会话的所有成员ID
  6. 组织响应,也就是完整的消息以及所有要被发送的用户ID,然后发送给网关,网关进行发送

代码实现

class TransmiteServiceImpl : public im::MsgTransmitService 
{
public:TransmiteServiceImpl(const std::string &user_service_name,const ServiceManager::ptr &channels,const std::shared_ptr<odb::core::database> &mysql_client,const std::string &exchange_name,const std::string &routing_key,const MQClient::ptr &mq_client): _user_service_name(user_service_name), _mm_channels(channels), _mysql_session_member_table(std::make_shared<ChatSessionMemeberTable>(mysql_client)), _exchange_name(exchange_name), _routing_key(routing_key), _mq_client(mq_client){}~TransmiteServiceImpl(){}// 获取转发对象void GetTransmitTarget(google::protobuf::RpcController* controller,const ::im::NewMessageReq* request,::im::GetTransmitTargetRsp* response,::google::protobuf::Closure* done) override {brpc::ClosureGuard rpc_guard(done);auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 从请求中获取关键信息:用户ID,所属会话ID,消息内容std::string rid = request->request_id();std::string uid = request->user_id();std::string chat_ssid = request->chat_session_id();const MessageContent &content = request->message();// 进行消息组织:发送者-用户子服务获取信息,所属会话,消息内容,产生时间,消息IDauto channel = _mm_channels->choose(_user_service_name);if (!channel) {LOG_ERROR("{}-{} 没有可供访问的用户子服务节点!", rid, _user_service_name);return err_response(rid, "没有可供访问的用户子服务节点!");}// 去查找用户的信息UserService_Stub stub(channel.get());GetUserInfoReq req;GetUserInfoRsp rsp;req.set_request_id(rid);req.set_user_id(uid);brpc::Controller cntl;stub.GetUserInfo(&cntl, &req, &rsp, nullptr);if (cntl.Failed() == true || rsp.success() == false) {LOG_ERROR("{} - 用户子服务调用失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "用户子服务调用失败!");}// 构造消息结构体MessageInfo message;message.set_message_id(uuid());message.set_chat_session_id(chat_ssid);message.set_timestamp(time(nullptr));message.mutable_sender()->CopyFrom(rsp.user_info());message.mutable_message()->CopyFrom(content);// 获取消息转发客户端用户列表auto target_list = _mysql_session_member_table->members(chat_ssid);// 将封装完毕的消息,发布到消息队列,待消息存储子服务进行消息持久化bool ret = _mq_client->publish(_exchange_name, message.SerializeAsString(), _routing_key);if (ret == false) {LOG_ERROR("{} - 持久化消息发布失败:{}!", request->request_id(), cntl.ErrorText());return err_response(request->request_id(), "持久化消息发布失败:!");}// 组织响应,告诉网关要给谁传消息response->set_request_id(rid);response->set_success(true);response->mutable_message()->CopyFrom(message);for (const auto &id : target_list) {response->add_target_id_list(id);}}private:// 用户子服务调用相关信息std::string _user_service_name;ServiceManager::ptr _mm_channels;// 聊天会话成员表的操作句柄ChatSessionMemeberTable::ptr _mysql_session_member_table;// 消息队列客户端句柄std::string _exchange_name;std::string _routing_key;MQClient::ptr _mq_client;
};class TransmiteServer 
{
public:using ptr = std::shared_ptr<TransmiteServer>;TransmiteServer(const std::shared_ptr<odb::core::database> &mysql_client,const Discovery::ptr discovery_client,const Registry::ptr &registry_client,const std::shared_ptr<brpc::Server> &server):_service_discoverer(discovery_client),_registry_client(registry_client),_mysql_client(mysql_client),_rpc_server(server){}~TransmiteServer(){}// 搭建RPC服务器,并启动服务器void start() {_rpc_server->RunUntilAskedToQuit();}private:// 服务发现客户端Discovery::ptr _service_discoverer;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};class TransmiteServerBuilder 
{
public:// 构造mysql客户端对象void make_mysql_object(const std::string &user,const std::string &pswd,const std::string &host,const std::string &db,const std::string &cset,int port,int conn_pool_count) {_mysql_client = ODBFactory::create(user, pswd, host, db, cset, port, conn_pool_count);}// 用于构造服务发现客户端&信道管理对象void make_discovery_object(const std::string &reg_host,const std::string &base_service_name,const std::string &user_service_name) {_user_service_name = user_service_name;_mm_channels = std::make_shared<ServiceManager>();_mm_channels->declared(user_service_name);LOG_DEBUG("设置用户子服务为需添加管理的子服务:{}", user_service_name);auto put_cb = std::bind(&ServiceManager::onServiceOnline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&ServiceManager::onServiceOffline, _mm_channels.get(), std::placeholders::_1, std::placeholders::_2);_service_discoverer = std::make_shared<Discovery>(reg_host, base_service_name, put_cb, del_cb);}// 用于构造服务注册客户端对象void make_registry_object(const std::string &reg_host,const std::string &service_name,const std::string &access_host) {_registry_client = std::make_shared<Registry>(reg_host);_registry_client->registry(service_name, access_host);}// 用于构造rabbitmq客户端对象void make_mq_object(const std::string &user, const std::string &passwd,const std::string &host,const std::string &exchange_name,const std::string &queue_name,const std::string &binding_key) {_routing_key = binding_key;_exchange_name = exchange_name;_mq_client = std::make_shared<MQClient>(user, passwd, host);_mq_client->declareComponents(exchange_name, queue_name, binding_key);}// 构造RPC服务器对象void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) {if (!_mq_client) {LOG_ERROR("还未初始化消息队列客户端模块!");abort();}if (!_mm_channels) {LOG_ERROR("还未初始化信道管理模块!");abort();}if (!_mysql_client) {LOG_ERROR("还未初始化Mysql数据库模块!");abort();}_rpc_server = std::make_shared<brpc::Server>();TransmiteServiceImpl *transmite_service = new TransmiteServiceImpl(_user_service_name, _mm_channels, _mysql_client, _exchange_name, _routing_key, _mq_client);int ret = _rpc_server->AddService(transmite_service, brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}brpc::ServerOptions options;options.idle_timeout_sec = timeout;options.num_threads = num_threads;ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}TransmiteServer::ptr build() {if (!_service_discoverer) {LOG_ERROR("还未初始化服务发现模块!");abort();}if (!_registry_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}TransmiteServer::ptr server = std::make_shared<TransmiteServer>(_mysql_client, _service_discoverer, _registry_client, _rpc_server);return server;}private:std::string _user_service_name;ServiceManager::ptr _mm_channels;Discovery::ptr _service_discoverer;std::string _routing_key;std::string _exchange_name;MQClient::ptr _mq_client;// 服务注册客户端Registry::ptr _registry_client;// mysql数据库客户端std::shared_ptr<odb::core::database> _mysql_client;std::shared_ptr<brpc::Server> _rpc_server;
};

这篇关于IM项目:进阶版即时通讯项目---文件存储和消息转发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1109118

相关文章

vite搭建vue3项目的搭建步骤

《vite搭建vue3项目的搭建步骤》本文主要介绍了vite搭建vue3项目的搭建步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1.确保Nodejs环境2.使用vite-cli工具3.进入项目安装依赖1.确保Nodejs环境

idea+spring boot创建项目的搭建全过程

《idea+springboot创建项目的搭建全过程》SpringBoot是Spring社区发布的一个开源项目,旨在帮助开发者快速并且更简单的构建项目,:本文主要介绍idea+springb... 目录一.idea四种搭建方式1.Javaidea命名规范2JavaWebTomcat的安装一.明确tomcat

pycharm跑python项目易出错的问题总结

《pycharm跑python项目易出错的问题总结》:本文主要介绍pycharm跑python项目易出错问题的相关资料,当你在PyCharm中运行Python程序时遇到报错,可以按照以下步骤进行排... 1. 一定不要在pycharm终端里面创建环境安装别人的项目子模块等,有可能出现的问题就是你不报错都安装

uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)

《uni-app小程序项目中实现前端图片压缩实现方式(附详细代码)》在uni-app开发中,文件上传和图片处理是很常见的需求,但也经常会遇到各种问题,下面:本文主要介绍uni-app小程序项目中实... 目录方式一:使用<canvas>实现图片压缩(推荐,兼容性好)示例代码(小程序平台):方式二:使用uni

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

MyCat分库分表的项目实践

《MyCat分库分表的项目实践》分库分表解决大数据量和高并发性能瓶颈,MyCat作为中间件支持分片、读写分离与事务处理,本文就来介绍一下MyCat分库分表的实践,感兴趣的可以了解一下... 目录一、为什么要分库分表?二、分库分表的常见方案三、MyCat简介四、MyCat分库分表深度解析1. 架构原理2. 分

k8s搭建nfs共享存储实践

《k8s搭建nfs共享存储实践》本文介绍NFS服务端搭建与客户端配置,涵盖安装工具、目录设置及服务启动,随后讲解K8S中NFS动态存储部署,包括创建命名空间、ServiceAccount、RBAC权限... 目录1. NFS搭建1.1 部署NFS服务端1.1.1 下载nfs-utils和rpcbind1.1

Redis高性能Key-Value存储与缓存利器常见解决方案

《Redis高性能Key-Value存储与缓存利器常见解决方案》Redis是高性能内存Key-Value存储系统,支持丰富数据类型与持久化方案(RDB/AOF),本文给大家介绍Redis高性能Key-... 目录Redis:高性能Key-Value存储与缓存利器什么是Redis?为什么选择Redis?Red

linux查找java项目日志查找报错信息方式

《linux查找java项目日志查找报错信息方式》日志查找定位步骤:进入项目,用tail-f实时跟踪日志,tail-n1000查看末尾1000行,grep搜索关键词或时间,vim内精准查找并高亮定位,... 目录日志查找定位在当前文件里找到报错消息总结日志查找定位1.cd 进入项目2.正常日志 和错误日

在.NET项目中嵌入Python代码的实践指南

《在.NET项目中嵌入Python代码的实践指南》在现代开发中,.NET与Python的协作需求日益增长,从机器学习模型集成到科学计算,从脚本自动化到数据分析,然而,传统的解决方案(如HTTPAPI或... 目录一、CSnakes vs python.NET:为何选择 CSnakes?二、环境准备:从 Py