SRS(simple-rtmp-server)流媒体服务器源码分析--启动
2017-10-12 17:33
543 查看
SRS(simple-rtmp-server)流媒体服务器源码分析--系统启动
一、前言
小卒最近看SRS源码,随手写下博客,其一为了整理思路,其二也是为日后翻看方便。如果不足之处,请指教!首先总结一下SRS源码的优点:
1、轻量级,代码结构清楚,目前SRS3.0代码8万行左右,但几乎满足直播业务的所有要求。
2、SRS采用State Threads,支持高并发量,高性能。
3、SRS支持rtmp和hls,满足PC和移动直播要求。
4、SRS支持集群部署。小集群Forward,大集群edge。
代码分析可分为两个阶段:
一:分析代码框架,理清楚组织流程
二:分析代码细节,熟悉SRS工作原理
二、代码分析
相关SRS源码其他总结:SRS(simple-rtmp-server)流媒体服务器源码分析--系统启动
SRS(simple-rtmp-server)流媒体服务器源码分析--RTMP消息play
SRS(simple-rtmp-server)流媒体服务器源码分析--RTMP信息Publish
SRS(simple-rtmp-server)流媒体服务器源码分析--HLS切片
现阶段,我主要以代码框架梳理为主。Srs源码框架如下图:
系统在启动时,初始化相关类,监听相关端口,若来一个访问请求,则为该链接创建一个线程,专门处理与该链接的操作。
main函数在srs_main_server.cpp这个文件中。在main函数中,启动参数在这里不做过多介绍。直接从run()-> run_master()看起。
[html] view
plain copy
int run_master()
{
int ret = ERROR_SUCCESS;
if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
return ret;
}
//将pid进程号写进文件
if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {
return ret;
}
//客户端监听
if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = _srs_server->http_handle()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
return ret;
}
return 0;
}
进入客户监听
[html] view
plain copy
if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
return ret;
}
监听内容: 不同的连接请求,有不同的监听。
[html] view
plain copy
int SrsServer::listen()
{
int ret = ERROR_SUCCESS;
// 创建一个rtmp的Streamlistener
if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = listen_http_api()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
return ret;
}
if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
return ret;
}
return ret;
}
1、首先分析RTMP连接
[html] view
plain copy
int SrsServer::listen_rtmp()
{
int ret = ERROR_SUCCESS;
// stream service port.
std::vector<std::string> ip_ports = _srs_config->get_listens();
srs_assert((int)ip_ports.size() > 0);
close_listeners(SrsListenerRtmpStream);
for (int i = 0; i < (int)ip_ports.size(); i++) {
SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
listeners.push_back(listener);
std::string ip;
int port;
srs_parse_endpoint(ip_ports[i], ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
return ret;
}
}
return ret;
}
这里是listen_rtmp()函数,你也可以去看看listen_http_api()函数、listen_http_stream()函数,其实结构都很相似,只是在创建SrsStreamListener对象时,传入了不同的参数SrsListenerRtmpStream、SrsListenerHttpApi、SrsListenerHttpStream,代表了不同类型的监听对象。
[html] view
plain copy
// listen_rtmp 中listen监听走这里了。
int SrsStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("tcp listen failed. ret=%d", ret);
return ret;
}
srs_info("listen thread current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
_srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
注意,这里有大量纯虚函数,不要走错路了。进入TCP监听代码
[html] view
plain copy
// rtmp tcp监听
int SrsTcpListener::listen()
{
//C++ Socket编程
int ret = ERROR_SUCCESS;
// 1、创建套接字,流式Socket(SOCK_STREAM)
if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
ret = ERROR_SOCKET_CREATE;
srs_error("create linux socket error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
int reuse_socket = 1;
if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
ret = ERROR_SOCKET_SETREUSE;
srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
return ret;
}
srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
// 2、绑定套接字到一个IP地址和一个端口上
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
ret = ERROR_SOCKET_BIND;
srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
// 3、将套接字设置为监听模式等待连接请求
if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
ret = ERROR_SOCKET_LISTEN;
srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
ret = ERROR_ST_OPEN_SOCKET;
srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
// 4、等到连接一个客户之后,开启一个新的线程
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
return ret;
}
srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
return ret;
}
此代码为C++ TCP Socket代码,思路比较清晰,可以看到,每接受到一个rtmp访问请求,创建一个”线程“,这里暂时将其称为线程,后面再做具体介绍。创建线程代码如下:
[html] view
plain copy
int SrsReusableThread::start()
{
return pthread->start();
}
[cpp] view
plain copy
int SrsThread::start()
{
int ret = ERROR_SUCCESS;
if(tid) {
srs_info("thread %s already running.", _name);
return ret;
}
if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
ret = ERROR_ST_CREATE_CYCLE_THREAD;
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
disposed = false;
// we set to loop to true for thread to run.
loop = true;
// wait for cid to ready, for parent thread to get the cid.
while (_cid < 0) {
st_usleep(10 * 1000);
}
// now, cycle thread can run.
can_run = true;
return ret;
}
来到了st_thread_create,这里要注意,这是SRS开源项目具有高并发,高性能的重要一步。这里创建的是协程,不是线程。协程是有别于进程和线程的一种组件,具有进程的独立性和线程的轻量级,听说微信能够支持8亿用户量,也是采用协程这种网络服务框架:http://www.infoq.com/cn/articles/CplusStyleCorourtine-At-Wechat。
从这里可以看出,srs是一个单线程的服务器,采用协程,主持高并发,高性能。
创建协程,协程函数为:thread_fun()
[html] view
plain copy
// 每连链接一个用户,创建一个协程程,该函数为协程函数
void* SrsThread::thread_fun(void* arg)
{
SrsThread* obj = (SrsThread*)arg;
srs_assert(obj);
// 进入线程循环
obj->thread_cycle();
// for valgrind to detect.
SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
if (ctx) {
ctx->clear_cid();
}
st_thread_exit(NULL);
return NULL;
}
此时,真正进入了协程循环处理
[html] view
plain copy
void SrsThread::thread_cycle()
{
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
srs_info("thread %s cycle start", _name);
_cid = _srs_context->get_id();
srs_assert(handler);
handler->on_thread_start();
// thread is running now.
really_terminated = false;
// wait for cid to ready, for parent thread to get the cid.
while (!can_run && loop) {
st_usleep(10 * 1000);
}
while (loop) {
if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on before cycle success", _name);
// 注意纯虚函数的应用
if ((ret = handler->cycle()) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
}
goto failed;
}
srs_info("thread %s cycle success", _name);
if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
goto failed;
}
srs_info("thread %s on end cycle success", _name);
failed:
if (!loop) {
break;
}
// to improve performance, donot sleep when interval is zero.
// @see: https://github.com/ossrs/srs/issues/237
if (cycle_interval_us != 0) {
st_usleep(cycle_interval_us);
}
}
// readly terminated now.
really_terminated = true;
handler->on_thread_stop();
srs_info("thread %s cycle finished", _name);
}
至此,一定要熟悉C++纯虚函数的引用,本人刚学了几天C++,对虚函数和纯虚函数在SRS源码中的应用很不习惯! 好了,进入循环ret = handler->cycle()
[html] view
plain copy
int SrsConnection::cycle()
{
int ret = ERROR_SUCCESS;
_srs_context->generate_id();
id = _srs_context->get_id();
ip = srs_get_peer_ip(st_netfd_fileno(stfd));
//srs_trace("ip:%s", ip);
ret = do_cycle();
// if socket io error, set to closed.
if (srs_is_client_gracefully_close(ret)) {
ret = ERROR_SOCKET_CLOSED;
}
// success.
if (ret == ERROR_SUCCESS) {
srs_trace("client finished.");
}
// client close peer.
if (ret == ERROR_SOCKET_CLOSED) {
srs_warn("client disconnect peer. ret=%d", ret);
}
return ERROR_SUCCESS;
}
进入ret=do_cycle();
[cpp] view
plain copy
// TODO: return detail message when error for client.
int SrsRtmpConn::do_cycle()
{
int ret = ERROR_SUCCESS;
srs_trace("RTMP client ip=%s", ip.c_str());
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
srs_error("rtmp handshake failed. ret=%d", ret);
return ret;
}
srs_verbose("rtmp handshake success");
if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
srs_error("rtmp connect vhost/app failed. ret=%d", ret);
return ret;
}
srs_verbose("rtmp connect app success");
// set client ip to request.
req->ip = ip;
// discovery vhost, resolve the vhost from config
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
if (parsed_vhost) {
req->vhost = parsed_vhost->arg0();
}
srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s",
req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());
if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {
ret = ERROR_RTMP_REQ_TCURL;
srs_error("discovery tcUrl failed. "
"tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);
return ret;
}
// check vhost
if ((ret = check_vhost()) != ERROR_SUCCESS) {
srs_error("check vhost failed. ret=%d", ret);
return ret;
}
srs_verbose("check vhost success.");
srs_trace("connect app, "
"tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
req->app.c_str(), (req->args? "(obj)":"null"));
// show client identity
if(req->args) {
std::string srs_version;
std::string srs_server_ip;
int srs_pid = 0;
int srs_id = 0;
SrsAmf0Any* prop = NULL;
if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
srs_version = prop->to_str();
}
if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
srs_server_ip = prop->to_str();
}
if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
srs_pid = (int)prop->to_number();
}
if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
srs_id = (int)prop->to_number();
}
srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d",
srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
if (srs_pid > 0) {
srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
}
}
ret = service_cycle();
http_hooks_on_close();
return ret;
}
行了,rtmp连接就到这里,要不然都快到rtmp流接受代码了,和系统启动越走越远了,rtmp流接受后面再分析。
2、再分析http-api连接,回到int SrsServer::listen()函数中,梳理http-api链接
[html] view
plain copy
int SrsServer::listen_http_api()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_API
close_listeners(SrsListenerHttpApi);
if (_srs_config->get_http_api_enabled()) {
SrsListener* listener = new SrsStreamListener(this, SrsListenerHttpApi);
listeners.push_back(listener);
std::string ep = _srs_config->get_http_api_listen();
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
srs_error("HTTP api listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
return ret;
}
}
#endif
return ret;
}
listen_http_api()函数和listen_rtmp()函数内容非常像,再走到listener->listen()里面看看,结果来到了
[html] view
plain copy
int SrsStreamListener::listen(string i, int p)
{
int ret = ERROR_SUCCESS;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
if ((ret = listener->listen()) != ERROR_SUCCESS) {
srs_error("tcp listen failed. ret=%d", ret);
return ret;
}
srs_info("listen thread current_cid=%d, "
"listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
_srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
return ret;
}
和rtmp链接监听机制完全一样,只是type不同而已
[html] view
plain copy
enum SrsListenerType
{
// RTMP client,
SrsListenerRtmpStream = 0,
// HTTP api,
SrsListenerHttpApi = 1,
// HTTP stream, HDS/HLS/DASH
SrsListenerHttpStream = 2,
// UDP stream, MPEG-TS over udp.
SrsListenerMpegTsOverUdp = 3,
// TCP stream, RTSP stream.
SrsListenerRtsp = 4,
// TCP stream, FLV stream over HTTP.
SrsListenerFlv = 5,
};
我就看了两个链接监听,监听到此为止。
3、http api回调注册
回到run_master()函数中,从_srs_server->http_handle()看起。
[html] view
plain copy
int SrsServer::http_handle()
{
int ret = ERROR_SUCCESS;
#ifdef SRS_AUTO_HTTP_API
srs_assert(http_api_mux);
if ((ret = http_api_mux->handle("/", new SrsHttpNotFoundHandler())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/", new SrsGoApiApi())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/", new SrsGoApiV1())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/versions", new SrsGoApiVersion())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/summaries", new SrsGoApiSummaries())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/rusages", new SrsGoApiRusages())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/self_proc_stats", new SrsGoApiSelfProcStats())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/system_proc_stats", new SrsGoApiSystemProcStats())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/meminfos", new SrsGoApiMemInfos())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/authors", new SrsGoApiAuthors())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/features", new SrsGoApiFeatures())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/vhosts/", new SrsGoApiVhosts())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/streams/", new SrsGoApiStreams())) != ERROR_SUCCESS) {
return ret;
}
if ((ret = http_api_mux->handle("/api/v1/clients/", new SrsGoApiClients())) != ERROR_SUCCESS) {
return ret;
}
// test the request info.
if ((ret = http_api_mux->handle("/api/v1/tests/requests", new SrsGoApiRequests())) != ERROR_SUCCESS) {
return ret;
}
// test the error code response.
if ((ret = http_api_mux->handle("/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
return ret;
}
// test the redirect mechenism.
if ((ret = http_api_mux->handle("/api/v1/tests/redirects", new SrsHttpRedirectHandler("/api/v1/tests/errors", SRS_CONSTS_HTTP_MovedPermanently))) != ERROR_SUCCESS) {
return ret;
}
// test the http vhost.
if ((ret = http_api_mux->handle("error.srs.com/api/v1/tests/errors", new SrsGoApiError())) != ERROR_SUCCESS) {
return ret;
}
// TODO: FIXME: for console.
// TODO: FIXME: support reload.
std::string dir = _srs_config->get_http_stream_dir() + "/console";
if ((ret = http_api_mux->handle("/console/", new SrsHttpFileServer(dir))) != ERROR_SUCCESS) {
srs_error("http: mount console dir=%s failed. ret=%d", dir.c_str(), ret);
return ret;
}
srs_trace("http: api mount /console to %s", dir.c_str());
#endif
return ret;
}
该函数注册了http-api回调接口。可以参考:https://github.com/ossrs/srs/wiki/v2_CN_HTTPApi
比如我们可以访问http://ip:1985/api/v1 其中ip为SRS服务器地址,就可以看到从该接口返回srs服务器参数。
4、ingest(拉流,SRS主动去拉流,和推流相反)处理
注意:SRS对拉流的处理比较特殊,SRS拉流是通过ffmpeg工具去实现的,SRS代码只是实现简单的系统调用,这部分内容在后面的章节中详细说明。
[html] view
plain copy
int SrsIngester::start()
{
int ret = ERROR_SUCCESS;
if ((ret = parse()) != ERROR_SUCCESS) {
clear_engines();
ret = ERROR_SUCCESS;
return ret;
}
// even no ingesters, we must also start it,
// for the reload may add more ingesters.
// start thread to run all encoding engines.
if ((ret = pthread->start()) != ERROR_SUCCESS) {
srs_error("st_thread_create failed. ret=%d", ret);
return ret;
}
srs_trace("ingest thread cid=%d, current_cid=%d", pthread->cid(), _srs_context->get_id());
return ret;
}
到此,可以看出,和监听过程一样,进入int SrsThread::start()函数,只是传入对象不一样而已。
5、SRS自服务
[html] view
plain copy
int SrsServer::cycle()
{
int ret = ERROR_SUCCESS;
srs_trace("SrsServer")
ret = do_cycle();
#ifdef SRS_AUTO_GPERF_MC
destroy();
// remark, for gmc, never invoke the exit().
srs_warn("sleep a long time for system st-threads to cleanup.");
st_usleep(3 * 1000 * 1000);
srs_warn("system quit");
#else
// normally quit with neccessary cleanup by dispose().
srs_warn("main cycle terminated, system quit normally.");
dispose();
srs_trace("srs terminated");
// for valgrind to detect.
srs_freep(_srs_config);
srs_freep(_srs_log);
exit(0);
#endif
return ret;
}
[html] view
plain copy
int SrsServer::do_cycle()
{
int ret = ERROR_SUCCESS;
// find the max loop
int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
#ifdef SRS_AUTO_STAT
max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
#endif
// for asprocess.
bool asprocess = _srs_config->get_asprocess();
// the deamon thread, update the time cache
while (true) {
if(handler && (ret = handler->on_cycle((int)conns.size())) != ERROR_SUCCESS){
srs_error("cycle handle failed. ret=%d", ret);
return ret;
}
// the interval in config.
int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() / SRS_SYS_CYCLE_INTERVAL);
// dynamic fetch the max.
int temp_max = max;
temp_max = srs_max(temp_max, heartbeat_max_resolution);
for (int i = 0; i < temp_max; i++) {
st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
// asprocess check.
if (asprocess && ::getppid() != ppid) {
srs_warn("asprocess ppid changed from %d to %d", ppid, ::getppid());
return ret;
}
// gracefully quit for SIGINT or SIGTERM.
if (signal_gracefully_quit) {
srs_trace("cleanup for gracefully terminate.");
return ret;
}
// for gperf heap checker,
// @see: research/gperftools/heap-checker/heap_checker.cc
// if user interrupt the program, exit to check mem leak.
// but, if gperf, use reload to ensure main return normally,
// because directly exit will cause core-dump.
#ifdef SRS_AUTO_GPERF_MC
if (signal_gmc_stop) {
srs_warn("gmc got singal to stop server.");
return ret;
}
#endif
// do reload the config.
if (signal_reload) {
signal_reload = false;
srs_info("get signal reload, to reload the config.");
if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {
srs_error("reload config failed. ret=%d", ret);
return ret;
}
srs_trace("reload config success.");
}
// notice the stream sources to cycle.
if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) {
return ret;
}
// update the cache time
if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
srs_info("update current time cache.");
srs_update_system_time_ms();
}
#ifdef SRS_AUTO_STAT
if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
srs_info("update resource info, rss.");
srs_update_system_rusage();
}
if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update cpu info, cpu usage.");
srs_update_proc_stat();
}
if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
srs_info("update disk info, disk iops.");
srs_update_disk_stat();
}
if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
srs_info("update memory info, usage/free.");
srs_update_meminfo();
}
if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
srs_info("update platform info, uptime/load.");
srs_update_platform_info();
}
if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
srs_info("update network devices info.");
srs_update_network_devices();
}
if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
srs_info("update network server kbps info.");
resample_kbps();
}
#ifdef SRS_AUTO_HTTP_CORE
if (_srs_config->get_heartbeat_enabled()) {
if ((i % heartbeat_max_resolution) == 0) {
srs_info("do http heartbeat, for internal server to report.");
http_heartbeat->heartbeat();
}
}
#endif
#endif
srs_info("server main thread loop");
}
}
return ret;
}
主线程,更新srs时间和缓存!!至此,系统启动代码结构梳理完了。
三、总结
启动不同的业务。监听不同的客户端类型。
每链接一个客户端,SRS为其创建一个协程,专门负责该路链接信息交互。
SRS系统采用了协程网络服务框架,使得系统具有高并发,高性能等有点。
相关文章推荐
- SRS(simple-rtmp-server)流媒体服务器源码分析--启动
- SRS(simple-rtmp-server)流媒体服务器源码分析--RTMP消息play
- SRS(simple-rtmp-server)流媒体服务器源码分析--RTMP信息Publish
- SRS(simple-rtmp-server)流媒体服务器源码分析--HLS切片
- Ubuntu 14.04 64bit上编译安装simple-rtmp-server(srs)服务器
- SRS(simple-rtmp-server)直播点播服务器
- Ubuntu 14.04 64bit上编译安装simple-rtmp-server(srs)服务器
- [SRS流媒体]RTMP/HLS 直播服务器simple-rtmp-server安装
- Ubuntu 14.04 64bit上编译安装simple-rtmp-server(srs)服务器
- 源码分析Android SystemServer进程的启动过程
- Tomcat源码分析(3)--StandardServer类中涉及到的初始化和启动
- Redis网络库源码分析(2)之启动服务器
- 学习SRS(SimpleRtmpServer)--推流+直播rtmp+直播hls
- kafka源码分析之一server启动分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- SystemServer进程启动过程源码分析
- Netty4启动ServerBootStrap源码分析
- ADB模块源码分析(二)――adb server的启动
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析