SRS (simple-rtmp-server) Streaming Media Server Source Code Analysis: System Startup

2017-10-12 17:33



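I. Preface

I have been reading the SRS source code recently and am writing these notes as I go, partly to organize my thoughts and partly to make them easy to look up later. If anything here is wrong or missing, corrections are welcome!

First, a summary of the strengths of the SRS source code:

1. It is lightweight and the code structure is clear. SRS 3.0 is currently about 80,000 lines of code, yet it meets almost all the requirements of a live-streaming service.

2. SRS uses State Threads, which gives it high concurrency and high performance.

3. SRS supports RTMP and HLS, covering live streaming on both PC and mobile.

4. SRS supports cluster deployment: Forward for small clusters, Edge for large clusters.

The code analysis can be split into two stages:

Stage 1: analyze the code framework and sort out the overall flow.

Stage 2: analyze the code details and become familiar with how SRS works.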

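II. Code Analysis

Other write-ups in this series on the SRS source code:

SRS (simple-rtmp-server) Streaming Media Server Source Code Analysis: System Startup

SRS (simple-rtmp-server) Streaming Media Server Source Code Analysis: RTMP Message Play

SRS (simple-rtmp-server) Streaming Media Server Source Code Analysis: RTMP Message Publish

SRS (simple-rtmp-server) Streaming Media Server Source Code Analysis: HLS Segmenting

At this stage I am mainly mapping out the code framework. The SRS source framework is shown in the figure below:



At startup, the system initializes the relevant classes and listens on the relevant ports. When a request arrives, it creates a "thread" for that connection, dedicated to handling everything on it.
The main function lives in srs_main_server.cpp. The startup arguments handled in main are not covered here; we start directly from run() -> run_master().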

int run_master()
{
    int ret = ERROR_SUCCESS;

    if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
        return ret;
    }

    // Write the process id (pid) to the pid file.
    if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {
        return ret;
    }

    // Listen for clients.
    if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = _srs_server->http_handle()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
        return ret;
    }

    return 0;
}
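The shape of run_master() is a chain of startup steps in which the first failing step aborts everything after it. A minimal sketch of that pattern follows. The `run_steps` helper and the `Step` alias are hypothetical names introduced only for illustration (they are not part of SRS), and the sketch assumes that 0 means success.

```cpp
#include <functional>
#include <vector>

// Hypothetical alias: a startup step returns 0 on success, non-zero on error.
using Step = std::function<int()>;

// Runs the steps in order and returns the first non-zero error code,
// mirroring the "if ((ret = step()) != ERROR_SUCCESS) return ret;" chain.
int run_steps(const std::vector<Step>& steps) {
    for (const Step& step : steps) {
        int ret = step();
        if (ret != 0) {
            return ret;  // stop at the first failure; later steps never run
        }
    }
    return 0;
}
```

The order matters: in the excerpt, listen() runs before http_handle() and ingest(), and cycle() comes last because it only returns when the server is shutting down.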

Step into the client listening call:

if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
    return ret;
}

What it listens for: each kind of connection request gets its own listener.

int SrsServer::listen()
{
    int ret = ERROR_SUCCESS;

    // Create a stream listener for RTMP.
    if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = listen_http_api()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
        return ret;
    }

    if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
        return ret;
    }

    return ret;
}

1. First, the RTMP connection

int SrsServer::listen_rtmp()
{
    int ret = ERROR_SUCCESS;

    // stream service port.
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);

    close_listeners(SrsListenerRtmpStream);

    for (int i = 0; i < (int)ip_ports.size(); i++) {
        SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener);

        std::string ip;
        int port;
        srs_parse_endpoint(ip_ports[i], ip, port);

        if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
            srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
            return ret;
        }
    }

    return ret;
}

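This is listen_rtmp(). You can also look at listen_http_api() and listen_http_stream(); their structure is very similar. The only difference is the argument passed when the SrsStreamListener object is created (SrsListenerRtmpStream, SrsListenerHttpApi or SrsListenerHttpStream), which identifies the type of listener.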
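listen_rtmp() turns each configured listen entry into an ip and a port through srs_parse_endpoint(). The sketch below shows one way such a split could work. `parse_endpoint` is a hypothetical helper, not the SRS implementation, and it assumes that an entry with no colon is a bare port that should bind to all interfaces.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical helper, not the SRS implementation: splits "ip:port" into its
// parts. Assumption: a bare "port" means "listen on all interfaces" (0.0.0.0).
void parse_endpoint(const std::string& ep, std::string& ip, int& port) {
    std::string::size_type pos = ep.rfind(':');
    if (pos == std::string::npos) {
        ip = "0.0.0.0";
        port = std::atoi(ep.c_str());
        return;
    }
    ip = ep.substr(0, pos);
    port = std::atoi(ep.substr(pos + 1).c_str());
}
```

With this sketch, "1935" yields ip 0.0.0.0 and port 1935, while "127.0.0.1:1985" yields ip 127.0.0.1 and port 1985. Input validation (for example a non-numeric port) is left out.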

// The listener->listen() call in listen_rtmp() ends up here.
int SrsStreamListener::listen(string i, int p)
{
    int ret = ERROR_SUCCESS;

    ip = i;
    port = p;

    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port);

    if ((ret = listener->listen()) != ERROR_SUCCESS) {
        srs_error("tcp listen failed. ret=%d", ret);
        return ret;
    }

    srs_info("listen thread current_cid=%d, "
        "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
        _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);

    srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}
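In the code above, SrsStreamListener passes itself (this) to the SrsTcpListener it creates, which is the usual shape of a callback interface built on pure virtual functions. The sketch below illustrates that pattern only. All class names in it (ITcpHandler, TcpListenerSketch, StreamListenerSketch) are hypothetical stand-ins, and no real sockets are involved.

```cpp
#include <string>
#include <vector>

// Hypothetical callback interface: the low-level listener only knows this
// abstract type, never the concrete class behind it.
class ITcpHandler {
public:
    virtual ~ITcpHandler() {}
    // Pure virtual: every concrete handler must implement it.
    virtual int on_tcp_client(int client_fd) = 0;
};

// Stand-in for the low-level TCP listener; it forwards events to the handler.
class TcpListenerSketch {
public:
    explicit TcpListenerSketch(ITcpHandler* h) : handler(h) {}
    // Simulates "a client was accepted" without touching real sockets.
    int simulate_accept(int client_fd) { return handler->on_tcp_client(client_fd); }
private:
    ITcpHandler* handler;
};

// Stand-in for the typed stream listener: it owns a TcpListenerSketch and
// passes `this` as the handler, as in `new SrsTcpListener(this, ip, port)`.
class StreamListenerSketch : public ITcpHandler {
public:
    explicit StreamListenerSketch(const std::string& t) : type(t), tcp(this) {}
    int on_tcp_client(int client_fd) override {
        accepted.push_back(type + ":" + std::to_string(client_fd));
        return 0;
    }
    TcpListenerSketch& listener() { return tcp; }
    std::vector<std::string> accepted;
private:
    std::string type;
    TcpListenerSketch tcp;
};
```

When reading code written this way, the call through the abstract pointer tells you nothing about where execution goes next; you have to find which concrete object was passed in at construction time.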

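Note that many pure virtual functions are involved here, so take care not to follow the wrong path. Next, step into the TCP listening code: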

// TCP listen for RTMP.
int SrsTcpListener::listen()
{
    // C++ socket programming.
    int ret = ERROR_SUCCESS;

    // 1. Create the socket, a stream socket (SOCK_STREAM).
    if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        ret = ERROR_SOCKET_CREATE;
        srs_error("create linux socket error. port=%d, ret=%d", port, ret);
        return ret;
    }
    srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);

    int reuse_socket = 1;
    if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
        ret = ERROR_SOCKET_SETREUSE;
        srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
        return ret;
    }
    srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);

    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip.c_str());

    // 2. Bind the socket to an IP address and a port.
    if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
        ret = ERROR_SOCKET_BIND;
        srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

    // 3. Put the socket into listening mode to wait for connection requests.
    if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
        ret = ERROR_SOCKET_LISTEN;
        srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

    if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
        ret = ERROR_ST_OPEN_SOCKET;
        srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);

    // 4. Start the listener "thread", which then waits for client connections.
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
        srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);

    return ret;
}
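Steps 1 to 3 above are the standard POSIX sequence socket() -> setsockopt() -> bind() -> listen(). The sketch below isolates that sequence without State Threads. `open_listen_fd` and `bound_port` are hypothetical helper names, and unlike the excerpt the sketch zeroes the address structure and closes the descriptor on every error path.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdint>
#include <cstring>

// Hypothetical helper: performs socket() -> setsockopt() -> bind() -> listen()
// and returns the listening fd, or -1 on failure.
// Passing port 0 asks the kernel to choose a free port.
int open_listen_fd(const char* ip, int port, int backlog) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        return -1;
    }

    int reuse = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        close(fd);
        return -1;
    }

    sockaddr_in addr;
    std::memset(&addr, 0, sizeof(addr));  // zero the padding bytes as well
    addr.sin_family = AF_INET;
    addr.sin_port = htons(static_cast<uint16_t>(port));
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1) {
        close(fd);
        return -1;
    }

    if (bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == -1 ||
        listen(fd, backlog) == -1) {
        close(fd);  // close the fd on failure so it does not leak
        return -1;
    }
    return fd;
}

// Reports the port the kernel actually bound, or -1 on failure.
int bound_port(int fd) {
    sockaddr_in addr;
    socklen_t len = sizeof(addr);
    if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == -1) {
        return -1;
    }
    return ntohs(addr.sin_port);
}
```

In SRS the resulting descriptor is additionally wrapped with st_netfd_open_socket(), as the excerpt shows, so that later I/O on it goes through State Threads instead of blocking the whole process.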

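This is ordinary C++ TCP socket code and the flow is easy to follow. Once the socket is listening, pthread->start() starts a "thread" for the listener, and each RTMP request that is subsequently accepted gets a "thread" of its own. For now we simply call it a thread; the details follow below. The thread creation code is: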

int SrsReusableThread::start()
{
    return pthread->start();
}

int SrsThread::start()
{
    int ret = ERROR_SUCCESS;

    if(tid) {
        srs_info("thread %s already running.", _name);
        return ret;
    }

    if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
        ret = ERROR_ST_CREATE_CYCLE_THREAD;
        srs_error("st_thread_create failed. ret=%d", ret);
        return ret;
    }

    disposed = false;
    // we set to loop to true for thread to run.
    loop = true;

    // wait for cid to ready, for parent thread to get the cid.
    while (_cid < 0) {
        st_usleep(10 * 1000);
    }

    // now, cycle thread can run.
    can_run = true;

    return ret;
}

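We have arrived at st_thread_create. Pay attention here: this is a key step in how the SRS open-source project achieves high concurrency and high performance. What is created here is a coroutine, not a thread. A coroutine is a construct distinct from both processes and threads, combining the independence of a process with the light weight of a thread. Reportedly WeChat, which supports 800 million users, also uses a coroutine-based network service framework: http://www.infoq.com/cn/articles/CplusStyleCorourtine-At-Wechat.

From this we can see that SRS is a single-threaded server that uses coroutines to support high concurrency and high performance.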
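To make "many connections on one OS thread" concrete, here is a toy cooperative scheduler. It is only an illustration of the idea and not how State Threads is implemented: a real coroutine library saves and restores execution context so that code can yield in the middle of a function, whereas this sketch yields simply by returning from a step function. `Task`, `run_all` and `make_client` are hypothetical names.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <vector>

// A task is a step function: each call does a little work and returns true
// while it still has more to do. Returning is the "yield" point.
using Task = std::function<bool()>;

// Round-robin scheduler on ONE OS thread: run one step of each task in turn
// until every task has finished.
void run_all(std::deque<Task> tasks) {
    while (!tasks.empty()) {
        Task task = tasks.front();
        tasks.pop_front();
        if (task()) {
            tasks.push_back(task);  // not finished: back of the queue
        }
    }
}

// Builds a task that appends "<name><n>" to `log` for n = 1..steps,
// one entry per scheduling turn.
Task make_client(const std::string& name, int steps, std::vector<std::string>& log) {
    int done = 0;
    return [name, steps, done, &log]() mutable {
        log.push_back(name + std::to_string(++done));
        return done < steps;
    };
}
```

Running a two-step client "a" and a three-step client "b" together interleaves their work on a single thread: a1, b1, a2, b2, b3. Because tasks only switch at well-defined points, no locks are needed between them, which is one reason a coroutine-based server can stay single-threaded.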

The coroutine is created with thread_fun() as its entry function:

// One coroutine is created per connected client; this is the coroutine entry function.
void* SrsThread::thread_fun(void* arg)
{
    SrsThread* obj = (SrsThread*)arg;
    srs_assert(obj);

    // Enter the coroutine loop.
    obj->thread_cycle();

    // for valgrind to detect.
    SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
    if (ctx) {
        ctx->clear_cid();
    }

    st_thread_exit(NULL);

    return NULL;
}

At this point we actually enter the coroutine's processing loop:

void SrsThread::thread_cycle()
{
    int ret = ERROR_SUCCESS;

    _srs_context->generate_id();
    srs_info("thread %s cycle start", _name);

    _cid = _srs_context->get_id();

    srs_assert(handler);
    handler->on_thread_start();

    // thread is running now.
    really_terminated = false;

    // wait for cid to ready, for parent thread to get the cid.
    while (!can_run && loop) {
        st_usleep(10 * 1000);
    }

    while (loop) {
        if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
            goto failed;
        }
        srs_info("thread %s on before cycle success", _name);

        // Note the use of a pure virtual function here.
        if ((ret = handler->cycle()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
            }
            goto failed;
        }
        srs_info("thread %s cycle success", _name);

        if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
            srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
            goto failed;
        }
        srs_info("thread %s on end cycle success", _name);

    failed:
        if (!loop) {
            break;
        }

        // to improve performance, donot sleep when interval is zero.
        // @see: https://github.com/ossrs/srs/issues/237
        if (cycle_interval_us != 0) {
            st_usleep(cycle_interval_us);
        }
    }

    // really terminated now.
    really_terminated = true;

    handler->on_thread_stop();
    srs_info("thread %s cycle finished", _name);
}
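The loop above keeps calling three hooks on an abstract handler until the loop flag is cleared, and a failing hook is logged and retried instead of ending the loop. The sketch below reduces that to its skeleton. `ICycleHandler`, `run_cycle` and `CountingHandler` are hypothetical names, sleeping between iterations is omitted, and the error code 42 is arbitrary.

```cpp
#include <vector>

// Hypothetical handler interface modelled on the hooks called in the excerpt.
class ICycleHandler {
public:
    virtual ~ICycleHandler() {}
    virtual int cycle() = 0;                     // pure virtual: must be implemented
    virtual int on_before_cycle() { return 0; }  // optional hooks with defaults
    virtual int on_end_cycle() { return 0; }
};

// Simplified loop: run the hooks until `loop` is cleared. Errors are recorded
// and the loop retries, as in the "ignored and retry" branches of the excerpt.
int run_cycle(ICycleHandler& handler, bool& loop, std::vector<int>& errors) {
    int iterations = 0;
    while (loop) {
        iterations++;
        int ret = handler.on_before_cycle();
        if (ret == 0) ret = handler.cycle();
        if (ret == 0) ret = handler.on_end_cycle();
        if (ret != 0) errors.push_back(ret);
    }
    return iterations;
}

// Example handler: fails on its second cycle, stops the loop on its third.
class CountingHandler : public ICycleHandler {
public:
    explicit CountingHandler(bool& loop_flag) : loop(loop_flag), count(0) {}
    int cycle() override {
        count++;
        if (count == 2) return 42;  // arbitrary error code for illustration
        if (count == 3) loop = false;
        return 0;
    }
private:
    bool& loop;
    int count;
};
```

The thread class never needs to know what kind of work it is driving. A listener, a connection and the ingester can all plug in their own cycle(), which is why the same start path shows up repeatedly in this walkthrough.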

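By this point you really need to be comfortable with how C++ pure virtual functions are used. I had only been learning C++ for a few days and found the use of virtual and pure virtual functions in the SRS source hard to get used to! Now, step into the loop body, ret = handler->cycle():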

int SrsConnection::cycle()
{
    int ret = ERROR_SUCCESS;

    _srs_context->generate_id();
    id = _srs_context->get_id();

    ip = srs_get_peer_ip(st_netfd_fileno(stfd));
    //srs_trace("ip:%s", ip);

    ret = do_cycle();

    // if socket io error, set to closed.
    if (srs_is_client_gracefully_close(ret)) {
        ret = ERROR_SOCKET_CLOSED;
    }

    // success.
    if (ret == ERROR_SUCCESS) {
        srs_trace("client finished.");
    }

    // client close peer.
    if (ret == ERROR_SOCKET_CLOSED) {
        srs_warn("client disconnect peer. ret=%d", ret);
    }

    return ERROR_SUCCESS;
}

Step into ret = do_cycle():

// TODO: return detail message when error for client.
int SrsRtmpConn::do_cycle()
{
    int ret = ERROR_SUCCESS;

    srs_trace("RTMP client ip=%s", ip.c_str());

    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);

    if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
        srs_error("rtmp handshake failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp handshake success");

    if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
        srs_error("rtmp connect vhost/app failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("rtmp connect app success");

    // set client ip to request.
    req->ip = ip;

    // discovery vhost, resolve the vhost from config
    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
    if (parsed_vhost) {
        req->vhost = parsed_vhost->arg0();
    }

    srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());

    if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
        ret = ERROR_RTMP_REQ_TCURL;
        srs_error("discovery tcUrl failed. "
            "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
            req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
        return ret;
    }

    // check vhost
    if ((ret = check_vhost()) != ERROR_SUCCESS) {
        srs_error("check vhost failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check vhost success.");

    srs_trace("connect app, "
        "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
        req->app.c_str(), (req->args? "(obj)":"null"));

    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;

        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }

        srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d",
            srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }

    ret = service_cycle();

    http_hooks_on_close();

    return ret;
}
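do_cycle() rejects the connection when schema, vhost, port or app is empty after connect_app(), which implies the tcUrl has been split into those four fields. The sketch below shows one way such a split could look. `TcUrlParts` and `parse_tcurl` are hypothetical names and not the SRS parser; the sketch assumes the form schema://host[:port]/app, defaults the port to 1935 (the standard RTMP port), and ignores query parameters.

```cpp
#include <string>

// Hypothetical result type; field names follow the request fields in the excerpt.
struct TcUrlParts {
    std::string schema;
    std::string vhost;
    std::string port;
    std::string app;
};

// Splits "schema://host[:port]/app" into its parts. Returns false when a
// mandatory part is missing. Assumption: the port defaults to "1935" and
// query parameters such as "?vhost=..." are not handled.
bool parse_tcurl(const std::string& url, TcUrlParts& out) {
    std::string::size_type p = url.find("://");
    if (p == std::string::npos) return false;
    out.schema = url.substr(0, p);

    std::string rest = url.substr(p + 3);
    std::string::size_type slash = rest.find('/');
    if (slash == std::string::npos) return false;
    std::string host = rest.substr(0, slash);
    out.app = rest.substr(slash + 1);

    std::string::size_type colon = host.find(':');
    if (colon == std::string::npos) {
        out.vhost = host;
        out.port = "1935";
    } else {
        out.vhost = host.substr(0, colon);
        out.port = host.substr(colon + 1);
    }
    return !(out.schema.empty() || out.vhost.empty() || out.port.empty() || out.app.empty());
}
```

For rtmp://live.example.com:19350/live this yields schema rtmp, vhost live.example.com, port 19350 and app live, while rtmp://127.0.0.1 is rejected because it has no app. The final check mirrors the empty-field test that produces ERROR_RTMP_REQ_TCURL in the excerpt.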

That is enough for the RTMP connection. Going any further would take us into the RTMP stream receiving code, which drifts away from system startup; stream receiving is analyzed in a later post.

2. Next, the http-api connection. Back in int SrsServer::listen(), trace the http-api path:

int SrsServer::listen_http_api()
{
    int ret = ERROR_SUCCESS;

#ifdef SRS_AUTO_HTTP_API
    close_listeners(SrsListenerHttpApi);
    if (_srs_config->get_http_api_enabled()) {
        SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpApi);
        listeners.push_back(listener);

        std::string ep = _srs_config->get_http_api_listen();

        std::string ip;
        int port;
        srs_parse_endpoint(ep, ip, port);

        if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
            srs_error("HTTP api listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
            return ret;
        }
    }
#endif

    return ret;
}

listen_http_api() is very similar to listen_rtmp(). Stepping into listener->listen() again, we end up in:

int SrsStreamListener::listen(string i, int p)
{
    int ret = ERROR_SUCCESS;

    ip = i;
    port = p;

    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port);

    if ((ret = listener->listen()) != ERROR_SUCCESS) {
        srs_error("tcp listen failed. ret=%d", ret);
        return ret;
    }

    srs_info("listen thread current_cid=%d, "
        "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
        _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);

    srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}

The listening mechanism is exactly the same as for RTMP connections; only the type differs:

enum SrsListenerType
{
    // RTMP client,
    SrsListenerRtmpStream = 0,
    // HTTP api,
    SrsListenerHttpApi = 1,
    // HTTP stream, HDS/HLS/DASH
    SrsListenerHttpStream = 2,
    // UDP stream, MPEG-TS over udp.
    SrsListenerMpegTsOverUdp = 3,
    // TCP stream, RTSP stream.
    SrsListenerRtsp = 4,
    // TCP stream, FLV stream over HTTP.
    SrsListenerFlv = 5,
};

I only traced these two listeners; that is all for listening.

3. HTTP API callback registration

Back in run_master(), start from _srs_server->http_handle():

int SrsServer::http_handle()
{
    int ret = ERROR_SUCCESS;

#ifdef SRS_AUTO_HTTP_API
    srs_assert(http_api_mux);
    if ((ret = http_api_mux->handle("/", new SrsHttpNotFoundHandler())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/", new SrsGoApiApi())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) {
        return ret;
    }
    if ((ret = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != ERROR_SUCCESS) {
        return ret;
    }

    // test the request info.
    if ((ret = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != ERROR_SUCCESS) {
        return ret;
    }
    // test the error code response.
    if ((ret = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
        return ret;
    }
    // test the redirect mechanism.
    if ((ret = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != ERROR_SUCCESS) {
        return ret;
    }
    // test the http vhost.
    if ((ret = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
        return ret;
    }

    // TODO: FIXME: for console.
    // TODO: FIXME: support reload.
    std::string dir = _srs_config->get_http_stream_dir() + "/console";
    if ((ret = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != ERROR_SUCCESS) {
        srs_error("http: mount console dir=%s failed. ret=%d", dir.c_str(), ret);
        return ret;
    }
    srs_trace("http: api mount /console to %s", dir.c_str());
#endif

    return ret;
}

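This function registers the http-api callback handlers. For reference, see: https://github.com/ossrs/srs/wiki/v2_CN_HTTPApi

For example, visiting http://ip:1985/api/v1 (where ip is the address of the SRS server) returns the server's parameters through that interface.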
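http_handle() registers both subtree patterns that end in a slash ("/api/v1/") and exact paths ("/api/v1/versions"). The sketch below shows how a mux could choose between them. It is an assumption, not something taken from the SRS source, that matching follows Go ServeMux style rules: a pattern ending in '/' matches every path below it, any other pattern matches exactly, and the longest matching pattern wins. `MiniMux` is a hypothetical class that stores handler names instead of handler objects.

```cpp
#include <map>
#include <string>

// Hypothetical mini mux: maps URL patterns to handler names.
// Assumed rules (Go ServeMux style): a pattern ending in '/' matches every
// path below it, any other pattern matches exactly, and the longest wins.
class MiniMux {
public:
    void handle(const std::string& pattern, const std::string& handler) {
        entries[pattern] = handler;
    }

    // Returns the handler name for `path`, or "" when nothing matches.
    std::string match(const std::string& path) const {
        std::string best_pattern;
        std::string best_handler;
        for (const auto& entry : entries) {
            const std::string& pattern = entry.first;
            bool subtree = !pattern.empty() && pattern.back() == '/';
            bool hit = subtree ? path.compare(0, pattern.size(), pattern) == 0
                               : path == pattern;
            if (hit && pattern.size() >= best_pattern.size()) {
                best_pattern = pattern;
                best_handler = entry.second;
            }
        }
        return best_handler;
    }

private:
    std::map<std::string, std::string> entries;
};
```

Under these assumed rules, a request for /api/v1/versions reaches the exact handler, /api/v1/streams/100 falls to the "/api/v1/" subtree, and anything unregistered ends at "/", which is where the excerpt mounts SrsHttpNotFoundHandler.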

4. Ingest handling (pulling: SRS actively pulls a stream, the opposite of publishing)

Note: SRS handles ingest in a special way. Pulling is done by the ffmpeg tool; the SRS code only makes a simple system call to launch it. This is covered in detail in a later post.

int SrsIngester::start()
{
    int ret = ERROR_SUCCESS;

    if ((ret = parse()) != ERROR_SUCCESS) {
        clear_engines();
        ret = ERROR_SUCCESS;
        return ret;
    }

    // even no ingesters, we must also start it,
    // for the reload may add more ingesters.

    // start thread to run all encoding engines.
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
        srs_error("st_thread_create failed. ret=%d", ret);
        return ret;
    }
    srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());

    return ret;
}

As we can see, just like the listening path, this ends up in int SrsThread::start(); only the object passed in differs.
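Since ingest is described above as SRS launching ffmpeg, the part that varies per ingester is essentially the command line. The sketch below builds one possible argument vector. `build_ingest_argv` is a hypothetical helper, and the option set is an assumption chosen for illustration (a plain remux to FLV); it does not come from the SRS source, although every option used is a standard ffmpeg option.

```cpp
#include <string>
#include <vector>

// Hypothetical helper: builds the argument vector for an ffmpeg process that
// reads `input` and republishes it, without re-encoding, as FLV over RTMP.
// A real ingester would hand this vector to a process-spawning call and
// supervise the child process; that part is omitted here.
std::vector<std::string> build_ingest_argv(const std::string& ffmpeg_bin,
                                           const std::string& input,
                                           const std::string& output_url) {
    return {
        ffmpeg_bin,
        "-re",         // read the input at its native frame rate
        "-i", input,   // input file or stream URL
        "-c", "copy",  // copy audio/video without transcoding
        "-f", "flv",   // RTMP carries FLV-formatted data
        output_url,    // e.g. an rtmp:// URL on the local SRS
    };
}
```

Keeping the command as a vector of separate arguments, and not as one shell string, avoids quoting problems when the input path or URL contains spaces or special characters.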

5. The SRS self-service loop

int SrsServer::cycle()
{
    int ret = ERROR_SUCCESS;

    srs_trace("SrsServer");

    ret = do_cycle();

#ifdef SRS_AUTO_GPERF_MC
    destroy();

    // remark, for gmc, never invoke the exit().
    srs_warn("sleep a long time for system st-threads to cleanup.");
    st_usleep(3 * 1000 * 1000);
    srs_warn("system quit");
#else
    // normally quit with neccessary cleanup by dispose().
    srs_warn("main cycle terminated, system quit normally.");
    dispose();
    srs_trace("srs terminated");

    // for valgrind to detect.
    srs_freep(_srs_config);
    srs_freep(_srs_log);

    exit(0);
#endif

    return ret;
}

int SrsServer::do_cycle()
{
    int ret = ERROR_SUCCESS;

    // find the max loop
    int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);

#ifdef SRS_AUTO_STAT
    max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
    max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
    max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
    max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
    max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
    max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
    max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
#endif

    // for asprocess.
    bool asprocess = _srs_config->get_asprocess();

    // the daemon thread, update the time cache
    while (true) {
        if(handler && (ret = handler->on_cycle((int)conns.size())) != ERROR_SUCCESS){
            srs_error("cycle handle failed. ret=%d", ret);
            return ret;
        }

        // the interval in config.
        int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);

        // dynamic fetch the max.
        int temp_max = max;
        temp_max = srs_max(temp_max, heartbeat_max_resolution);

        for (int i = 0; i < temp_max; i++) {
            st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);

            // asprocess check.
            if (asprocess && ::getppid() != ppid) {
                srs_warn("asprocess ppid changed from %d to %d", ppid, ::getppid());
                return ret;
            }

            // gracefully quit for SIGINT or SIGTERM.
            if (signal_gracefully_quit) {
                srs_trace("cleanup for gracefully terminate.");
                return ret;
            }

            // for gperf heap checker,
            // @see: research/gperftools/heap-checker/heap_checker.cc
            // if user interrupt the program, exit to check mem leak.
            // but, if gperf, use reload to ensure main return normally,
            // because directly exit will cause core-dump.
#ifdef SRS_AUTO_GPERF_MC
            if (signal_gmc_stop) {
                srs_warn("gmc got singal to stop server.");
                return ret;
            }
#endif

            // do reload the config.
            if (signal_reload) {
                signal_reload = false;
                srs_info("get signal reload, to reload the config.");

                if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {
                    srs_error("reload config failed. ret=%d", ret);
                    return ret;
                }
                srs_trace("reload config success.");
            }

            // notice the stream sources to cycle.
            if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) {
                return ret;
            }

            // update the cache time
            if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
                srs_info("update current time cache.");
                srs_update_system_time_ms();
            }

#ifdef SRS_AUTO_STAT
            if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
                srs_info("update resource info, rss.");
                srs_update_system_rusage();
            }
            if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
                srs_info("update cpu info, cpu usage.");
                srs_update_proc_stat();
            }
            if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
                srs_info("update disk info, disk iops.");
                srs_update_disk_stat();
            }
            if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
                srs_info("update memory info, usage/free.");
                srs_update_meminfo();
            }
            if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
                srs_info("update platform info, uptime/load.");
                srs_update_platform_info();
            }
            if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
                srs_info("update network devices info.");
                srs_update_network_devices();
            }
            if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
                srs_info("update network server kbps info.");
                resample_kbps();
            }
#ifdef SRS_AUTO_HTTP_CORE
            if (_srs_config->get_heartbeat_enabled()) {
                if ((i % heartbeat_max_resolution) == 0) {
                    srs_info("do http heartbeat, for internal server to report.");
                    http_heartbeat->heartbeat();
                }
            }
#endif
#endif

            srs_info("server main thread loop");
        }
    }

    return ret;
}

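This is the main thread: it keeps the SRS time cache and the cached system statistics up to date. That completes the walkthrough of the system startup code structure.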
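The inner for-loop of do_cycle() drives many periodic jobs from a single tick counter: each job runs when (i % N) == 0, where N is that job's resolution in ticks. The sketch below simulates which jobs fire on which tick. `PeriodicTask` and `run_ticks` are hypothetical names, the per-tick sleep is omitted, and every_ticks is assumed to be greater than zero.

```cpp
#include <string>
#include <vector>

// Hypothetical periodic task: runs on every `every_ticks`-th tick.
// Assumption: every_ticks > 0 (a zero value would divide by zero below).
struct PeriodicTask {
    std::string name;
    int every_ticks;
};

// Simulates one pass of the inner for-loop: for each tick i in [0, max_ticks)
// records which tasks are due, using the same "(i % N) == 0" test as the excerpt.
// Real code would sleep one base interval per tick; that is omitted here.
std::vector<std::string> run_ticks(const std::vector<PeriodicTask>& tasks, int max_ticks) {
    std::vector<std::string> fired;
    for (int i = 0; i < max_ticks; i++) {
        for (const PeriodicTask& task : tasks) {
            if ((i % task.every_ticks) == 0) {
                fired.push_back(std::to_string(i) + ":" + task.name);
            }
        }
    }
    return fired;
}
```

This also explains why do_cycle() first computes the maximum of all resolutions: the counter has to run at least as far as the slowest job's period before it wraps, otherwise that job would only ever fire at tick 0.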

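III. Summary

Start the different services.
Listen for the different client types.
For each client connection, SRS creates a coroutine dedicated to handling the traffic on that connection.
SRS uses a coroutine-based network service framework, which gives the system advantages such as high concurrency and high performance.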