SRS4.0源码分析-RTMP入口

2024-06-24 01:48
文章标签 分析 源码 入口 rtmp srs4.0

本文主要是介绍SRS4.0源码分析-RTMP入口,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SRS 的社群来了,想加入微信社群的朋友请购买《SRS原理》电子书,里有更高级的内容与答疑服务。


本文采用的 SRS 版本是 4.0-b8 , 下载地址:github


上篇文章 《SRS4.0源码分析-main》 讲解了 SRS main 函数的基本流程,但是可能有些朋友还是比较懵逼。说实话,其实对于SRS的具体逻辑流程,我现在也是比较懵逼。在这里,分享一个研究开源项目源码的经验,怎么快速跳出这种懵逼的状态。

首先,研究一个项目的源码,需要有一个目标。例如之前《RTMP协议分析》的系列文章,已经讲解了一个 RTMP 客户端的实现,我研究 SRS 的源码就是为了看一下 RTMP 服务器端是如何实现的。这就是一个目标。

如果茫无目的地看源码,把日志处理,配置加载,等等源码都看一遍,虽然能看完,但是印象会不太深刻。因为没有目标跟需求驱动。

确认了目标是 RTMP 服务器的实现原理之后,后续要做的事情就比较简单了,首先找到 SRS 的 RTMP 业务的入口,因为 SRS 是多个功能混合的,他支持 RTMP,SRT,webrtc。在你了解 RTMP 业务入口的同时,其他功能的入口也会找到。

同时,在研究 RTMP 服务器端逻辑的时候,各种 日志处理,配置加载,也会一并了解。


下面就开始寻找 RTMP 服务器的入口。先分析 run_directly_or_daemon() 函数,这个函数的代码比较简单,就不画流程图,直接截图代码。请看下图:

上图代码有几个重点:

1,用了比较多的srs_trace() 函数来记录日志。srs_trace() 函数会往 srs.log 写入一条日志,具体请看《SRS4.0源码分析-日志处理》

2,SRS的守护进程没用 setsid() 跟 umask(022) ,也就是当前进程没有脱离 从父进程继承 的 SessionID、进程组ID和打开的终端,SRS 为什么不脱离我也不太清楚。这个问题,我后续问下杨成立大神。

3,调用 run_hybrid_server(),重点就是 run_hybrid_server 函数。

run_directly_or_daemon() 函数分析完毕。


下面继续 看 run_hybrid_server() 函数的实现,流程图如下:

上图主要有几个重点:

1,利用 依赖注入 把 SrsSrtRtc 的 Adapter 注入给 _srs_hybrid。里面其实是一个 vector,std::vector<ISrsHybridServer*> servers

2,然后初始化 _srs_hybrid,SRS 是一个混合的服务器,他结合了 RTMP,SRT,webrtc。所以叫 hybird。

3,_srs_circuit_breaker 具体的作用后面补充,可能是类似一个 watchdog 的机制。(TODO)

4,_srs_hybrid->run() 应该就会开启协程,然后一直阻塞在这里。


分析到这里,还没找到 RTMP 的入口。继续分析 _srs_hybrid->run() 函数。请看下图:

_srs_hybrid->run() 的代码比较简单,就是遍历之前注入的 vector,然后执行他们的 run 函数。RTMP 应该是在 SrsServerAdapter 里面处理的,而不是 Srt 或者 RTC 的 Adapter 。所以只需要找 SrsServerAdapter 的 run 函数就行。为了便于理解,先画个结构图,如下

上图的 SrsServer 有非常多的变量跟函数,我只画出一部分的重点。

接下来讲解一下 class ISrsHybridServer 这个类。代码如下:

上图的重点是 = 0 这种语法是纯虚函数的写法,意思是把这个函数指针赋值为 0。推荐阅读《C++ 虚函数和纯虚函数的区别》。

定义一个函数为虚函数,不代表函数为不被实现的函数。

定义他为虚函数是为了允许用基类的指针来调用子类的这个函数。

定义一个函数为纯虚函数,才代表函数没有被实现。

定义纯虚函数是为了实现一个接口,起到一个规范的作用,规范继承这个类的程序员必须实现这个函数。

SrsSrtRtc 都会继承这个 ISrsHybridServer 类,实现 initialize (初始化),run (运行),stop(停止)函数。

SrsServerAdapter 的 run 函数实现代码如下:

rs_error_t SrsServerAdapter::run(SrsWaitGroup* wg)
{srs_error_t err = srs_success;
​// Initialize the whole system, set hooks to handle server level events.if ((err = srs->initialize(NULL)) != srs_success) {return srs_error_wrap(err, "server initialize");}
​if ((err = srs->initialize_st()) != srs_success) {return srs_error_wrap(err, "initialize st");}
​if ((err = srs->acquire_pid_file()) != srs_success) {return srs_error_wrap(err, "acquire pid file");}
​if ((err = srs->initialize_signal()) != srs_success) {return srs_error_wrap(err, "initialize signal");}
​if ((err = srs->listen()) != srs_success) {return srs_error_wrap(err, "listen");}
​if ((err = srs->register_signal()) != srs_success) {return srs_error_wrap(err, "register signal");}
​if ((err = srs->http_handle()) != srs_success) {return srs_error_wrap(err, "http handle");}
​if ((err = srs->ingest()) != srs_success) {return srs_error_wrap(err, "ingest");}
​if ((err = srs->start(wg)) != srs_success) {return srs_error_wrap(err, "start");}
​return err;
}

从上面代码可以看出,run 里面调了相当多的 class SrsServer 里面的函数。如下:

1,srs->initialize() ,这个函数里面初始了几个 http 服务器,但是还没开始 listen

2,srs->initialize_st(),这个函数 跟 st 库没有关系,主要是 对 supervisor 的场景做处理。

3,srs->acquire_pid_file(),生成 pid 进程文件。

4,srs->initialize_signal(),对信号做处理,应该会把信号转成 IO 事情。

5,srs->listen(),开始监听端口了,listen fd 会保存在对象里面,一个协程监听一个listen fd。

6,srs->register_signal(),还是跟信号有关,请看后续文章《SRS4.0源码分析-信号处理》

7,srs->http_handle() 处理HTTP 请求。

8,srs->ingest(),如果我没猜错,应该是那个 web 后台服务。

9,srs->start(wg),启动,这个 wg 是重点,后面会分析。


上面一共调了 9 个函数,但是实际上只有 两个 重点函数,srs->listen() 跟 srs->start(wg)

srs->http_handle() 虽然也是重点,但是本文主要是找到 RTMP 的入口,所以 http 相关的分析,请看后续文章《SRS4.0源码分析-HTTP》。

SRS 的 http 服务器好像是自己写的, http 报文的解析在 trunk\src\protocol\srs_http_stack.hpp 文件


srs->listen(),这个函数是重中之中,开始监听端口了,代码如下:

先做个简单的介绍,SRS 启动之后,只看到一个进程,而且搜索源代码,也没发现 pthread_create() 的函数在 SRS的代码里面出现,也就是说 SRS 的所有业务都是基于 ST 协程实现的,没有用线程。

上图把 往 Srsserver 类的 private 变量 std::vector<SrsListener*> listeners 插数据,因为 RTMP 可以监听多个IP跟端口。然后调 SrsBufferListener::listen(),然后再调 SrsTcpListener::listen(),这个链路有点长,我画个流程图,如下:

所以重点 在TCP 的listen 函数里面,RTMP 是基于 TCP 的,所以肯定是 会listen 一个 tcp 的 fd,现在就深入看 SrsTcpListener::listen()

如上图,SrsTcpListener 类里面有个 变量 srs_netfd_t lfdl 是 listen 的缩写。这个 srs_netfd_t 实际上就是 st 库里面的 st_netfd_t,只是换了个命名。SRS 代码的数据结构,有很多都是用 st 的数据结构,例如 条件变量,互斥锁,等等。

下面正式开始看 SrsTcpListener::listen() 函数,请看代码:

这个函数的调用链还是有点长,我还是画个流程图吧。

上图最重要的其实是 srs_netfd_open_socket() 这个其实是 st 库的函数。在之前专栏《state-thread源码分析》有讲过这个函数。请看下图

srs_tcp_listen() 函数执行完之后,就已经拿到了 ST 库的 netfd,就会开始创建协程。SrsSTCoroutine 继承 SrsFastCoroutine,所以这里创建协程使用的是 SrsFastCoroutine::start() 函数,SrsFastCoroutine 类里面有个 srs_thread_t ,这实际跟 ST 库的 _st_thread_t 是一样的。

SrsFastCoroutine 类就是对 ST 库的 协程做封装,请看下图:

从上图可以看到,start() 函数里面调的就是 ST库的创建协程的函数。所以 srs_tcp_listen() 函数执行完之后,就已经拿到了 ST 库的 netfd,就调 SrsFastCoroutine::start() 创建一个协程。注意这里 _pfn_st_thread_create() 传递的 是 pfn, 所以协程的 start 函数 是 SrsFastCoroutine::pfn(),上下文切换的时候,这个协程会从 SrsFastCoroutine::pfn() 函数开始执行。协程 start 函数 的参数是 this,就是对象自己

至此,虽然还有一点东西没讲,但是整体的流程图已经可以画出来了,如下:

从上图可以看出,listen_rtmp()listen_http_api(),等等函数都会创建一个协程SrsServer::listen() 执行完之后,一共创建了 7 个协程。但是这 7个协程还未开始运行。因为还没开始切换上下文。

最后还有一个重点函数SrsServer::start(),代码如下:

SrsServer::start() 函数实际上是把 自己 也变成 一个协程,丢进去 RUNQ 里面了,this 是 SrsServer 对象。

此时此刻,协程还是没开始运行。

上面流程图中的 SrtServerAdapter::run() 是创建 SRT 相关的协程, RtcServerAdapter::run() 是创建 RTC 相关的协程,这些不是本文重点,不用管。

此时此刻,协程还是没开始运行。那什么时候协程开始运行?在上面截图中,SrsServer::start() 函数最后有两句代码:

// OK, we start SRS server.
wg_ = wg;
wg->add(1);

每个 Server start的时候都会往 wg add 一下。

上面的流程图,我用绿色画出了一个框,wg.wait(),真正开始切换上下文,让之前创建的协程全部跑起来,我猜测就是在这里做的。

SrsWaitGroup::wait() 的实现非常简单,就是等待一个协程条件变量。

void SrsWaitGroup::wait()
{if (nn_ > 0) {srs_cond_wait(done_);}
}

注意,这里的 srs_cond_wait() 会让当前协程阻塞,实际上是切换到其他地方开始执行,这是 ST 的函数,ST 的阻塞函数就会导致上下文切换,进入 _st_vp_schedule() ,开始把之前创建的协程拿出来,一一运行起来。关于 ST库的分析,请看 《ST源码分析》专栏。


代码运行到 wg.wait() 的时候,之前的协程已经开始跑起来,那 RTMP 会在哪个地方跑起来呢?之前说过,协程 start 函数 是 SrsFastCoroutine::pfn(),所以 RTMP 的业务会在这个函数 pfn() 函数跑起来,实际上SRT ,RTC也是在这个 pfn() 函数跑起来的。

下面继续 分析 SrsFastCoroutine::pfn() 函数的实现,

从上图可以看到,pfn() 实际上是调了 子类的 cycle() 来循环处理业务。那 RTMP 业务的子类是啥?是 SrsTcpListener ,所以需要看 SrsTcpListener 的 cycle 实现。请看下图

从上图可以看出, 直接用 ST 库的函数 srs_accept() 阻塞,等待 tcp 客户端来。然后丢给 handler 的 on_tcp_client 处理逻辑。当初初始化 RTMP 的时候,handler 传的是什么?请看下图:

从上图可以看到 传的是 this,肯定又是子类传参法。所以 handler->on_tcp_client() 的实现如下:

srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{srs_error_t err = server->accept_client(type, stfd);if (err != srs_success) {srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());srs_freep(err);}return srs_success;
}

现在又有一个疑问,上面的 server 是什么?请看下图:

从上图可以看到,传的是this,所以 server 就是 SrsServer,所以 RTMP 接受到一个 tcp 客户端 fd 的时候,就会执行 SrsServer::accept_client() 函数

下面开始分析 SrsServer::accept_client() 函数,代码如下:

上图的重点是 fd_to_resource() 函数,代码如下:

此时此刻,已经追踪到了 rtmp 连接的处理入口,就是 new SrsRtmpConn()。从上图能看到,RTMP,HTTP 是同一个地方处理的。所以 SrsServer 实际上就是处理 RTMP,http,等请求的。

从 pfn() 到 cycle() 到 srs_accept() 再到 rtmp 的入口,这个链条有点长,我画个流程图便于理解。

到这里,已经找到 RTMP 业务的入口了,就是 new SrsRtmpConn(),下一篇文章 《SRS4.0源码分析-建立RTMP链接》 开始分析 RTMP 链接的建立。

扩展知识:

1,::bind ,::listen,前面带了 两个冒号。应该是代表使用原始的函数。

相关阅读:

1,《SRS3.0源码分析-夏立新》


由于笔者的水平有限, 加之编写的同时还要参与开发工作,文中难免会出现一些错误或者不准确的地方,恳请读者批评指正。

这篇关于SRS4.0源码分析-RTMP入口的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088854

相关文章

Nginx分布式部署流程分析

《Nginx分布式部署流程分析》文章介绍Nginx在分布式部署中的反向代理和负载均衡作用,用于分发请求、减轻服务器压力及解决session共享问题,涵盖配置方法、策略及Java项目应用,并提及分布式事... 目录分布式部署NginxJava中的代理代理分为正向代理和反向代理正向代理反向代理Nginx应用场景

Redis中的有序集合zset从使用到原理分析

《Redis中的有序集合zset从使用到原理分析》Redis有序集合(zset)是字符串与分值的有序映射,通过跳跃表和哈希表结合实现高效有序性管理,适用于排行榜、延迟队列等场景,其时间复杂度低,内存占... 目录开篇:排行榜背后的秘密一、zset的基本使用1.1 常用命令1.2 Java客户端示例二、zse

Redis中的AOF原理及分析

《Redis中的AOF原理及分析》Redis的AOF通过记录所有写操作命令实现持久化,支持always/everysec/no三种同步策略,重写机制优化文件体积,与RDB结合可平衡数据安全与恢复效率... 目录开篇:从日记本到AOF一、AOF的基本执行流程1. 命令执行与记录2. AOF重写机制二、AOF的

MyBatis Plus大数据量查询慢原因分析及解决

《MyBatisPlus大数据量查询慢原因分析及解决》大数据量查询慢常因全表扫描、分页不当、索引缺失、内存占用高及ORM开销,优化措施包括分页查询、流式读取、SQL优化、批处理、多数据源、结果集二次... 目录大数据量查询慢的常见原因优化方案高级方案配置调优监控与诊断总结大数据量查询慢的常见原因MyBAT

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

Java中最全最基础的IO流概述和简介案例分析

《Java中最全最基础的IO流概述和简介案例分析》JavaIO流用于程序与外部设备的数据交互,分为字节流(InputStream/OutputStream)和字符流(Reader/Writer),处理... 目录IO流简介IO是什么应用场景IO流的分类流的超类类型字节文件流应用简介核心API文件输出流应用文

java 恺撒加密/解密实现原理(附带源码)

《java恺撒加密/解密实现原理(附带源码)》本文介绍Java实现恺撒加密与解密,通过固定位移量对字母进行循环替换,保留大小写及非字母字符,由于其实现简单、易于理解,恺撒加密常被用作学习加密算法的入... 目录Java 恺撒加密/解密实现1. 项目背景与介绍2. 相关知识2.1 恺撒加密算法原理2.2 Ja

Nginx屏蔽服务器名称与版本信息方式(源码级修改)

《Nginx屏蔽服务器名称与版本信息方式(源码级修改)》本文详解如何通过源码修改Nginx1.25.4,移除Server响应头中的服务类型和版本信息,以增强安全性,需重新配置、编译、安装,升级时需重复... 目录一、背景与目的二、适用版本三、操作步骤修改源码文件四、后续操作提示五、注意事项六、总结一、背景与

Android实现图片浏览功能的示例详解(附带源码)

《Android实现图片浏览功能的示例详解(附带源码)》在许多应用中,都需要展示图片并支持用户进行浏览,本文主要为大家介绍了如何通过Android实现图片浏览功能,感兴趣的小伙伴可以跟随小编一起学习一... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码