srs集群下行edge处理逻辑

2024-02-22 17:28

本文主要是介绍srs集群下行edge处理逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

官方关于源站集群的介绍:

Origin Cluster | SRS

下行边缘是指观众端从边缘edge拉流,边缘edge回源到源站origin节点拉流,然后再

把流转给客户端

边缘处理类SrsPlayEdge

当服务器收到播放请求时,创建对应的consumer消费者。在创建消费者consumer时会判断当前服务器的类型,如果服务器是边缘edge,就通过play_edge进行处理。每一个SrsLiveSource都有一个对应的 SrsPlayEdge *play_edge,如果配置文件指定了remote才开启边缘逻辑。

srs_error_t SrsLiveSource::create_consumer(ISrsConnection* conn, SrsLiveConsumer*& consumer)
{srs_error_t err = srs_success;consumer = new SrsLiveConsumer(this, conn);consumers.push_back(consumer);if (conn != NULL) {conn->srsConsumer = consumer;}// There should be one consumer, so reset the timeout.stream_die_at_ = 0;publisher_idle_at_ = 0;//通过配置文件中的参数,判断是否是边缘服务器//如果是边缘服务器,则调用 play_edge进行拉流播放//SrsPlayEdge* play_edge;// for edge, when play edge stream, check the stateif (_srs_config->get_vhost_is_edge(req->vhost)) {// notice edge to start for the first client.if ((err = play_edge->on_client_play()) != srs_success) {return srs_error_wrap(err, "play edge");}}return err;
}

SrsPlayEdge会通过SrsEdgeIngester进行拉流

srs_error_t SrsPlayEdge::on_client_play()
{srs_error_t err = srs_success;//SrsEdgeIngester ingester 启动一个新的协程去源站拉流// start ingest when init state.if (state == SrsEdgeStateInit) {state = SrsEdgeStatePlay;err = ingester->start();} else if (state == SrsEdgeStateIngestStopping) {return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");}return err;
}

拉流类SrsEdgeIngester

SrsEdgeIngester会启动一个协程SrsSTCoroutine进行拉流处理 

srs_error_t SrsEdgeIngester::start()
{srs_error_t err = srs_success;if ((err = source->on_publish()) != srs_success) {return srs_error_wrap(err, "notify source");}srs_freep(trd);trd = new SrsSTCoroutine("edge-igs", this);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "coroutine");}return err;
}

真正拉流类 SrsEdgeUpstream

协程会有一个while循环不停的去拉流,目前边缘回源拉流支持两种协议rtmp和flv,根据配置参数创建对应的拉流对象

srs_error_t SrsEdgeIngester::do_cycle()
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "do cycle pull");}// Use protocol in config.string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost);// If follow client protocol, change to protocol of client.bool follow_client = _srs_config->get_vhost_edge_follow_client(req->vhost);if (follow_client && !req->protocol.empty()) {edge_protocol = req->protocol;}// Create object by protocol.srs_freep(upstream);//根据边缘协议创建对应的拉流类if (edge_protocol == "flv" || edge_protocol == "flvs") {upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https");} else {upstream = new SrsEdgeRtmpUpstream(redirect);}if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {return srs_error_wrap(err, "on source id changed");}//边缘服务连接源站服务,一般源站会部署多个节点,边缘选取源站节点时也是通过RoundRobin算法选取//其中一个节点进行拉流//这里需要注意一点,如果负载到一台没有流的源站节点上怎么办?//其实如果发现连接的源站没有流,会触发302 redirect重连逻辑if ((err = upstream->connect(req, lb)) != srs_success) {return srs_error_wrap(err, "connect upstream");}if ((err = edge->on_ingest_play()) != srs_success) {return srs_error_wrap(err, "notify edge play");}// set to larger timeout to read av data from origin.upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);//拉流处理函数err = ingest(redirect);if (srs_is_client_gracefully_close(err)) {srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());srs_error_reset(err);}break;}}

拉流源站没有流触发302

边缘服务通过负载均衡获取源站节点 ,然后去源站拉流,如果当前源站节点没有流,会触发320 redirect 重定向另一台。srs目前会重试三次,如果三次之后还是拉不到流,就认为失败了

srs_error_t SrsEdgeFlvUpstream::do_connect(SrsRequest* r, SrsLbRoundRobin* lb, int redirect_depth)
{//第一次连接源站节点时 redirect_depth = 0,通过lb->select负载均衡随机选择一台//如果连接的源站没有流,触发302,再连接另一台if (redirect_depth == 0) {SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);// @see https://github.com/ossrs/srs/issues/79// when origin is error, for instance, server is shutdown,// then user remove the vhost then reload, the conf is empty.if (!conf) {return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());}// select the origin.std::string server = lb->select(conf->args);int port = SRS_DEFAULT_HTTP_PORT;if (schema_ == "https") {port = SRS_DEFAULT_HTTPS_PORT;}srs_parse_hostport(server, server, port);// Remember the current selected server.selected_ip = server;selected_port = port;} else {// If HTTP redirect, use the server in location.schema_ = req->schema;selected_ip = req->host;selected_port = req->port;}sdk_ = new SrsHttpClient();if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) {return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto));}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}//如果状态码为302,开启重连另一台逻辑string location;if (hr_->status_code() == 302) {//获取302返回的地址location = hr_->header()->get("Location");}srs_trace("Edge: Connect to %s ok, status=%d, location=%s", url.c_str(), hr_->status_code(), location.c_str());if (hr_->status_code() == 302) {//最多重试三次if (redirect_depth >= 3) {return srs_error_new(ERROR_HTTP_302_INVALID, "redirect to %s fail, depth=%d", location.c_str(), redirect_depth);}string app;string stream_name;if (true) {string tcUrl;srs_parse_rtmp_url(location, tcUrl, stream_name);int port;string schema, host, vhost, param;srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);r->schema = schema; r->host = host; r->port = port;r->app = app; r->stream = stream_name; r->param = param;}//重连return do_connect(r, lb, redirect_depth + 1);}
}

回源拉流的逻辑

边缘节点连接源站成功后,即找到有流的源站,然后就开始通过upstream进行拉流

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "thread quit");}pprint->elapse();// pithy printif (pprint->can_print()) {upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());}// read from client.SrsCommonMessage* msg = NULL;//upstream拉流if ((err = upstream->recv_message(&msg)) != srs_success) {return srs_error_wrap(err, "recv message");}srs_assert(msg);SrsAutoFree(SrsCommonMessage, msg);//处理拉到的流if ((err = process_publish_message(msg, redirect)) != srs_success) {return srs_error_wrap(err, "process message");}}
}

处理拉到的流,拉到流后和普通单节点就一样了,把流转给 SrsLiveSource ,然后再转给对应的consumer

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{srs_error_t err = srs_success;// process audio packetif (msg->header.is_audio()) {if ((err = source->on_audio(msg)) != srs_success) {return srs_error_wrap(err, "source consume audio");}}// process video packetif (msg->header.is_video()) {if ((err = source->on_video(msg)) != srs_success) {return srs_error_wrap(err, "source consume video");}}}

这篇关于srs集群下行edge处理逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/735978

相关文章

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

一文详解Java异常处理你都了解哪些知识

《一文详解Java异常处理你都了解哪些知识》:本文主要介绍Java异常处理的相关资料,包括异常的分类、捕获和处理异常的语法、常见的异常类型以及自定义异常的实现,文中通过代码介绍的非常详细,需要的朋... 目录前言一、什么是异常二、异常的分类2.1 受检异常2.2 非受检异常三、异常处理的语法3.1 try-

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Java Response返回值的最佳处理方案

《JavaResponse返回值的最佳处理方案》在开发Web应用程序时,我们经常需要通过HTTP请求从服务器获取响应数据,这些数据可以是JSON、XML、甚至是文件,本篇文章将详细解析Java中处理... 目录摘要概述核心问题:关键技术点:源码解析示例 1:使用HttpURLConnection获取Resp

Java中Switch Case多个条件处理方法举例

《Java中SwitchCase多个条件处理方法举例》Java中switch语句用于根据变量值执行不同代码块,适用于多个条件的处理,:本文主要介绍Java中SwitchCase多个条件处理的相... 目录前言基本语法处理多个条件示例1:合并相同代码的多个case示例2:通过字符串合并多个case进阶用法使用

Java实现优雅日期处理的方案详解

《Java实现优雅日期处理的方案详解》在我们的日常工作中,需要经常处理各种格式,各种类似的的日期或者时间,下面我们就来看看如何使用java处理这样的日期问题吧,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言一、日期的坑1.1 日期格式化陷阱1.2 时区转换二、优雅方案的进阶之路2.1 线程安全重构2

Python处理函数调用超时的四种方法

《Python处理函数调用超时的四种方法》在实际开发过程中,我们可能会遇到一些场景,需要对函数的执行时间进行限制,例如,当一个函数执行时间过长时,可能会导致程序卡顿、资源占用过高,因此,在某些情况下,... 目录前言func-timeout1. 安装 func-timeout2. 基本用法自定义进程subp