mediasoup源码分析--channel创建及信令交互

2024-06-18 09:44

本文主要是介绍mediasoup源码分析--channel创建及信令交互,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

mediasoup源码分析--channel创建及信令交互

    • 概述
    • 跨职能图
    • 业务流程图
    • 代码剖析

概述

在golang实现mediasoup的tcp服务及channel通道一文中,已经介绍过信令服务中tcp和channel的创建,本文主要讲解c++中mediasoup的channel创建,以及信令服务和mediasoup服务如何交互

跨职能图

c92bb199ca71184775fa8bea149e201.png

业务流程图

image.png

数据发送有两种方式:
应用层发送的request最后被封装在Requst对象中,其中包含着"id",因为Request对象中包含着Channel::UnixStreamSocket对象,所以可以直接调用Request::Accept()将处理后的结果告诉应用层进程。
Worker进程也可以主动给应用层进程发送消息,通过Notifier::Emit()即可以给应用进程发送消息,Notifier类中有Channel::UnixStreamSocket,所以直接调用Channel::UnixStreamSocket::Send()就可以发送消息。Notifier类内部的数据成员和函数成员都是静态的,所以在任意位置可以直接通过Channel::Notifier::Emit()函数发送消息。

代码剖析

1.channel创建

int main(int argc, char* argv[])
{// Ensure we are called by our Node library.if (argc == 1){std::cerr << "ERROR: you don't seem to be my real father" << std::endl;std::_Exit(EXIT_FAILURE);}std::string id = std::string(argv[1]);std::string ip = std::string(argv[2]);int port = atoi(argv[3]);int iperfPort = atoi(argv[4]);// Initialize libuv stuff (we need it for the Channel).DepLibUV::ClassInit();//..........省略部分代码..............// Set the Channel socket (this will be handled and deleted by the Worker).printf("new Channel to %s:%d\n",ip.c_str(),port);auto* channel = new Channel::UnixStreamSocket(ip,port);//..........省略部分代码..............try{// Run the Worker.Worker worker(id,channel);// Worker ended.destroy();exitSuccess();}catch (const MediaSoupError& error){MS_ERROR_STD("failure exit: %s", error.what());destroy();exitWithError();}
}

UnixStreamSocket构造函数

UnixStreamSocket::UnixStreamSocket(const std::string& ip,int port) : ::UnixStreamSocket::UnixStreamSocket(ip,port, MaxSize)
{MS_TRACE_STD();// Create the JSON reader.{Json::CharReaderBuilder builder;Json::Value settings = Json::nullValue;Json::Value invalidSettings;builder.strictMode(&settings);MS_ASSERT(builder.validate(&invalidSettings), "invalid Json::CharReaderBuilder");this->jsonReader = builder.newCharReader();}// Create the JSON writer.{Json::StreamWriterBuilder builder;Json::Value invalidSettings;builder["commentStyle"]            = "None";builder["indentation"]             = "";builder["enableYAMLCompatibility"] = false;builder["dropNullPlaceholders"]    = false;MS_ASSERT(builder.validate(&invalidSettings), "invalid Json::StreamWriterBuilder");this->jsonWriter = builder.newStreamWriter();}
}

跳转到handles\UnixStreamSocket.cpp下

UnixStreamSocket::UnixStreamSocket( const std::string& ip,int port,size_t bufferSize) : bufferSize(bufferSize)
{printf("::UnixStreamSocket::UnixStreamSocket\n");MS_TRACE_STD();int err;this->uvHandle       = new uv_tcp_t;this->uvHandle->data = (void*)this;err = uv_tcp_init(DepLibUV::GetLoop(), this->uvHandle);if (err != 0){delete this->uvHandle;this->uvHandle = nullptr;printf("uv_tcp_init() failed: %s\n", uv_strerror(err));MS_THROW_ERROR_STD("uv_tcp_init() failed: %s", uv_strerror(err));}struct sockaddr_in dest;uv_ip4_addr(ip.c_str(), port, &dest);this->connect = new uv_connect_t;printf("will connect to %s:%d\n",ip.c_str(),port);err = uv_tcp_connect(this->connect, this->uvHandle, (const struct sockaddr*)&dest, onConnect);if (err != 0){delete this->uvHandle;this->uvHandle = nullptr;printf("uv_tcp_connect() failed: %s\n", uv_strerror(err));MS_THROW_ERROR_STD("uv_tcp_connect() failed: %s", uv_strerror(err));}// Start reading.err = uv_read_start(reinterpret_cast<uv_stream_t*>(this->uvHandle),static_cast<uv_alloc_cb>(onAlloc),static_cast<uv_read_cb>(onRead));if (err != 0){uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));MS_THROW_ERROR_STD("uv_read_start() failed: %s", uv_strerror(err));}// NOTE: Don't allocate the buffer here. Instead wait for the first uv_alloc_cb().
}

代码中的uv_read_start接口中onRead回调

    err = uv_read_start(reinterpret_cast<uv_stream_t*>(this->uvHandle),static_cast<uv_alloc_cb>(onAlloc),static_cast<uv_read_cb>(onRead));

跳转到onRead中

inline static void onRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{auto* socket = static_cast<UnixStreamSocket*>(handle->data);if (socket == nullptr)return;socket->OnUvRead(nread, buf);
}

OnUvRead中调用UserOnUnixStreamRead

	void UnixStreamSocket::UserOnUnixStreamRead(){MS_TRACE_STD();// Be ready to parse more than a single message in a single TCP chunk.while (true){if (IsClosed())return;size_t readLen  = this->bufferDataLen - this->msgStart;char* jsonStart = nullptr;size_t jsonLen;int nsRet = netstring_read(reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &jsonStart, &jsonLen);//.............省略部分代码..............// If here it means that jsonStart points to the beginning of a JSON string// with jsonLen bytes length, so recalculate readLen.readLen =reinterpret_cast<const uint8_t*>(jsonStart) - (this->buffer + this->msgStart) + jsonLen + 1;Json::Value json;std::string jsonParseError;if (this->jsonReader->parse((const char*)jsonStart, (const char*)jsonStart + jsonLen, &json, &jsonParseError)){Channel::Request* request = nullptr;try{request = new Channel::Request(this, json);}catch (const MediaSoupError& error){MS_ERROR_STD("discarding wrong Channel request");}if (request != nullptr){// Notify the listener.this->listener->OnChannelRequest(this, request);// Delete the Request.delete request;}//.............省略部分代码.................}}

channel创建完成,至此,跳转到worker.cpp中的OnChannelRequest接口。mediasoup监听channel信令并根据request->methodId分类处理
根据request->methodId,分别执行不同的业务
request->methodId有如下分类

	std::unordered_map<std::string, Request::MethodId> Request::string2MethodId ={{ "worker.dump",                       Request::MethodId::WORKER_DUMP                          },{ "worker.updateSettings",             Request::MethodId::WORKER_UPDATE_SETTINGS               },{ "worker.createRouter",               Request::MethodId::WORKER_CREATE_ROUTER                 },{ "router.close",                      Request::MethodId::ROUTER_CLOSE                         },{ "router.dump",                       Request::MethodId::ROUTER_DUMP                          },{ "router.createWebRtcTransport",      Request::MethodId::ROUTER_CREATE_WEBRTC_TRANSPORT       },{ "router.createPlainRtpTransport",    Request::MethodId::ROUTER_CREATE_PLAIN_RTP_TRANSPORT    },{ "router.createProducer",             Request::MethodId::ROUTER_CREATE_PRODUCER               },{ "router.createConsumer",             Request::MethodId::ROUTER_CREATE_CONSUMER               },{ "router.setAudioLevelsEvent",        Request::MethodId::ROUTER_SET_AUDIO_LEVELS_EVENT        },{ "transport.close",                   Request::MethodId::TRANSPORT_CLOSE                      },{ "transport.dump",                    Request::MethodId::TRANSPORT_DUMP                       },{ "transport.getStats",                Request::MethodId::TRANSPORT_GET_STATS                  },{ "transport.setRemoteDtlsParameters", Request::MethodId::TRANSPORT_SET_REMOTE_DTLS_PARAMETERS },{ "transport.setRemoteParameters",     Request::MethodId::TRANSPORT_SET_REMOTE_PARAMETERS      },{ "transport.setMaxBitrate",           Request::MethodId::TRANSPORT_SET_MAX_BITRATE            },{ "transport.changeUfragPwd",          Request::MethodId::TRANSPORT_CHANGE_UFRAG_PWD           },{ "transport.startMirroring",          Request::MethodId::TRANSPORT_START_MIRRORING            },{ "transport.stopMirroring",           Request::MethodId::TRANSPORT_STOP_MIRRORING             },{ "producer.close",                    Request::MethodId::PRODUCER_CLOSE                       },{ "producer.dump",                     Request::MethodId::PRODUCER_DUMP                        },{ "producer.getStats",                 Request::MethodId::PRODUCER_GET_STATS                   },{ "producer.pause",                    Request::MethodId::PRODUCER_PAUSE                       },{ "producer.resume" ,                  Request::MethodId::PRODUCER_RESUME                      },{ "producer.setPreferredProfile",      Request::MethodId::PRODUCER_SET_PREFERRED_PROFILE       },{ "consumer.close",                    Request::MethodId::CONSUMER_CLOSE                       },{ "consumer.dump",                     Request::MethodId::CONSUMER_DUMP                        },{ "consumer.getStats",                 Request::MethodId::CONSUMER_GET_STATS                   },{ "consumer.enable",                   Request::MethodId::CONSUMER_ENABLE                      },{ "consumer.pause",                    Request::MethodId::CONSUMER_PAUSE                       },{ "consumer.resume",                   Request::MethodId::CONSUMER_RESUME                      },{ "consumer.setPreferredProfile",      Request::MethodId::CONSUMER_SET_PREFERRED_PROFILE       },{ "consumer.setEncodingPreferences",   Request::MethodId::CONSUMER_SET_ENCODING_PREFERENCES    },{ "consumer.requestKeyFrame",          Request::MethodId::CONSUMER_REQUEST_KEY_FRAME           }};

下一章节介绍mediasoup如何将信令返回值及其他通知信息推送到信令服务,敬请期待!

这篇关于mediasoup源码分析--channel创建及信令交互的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1071855

相关文章

Java 虚拟线程的创建与使用深度解析

《Java虚拟线程的创建与使用深度解析》虚拟线程是Java19中以预览特性形式引入,Java21起正式发布的轻量级线程,本文给大家介绍Java虚拟线程的创建与使用,感兴趣的朋友一起看看吧... 目录一、虚拟线程简介1.1 什么是虚拟线程?1.2 为什么需要虚拟线程?二、虚拟线程与平台线程对比代码对比示例:三

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV

Linux创建服务使用systemctl管理详解

《Linux创建服务使用systemctl管理详解》文章指导在Linux中创建systemd服务,设置文件权限为所有者读写、其他只读,重新加载配置,启动服务并检查状态,确保服务正常运行,关键步骤包括权... 目录创建服务 /usr/lib/systemd/system/设置服务文件权限:所有者读写js,其他

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

idea+spring boot创建项目的搭建全过程

《idea+springboot创建项目的搭建全过程》SpringBoot是Spring社区发布的一个开源项目,旨在帮助开发者快速并且更简单的构建项目,:本文主要介绍idea+springb... 目录一.idea四种搭建方式1.Javaidea命名规范2JavaWebTomcat的安装一.明确tomcat

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe