TeamTalk消息服务器(未读计数)

2024-09-01 07:44

本文主要是介绍TeamTalk消息服务器(未读计数),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

信令和协议设计

enum MessageCmdID {// ...... 省略无关逻辑 CID_MSG_UNREAD_CNT_REQUEST = 775,CID_MSG_UNREAD_CNT_RESPONSE = 776,// ...... 省略无关逻辑 
};message IMUnreadMsgCntReq{//cmd id:		0x0307required uint32 user_id = 1;optional bytes attach_data = 20;	
}message IMUnreadMsgCntRsp{//cmd id:		0x0308required uint32 user_id = 1;required uint32 total_cnt = 2; // 多个人的未读消息repeated IM.BaseDefine.UnreadInfo unreadinfo_list = 3;optional bytes attach_data = 20;
}message UnreadInfo{required uint32 session_id = 1; // 会话IDrequired SessionType session_type = 2; // 会话类型required uint32 unread_cnt = 3; // 未读消息数量required uint32 latest_msg_id = 4; // 最新的消息IDrequired bytes latest_msg_data = 5; // 最新的消息required MsgType latest_msg_type = 6;  // 消息类型required uint32 latest_msg_from_user_id = 7;  //发送的用户id
}

流程图:

请添加图片描述

代码分析

msg_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 CMsgConn::_HandleClientUnreadMsgCntRequest 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{// ...... 省略无关逻辑 switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑          case CID_MSG_UNREAD_CNT_REQUEST:_HandleClientUnreadMsgCntRequest(pPdu );break;              // ...... 省略无关逻辑 }
}void CMsgConn::_HandleClientUnreadMsgCntRequest(CImPdu* pPdu)
{log("HandleClientUnreadMsgCntReq, from_id=%u ", GetUserId());IM::Message::IMUnreadMsgCntReq msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));CDBServConn* pDBConn = get_db_serv_conn_for_login();if (pDBConn) {CDbAttachData attach(ATTACH_TYPE_HANDLE, m_handle, 0);msg.set_user_id(GetUserId());msg.set_attach_data(attach.GetBuffer(), attach.GetLength());pPdu->SetPBMsg(&msg);pDBConn->SendPdu(pPdu);}
}

db_proxy_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 DB_PROXY::getUnreadMsgCounter函数

值得注意的是,返回的未读消息里面包含每个会话的未读消息个数,消息类型,最后一条消息。

m_handler_map.insert(make_pair(uint32_t(CID_MSG_UNREAD_CNT_REQUEST), DB_PROXY::getUnreadMsgCounter));void getUnreadMsgCounter(CImPdu* pPdu, uint32_t conn_uuid)
{IM::Message::IMUnreadMsgCntReq msg;IM::Message::IMUnreadMsgCntRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){CImPdu* pPduResp = new CImPdu;uint32_t nUserId = msg.user_id();list<IM::BaseDefine::UnreadInfo> lsUnreadCount;uint32_t nTotalCnt = 0;// 从redis获取未读消息数量 和 从mysql获取最后一条未读消息CMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);CGroupMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);msgResp.set_user_id(nUserId);msgResp.set_total_cnt(nTotalCnt);for(auto it= lsUnreadCount.begin(); it!=lsUnreadCount.end(); ++it){IM::BaseDefine::UnreadInfo* pInfo = msgResp.add_unreadinfo_list();pInfo->set_session_id(it->session_id());pInfo->set_session_type(it->session_type());pInfo->set_unread_cnt(it->unread_cnt());pInfo->set_latest_msg_id(it->latest_msg_id());pInfo->set_latest_msg_data(it->latest_msg_data());pInfo->set_latest_msg_type(it->latest_msg_type());pInfo->set_latest_msg_from_user_id(it->latest_msg_from_user_id());}log("userId=%d, unreadCnt=%u, totalCount=%u", nUserId, msgResp.unreadinfo_list_size(), nTotalCnt);msgResp.set_attach_data(msg.attach_data());pPduResp->SetPBMsg(&msgResp);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_UNREAD_CNT_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("parse pb failed");}
}
void CMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{// redisCacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){map<string, string> mapUnread;string strKey = "unread_" + int2string(nUserId);bool bRet = pCacheConn->hgetAll(strKey, mapUnread);pCacheManager->RelCacheConn(pCacheConn);if(bRet){IM::BaseDefine::UnreadInfo cUnreadInfo;for (auto it = mapUnread.begin(); it != mapUnread.end(); it++) {cUnreadInfo.set_session_id(atoi(it->first.c_str()));cUnreadInfo.set_unread_cnt(atoi(it->second.c_str()));cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_SINGLE);uint32_t nMsgId = 0;string strMsgData;IM::BaseDefine::MsgType nMsgType;// 从mysql获取最后一条未读消息 mysqlgetLastMsg(cUnreadInfo.session_id(), nUserId, nMsgId, strMsgData, nMsgType); if(IM::BaseDefine::MsgType_IsValid(nMsgType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nMsgType);cUnreadInfo.set_latest_msg_from_user_id(cUnreadInfo.session_id());lsUnreadCount.push_back(cUnreadInfo);nTotalCnt += cUnreadInfo.unread_cnt();}else{log("invalid msgType. userId=%u, peerId=%u, msgType=%u", nUserId, cUnreadInfo.session_id(), nMsgType);}}}else{log("hgetall %s failed!", strKey.c_str());}}else{log("no cache connection for unread");}
}
void CMessageModel::getLastMsg(uint32_t nFromId, uint32_t nToId, uint32_t& nMsgId, string& strMsgData, IM::BaseDefine::MsgType& nMsgType, uint32_t nStatus)
{uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, false);if (nRelateId != INVALID_VALUE){CDBManager* pDBManager = CDBManager::getInstance();// 读从库CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){string strTableName = "IMMessage_" + int2string(nRelateId % 8);string strSql = "select msgId,type,content from " + strTableName + " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId) + " and status = 0 order by created desc, id desc limit 1";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nMsgId = pResultSet->GetInt("msgId");nMsgType = IM::BaseDefine::MsgType(pResultSet->GetInt("type"));if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO){// "[语音]"加密后的字符串strMsgData = strAudioEnc;}else{strMsgData = pResultSet->GetString("content");}}delete pResultSet;}else{log("no result set: %s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);}else{log("no db connection_slave");}}else{log("no relation between %lu and %lu", nFromId, nToId);}
}

db_proxy_server回复信令CID_MSG_UNREAD_CNT_RESPONSE给msg_server,调用CDBServConn::_HandleUnreadMsgCountResponse

void CDBServConn::_HandleUnreadMsgCountResponse(CImPdu* pPdu)
{IM::Message::IMUnreadMsgCntRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t total_cnt = msg.total_cnt();uint32_t user_unread_cnt = msg.unreadinfo_list_size();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());uint32_t handle = attach_data.GetHandle();log("HandleUnreadMsgCntResp, userId=%u, total_cnt=%u, user_unread_cnt=%u.", user_id,total_cnt, user_unread_cnt);CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, handle);if (pMsgConn && pMsgConn->IsOpen()) {msg.clear_attach_data();pPdu->SetPBMsg(&msg);pMsgConn->SendPdu(pPdu);}
}

这篇关于TeamTalk消息服务器(未读计数)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1126340

相关文章

MySQL MCP 服务器安装配置最佳实践

《MySQLMCP服务器安装配置最佳实践》本文介绍MySQLMCP服务器的安装配置方法,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下... 目录mysql MCP 服务器安装配置指南简介功能特点安装方法数据库配置使用MCP Inspector进行调试开发指

在Windows上使用qemu安装ubuntu24.04服务器的详细指南

《在Windows上使用qemu安装ubuntu24.04服务器的详细指南》本文介绍了在Windows上使用QEMU安装Ubuntu24.04的全流程:安装QEMU、准备ISO镜像、创建虚拟磁盘、配置... 目录1. 安装QEMU环境2. 准备Ubuntu 24.04镜像3. 启动QEMU安装Ubuntu4

C++ RabbitMq消息队列组件详解

《C++RabbitMq消息队列组件详解》:本文主要介绍C++RabbitMq消息队列组件的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1. RabbitMq介绍2. 安装RabbitMQ3. 安装 RabbitMQ 的 C++客户端库4. A

Windows Server 2025 搭建NPS-Radius服务器的步骤

《WindowsServer2025搭建NPS-Radius服务器的步骤》本文主要介绍了通过微软的NPS角色实现一个Radius服务器,身份验证和证书使用微软ADCS、ADDS,具有一定的参考价... 目录简介示意图什么是 802.1X?核心作用802.1X的组成角色工作流程简述802.1X常见应用802.

SpringCloud整合MQ实现消息总线服务方式

《SpringCloud整合MQ实现消息总线服务方式》:本文主要介绍SpringCloud整合MQ实现消息总线服务方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录一、背景介绍二、方案实践三、升级版总结一、背景介绍每当修改配置文件内容,如果需要客户端也同步更新,

使用Nginx配置文件服务器方式

《使用Nginx配置文件服务器方式》:本文主要介绍使用Nginx配置文件服务器方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1. 为什么选择 Nginx 作为文件服务器?2. 环境准备3. 配置 Nginx 文件服务器4. 将文件放入服务器目录5. 启动 N

一文带你搞懂Redis Stream的6种消息处理模式

《一文带你搞懂RedisStream的6种消息处理模式》Redis5.0版本引入的Stream数据类型,为Redis生态带来了强大而灵活的消息队列功能,本文将为大家详细介绍RedisStream的6... 目录1. 简单消费模式(Simple Consumption)基本概念核心命令实现示例使用场景优缺点2

Go语言开发实现查询IP信息的MCP服务器

《Go语言开发实现查询IP信息的MCP服务器》随着MCP的快速普及和广泛应用,MCP服务器也层出不穷,本文将详细介绍如何在Go语言中使用go-mcp库来开发一个查询IP信息的MCP... 目录前言mcp-ip-geo 服务器目录结构说明查询 IP 信息功能实现工具实现工具管理查询单个 IP 信息工具的实现服

Redis消息队列实现异步秒杀功能

《Redis消息队列实现异步秒杀功能》在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给Redis处理,并通过异步方式执行,Redis提供了多种数据结构来实现消息队列,总结三种,本文详细介绍Re... 目录1 Redis消息队列1.1 List 结构1.2 Pub/Sub 模式1.3 Stream 结

springboot上传zip包并解压至服务器nginx目录方式

《springboot上传zip包并解压至服务器nginx目录方式》:本文主要介绍springboot上传zip包并解压至服务器nginx目录方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录springboot上传zip包并解压至服务器nginx目录1.首先需要引入zip相关jar包2.然