SRS 代码分析【HTTP-FLV传输实现】
2017-11-27 13:11
525 查看
http-flv技术实现:
HTTP协议中有个约定:content-length字段,http的body部分的长度
服务器回复http请求的时候如果有这个字段,客户端就接收这个长度的数据然后就认为数据传输完成了,
如果服务器回复http请求中没有这个字段,客户端就一直接收数据,直到服务器跟客户端的socket连接断开。
http-flv直播就是利用了这个原理,服务器回复客户端请求的时候不加content-length字段,在回复了http
内容之后,紧接着发送flv数据,客户端就一直接收数据了。
请求SRS返回的是:
HTTP/1.1 200 OK
Connection: Keep-Alive
Content-Type: video/x-flv
Server: SRS/2.0.205
Transfer-Encoding: chunked
实现代码:
服务器启动时http端口的监听过程如下:
run_master()-->SrsServer::listen()--->SrsServer::listen_http_stream()
listen_http_stream服务端口监听流程如下:
1).判断是否开启HttpStream功能 _srs_config->get_http_stream_enabled()
2).创建SrsListener实例根据获取的ip的port启动监听listener->listen(ip, port)
3).SrsBufferListener::listen()中创建SrsTcpListener的实例并启动监听listener->listen()
[html] view
plain copy
srs_error_t SrsBufferListener::listen(string i, int p)
{
srs_error_t err = srs_success;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "buffered tcp listen");
}
string v = srs_listener_type2string(type);
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return err;
}
4).SrsTcpListener::listen()中启动监听,创建SrsSTCoroutine协程实例,开启协程。
[plain] view
plain copy
srs_error_t SrsTcpListener::listen()
{
srs_error_t err = srs_success;
if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
srs_fd_close_exec(_fd);
srs_socket_reuse_addr(_fd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
return srs_error_new(ERROR_SOCKET_BIND, "bind socket");
}
if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket");
}
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
}
srs_freep(trd);
trd = new SrsSTCoroutine("tcp", this);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start coroutine");
}
return err;
}
5).调用trd->start()函数后,最终会执行SrsTcpListener::cycle()。
具体调用过程请参照
http://blog.csdn.net/weixin_39799839/article/details/78579278
在cycle函数中会调用srs_accept阻塞等待客户端http请求的到来,
SrsTcpListener::cycle函数定义如下
[html] view
plain copy
srs_error_t SrsTcpListener::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(cstfd == NULL){
return srs_error_new(ERROR_SOCKET_CREATE, "accept failed");
}
int cfd = srs_netfd_fileno(cstfd);
srs_fd_close_exec(cfd);
if ((err = handler->on_tcp_client(cstfd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", cfd);
}
}
return err;
}
当有http请求到来时srs_accept()返回连接的fd,接着调用handler->on_tcp_client()处理连理连接;
on_tcp_client接下来执行顺序为:handler->on_tcp_client()-->SrsBufferListener::on_tcp_client()-->SrsServer::accept_client。
最终由SrsServer::accept_client处理连接请求,具体代码如下:
[plain] view
plain copy
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
srs_error_t err = srs_success;
SrsConnection* conn = NULL;
if ((err = fd2conn(type, stfd, &conn)) != srs_success) {
return srs_error_wrap(err, "fd2conn");
}
srs_assert(conn);
// directly enqueue, the cycle thread will remove the client.
conns.push_back(conn);
// cycle will start process thread and when finished remove the client.
// @remark never use the conn, for it maybe destroyed.
if ((err = conn->start()) != srs_success) {
return srs_error_wrap(err, "start conn coroutine");
}
return err;
}
SrsServer::accept_client调用fd2conn(type, stfd, &conn)创建一个连接对象SrsResponseOnlyHttpConn,SrsResponseOnlyHttpConn 继承者自 SrsHttpConn
SrsHttpConn对应一个协程,conn.start()会启动该协程,进入到SrsHttpConn::do_cycle()循环来处理http请求:
调用流程如下:
SrsServer::accept_client--->SrsConnection::start--->SrsSTCoroutine::start-->SrsConnection::cycle--->SrsHttpConn::do_cycle
SrsHttpConn::do_cycle中调用on_got_http_message,该函数为纯虚函数,实际实现它的是SrsResponseOnlyHttpConn
,函数SrsResponseOnlyHttpConn::on_got_http_message主要是用来读取第一次http请求中的消息内容。后面该连接的消息处理会交给SrsHttpRecvThread。
r)对请求处理。cors是SrsHttpConn的成员变量,类型为SrsHttpCorsMux
纯虚函数,next实际上是SrsHttpServer*
SrsServer::http_server。赋值过程如下:
fd2conn()-->new
SrsResponseOnlyHttpConn(this, stfd, http_server, ip)-->SrsHttpConn(this, stdfd, http_server, ip)
-->SrsHttpConn(this,
stdfd, http_server, ip)-->SrsHttpConn::http_mux = http_server;
SrsHttpConn::do_cycle()调用cors->initialize(http_mux,
crossdomain_enabled)完成初始化。
cors->initialize函数中会给next赋值,next的实际就是SrsServer::http_server
SrsHttpServer::serve_http首先判断HttpStreamServer中是否注册了url请求对应的handler。
这里分析的是直播流通过http-flv的方式发送给客户端的场景。对于直播流实际调用的是SrsLiveStream的serve_http。
SrsLiveStream与url的建立关联过程如下。
serve_http调用find_handler
然后会调用SrsHttpStreamServer::hijack接着调用SrsHttpStreamServer::http_mount,
http_mount函数中首先根据请求调用SrsSource::fetch_or_create创建或者获取一个SrsSource对象用来从源端获取流数据,
接着根据url判断是否已经创建了handler,如果没有创建一个新的,如果有返回已经存在的。
find_handler调用后会得到handler,接着调用handler的serve_http。在SrsLiveStream::serve_http循环中通过Cosumer从源站不断的获取数据,再通过FlvEncoder将数据写入http响应中。
然后该函数会开启一个新的协程来处理客户端的请求。
然后程序进入到循环中通过cosumer不断的从源站获取数据,再通过SrsFlvStreamEncoder::write_tags将拉取的数据写入http响应中。
当处理客户端的协程发现客户端关闭连接时,该循环退出
HTTP协议中有个约定:content-length字段,http的body部分的长度
服务器回复http请求的时候如果有这个字段,客户端就接收这个长度的数据然后就认为数据传输完成了,
如果服务器回复http请求中没有这个字段,客户端就一直接收数据,直到服务器跟客户端的socket连接断开。
http-flv直播就是利用了这个原理,服务器回复客户端请求的时候不加content-length字段,在回复了http
内容之后,紧接着发送flv数据,客户端就一直接收数据了。
请求SRS返回的是:
HTTP/1.1 200 OK
Connection: Keep-Alive
Content-Type: video/x-flv
Server: SRS/2.0.205
Transfer-Encoding: chunked
实现代码:
服务器启动时http端口的监听过程如下:
run_master()-->SrsServer::listen()--->SrsServer::listen_http_stream()
listen_http_stream服务端口监听流程如下:
srs_error_t SrsServer::listen_http_stream() { srs_error_t err = srs_success; close_listeners(SrsListenerHttpStream); if (_srs_config->get_http_stream_enabled()) { SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpStream); listeners.push_back(listener); std::string ep = _srs_config->get_http_stream_listen(); std::string ip; int port; srs_parse_endpoint(ep, ip, port); if ((err = listener->listen(ip, port)) != srs_success) { return srs_error_wrap(err, "http stream listen %s:%d", ip.c_str(), port); } } return err; }
1).判断是否开启HttpStream功能 _srs_config->get_http_stream_enabled()
2).创建SrsListener实例根据获取的ip的port启动监听listener->listen(ip, port)
3).SrsBufferListener::listen()中创建SrsTcpListener的实例并启动监听listener->listen()
[html] view
plain copy
srs_error_t SrsBufferListener::listen(string i, int p)
{
srs_error_t err = srs_success;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port);
if ((err = listener->listen()) != srs_success) {
return srs_error_wrap(err, "buffered tcp listen");
}
string v = srs_listener_type2string(type);
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return err;
}
4).SrsTcpListener::listen()中启动监听,创建SrsSTCoroutine协程实例,开启协程。
[plain] view
plain copy
srs_error_t SrsTcpListener::listen()
{
srs_error_t err = srs_success;
if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
return srs_error_new(ERROR_SOCKET_CREATE, "create socket");
}
srs_fd_close_exec(_fd);
srs_socket_reuse_addr(_fd);
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
return srs_error_new(ERROR_SOCKET_BIND, "bind socket");
}
if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
return srs_error_new(ERROR_SOCKET_LISTEN, "listen socket");
}
if ((_stfd = srs_netfd_open_socket(_fd)) == NULL){
return srs_error_new(ERROR_ST_OPEN_SOCKET, "st open socket");
}
srs_freep(trd);
trd = new SrsSTCoroutine("tcp", this);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start coroutine");
}
return err;
}
5).调用trd->start()函数后,最终会执行SrsTcpListener::cycle()。
具体调用过程请参照
http://blog.csdn.net/weixin_39799839/article/details/78579278
在cycle函数中会调用srs_accept阻塞等待客户端http请求的到来,
SrsTcpListener::cycle函数定义如下
[html] view
plain copy
srs_error_t SrsTcpListener::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
srs_netfd_t cstfd = srs_accept(_stfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(cstfd == NULL){
return srs_error_new(ERROR_SOCKET_CREATE, "accept failed");
}
int cfd = srs_netfd_fileno(cstfd);
srs_fd_close_exec(cfd);
if ((err = handler->on_tcp_client(cstfd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", cfd);
}
}
return err;
}
当有http请求到来时srs_accept()返回连接的fd,接着调用handler->on_tcp_client()处理连理连接;
on_tcp_client接下来执行顺序为:handler->on_tcp_client()-->SrsBufferListener::on_tcp_client()-->SrsServer::accept_client。
最终由SrsServer::accept_client处理连接请求,具体代码如下:
[plain] view
plain copy
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
srs_error_t err = srs_success;
SrsConnection* conn = NULL;
if ((err = fd2conn(type, stfd, &conn)) != srs_success) {
return srs_error_wrap(err, "fd2conn");
}
srs_assert(conn);
// directly enqueue, the cycle thread will remove the client.
conns.push_back(conn);
// cycle will start process thread and when finished remove the client.
// @remark never use the conn, for it maybe destroyed.
if ((err = conn->start()) != srs_success) {
return srs_error_wrap(err, "start conn coroutine");
}
return err;
}
SrsServer::accept_client调用fd2conn(type, stfd, &conn)创建一个连接对象SrsResponseOnlyHttpConn,SrsResponseOnlyHttpConn 继承者自 SrsHttpConn
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn) { ...... if (type == SrsListenerRtmpStream) { *pconn = new SrsRtmpConn(this, stfd, ip); } else if (type == SrsListenerHttpApi) { *pconn = new SrsHttpApi(this, stfd, http_api_mux, ip); } else if (type == SrsListenerHttpStream) { *pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip); } else { srs_warn("close for no service handler. fd=%d, ip=%s", fd, ip.c_str()); srs_close_stfd(stfd); return err; } return err; }
SrsHttpConn对应一个协程,conn.start()会启动该协程,进入到SrsHttpConn::do_cycle()循环来处理http请求:
调用流程如下:
SrsServer::accept_client--->SrsConnection::start--->SrsSTCoroutine::start-->SrsConnection::cycle--->SrsHttpConn::do_cycle
srs_error_t SrsHttpConn::do_cycle() { int ret = ERROR_SUCCESS; srs_error_t err = srs_success; srs_trace("HTTP client ip=%s", ip.c_str()); // initialize parser if ((ret = parser->initialize(HTTP_REQUEST, false)) != ERROR_SUCCESS) { return srs_error_new(ret, "init parser"); } // set the recv timeout, for some clients never disconnect the connection. // @see https://github.com/ossrs/srs/issues/398 skt->set_recv_timeout(SRS_HTTP_RECV_TMMS); SrsRequest* last_req = NULL; SrsAutoFree(SrsRequest, last_req); // initialize the cors, which will proxy to mux. bool crossdomain_enabled = _srs_config->get_http_stream_crossdomain(); if ((err = cors->initialize(http_mux, crossdomain_enabled)) != srs_success) { return srs_error_wrap(err, "init cors"); } // process http messages. while ((err = trd->pull()) == srs_success) { ISrsHttpMessage* req = NULL; // get a http message if ((ret = parser->parse_message(skt, this, &req)) != ERROR_SUCCESS) { break; } // if SUCCESS, always NOT-NULL. srs_assert(req); // always free it in this scope. SrsAutoFree(ISrsHttpMessage, req); // copy request to last request object. srs_freep(last_req); SrsHttpMessage* hreq = dynamic_cast<SrsHttpMessage*>(req); last_req = hreq->to_request(hreq->host()); // may should discard the body. if ((err = on_got_http_message(req)) != srs_success) { break; } // ok, handle http request. SrsHttpResponseWriter writer(skt); if ((err = process_request(&writer, req)) != srs_success) { break; } // donot keep alive, disconnect it. // @see https://github.com/ossrs/srs/issues/399 if (!req->is_keep_alive()) { break; } } srs_error_t r0 = srs_success; if ((r0 = on_disconnect(last_req)) != srs_success) { err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str()); srs_freep(r0); } return err; }
SrsHttpConn::do_cycle中调用on_got_http_message,该函数为纯虚函数,实际实现它的是SrsResponseOnlyHttpConn
,函数SrsResponseOnlyHttpConn::on_got_http_message主要是用来读取第一次http请求中的消息内容。后面该连接的消息处理会交给SrsHttpRecvThread。
srs_error_t SrsResponseOnlyHttpConn::on_got_http_message(ISrsHttpMessage* msg) { int ret = ERROR_SUCCESS; srs_error_t err = srs_success; ISrsHttpResponseReader* br = msg->body_reader(); // when not specified the content length, ignore. if (msg->content_length() == -1) { return err; } // drop all request body. while (!br->eof()) { char body[4096]; if ((ret = br->read(body, 4096, NULL)) != ERROR_SUCCESS) { return srs_error_new(ret, "read response"); } } return err; }SrsHttpConn::do_cycle中读取完http请求后,接着调用请求处理函数process_request
srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; srs_trace("HTTP %s %s, content-length=%" PRId64 "", r->method_str().c_str(), r->url().c_str(), r->content_length()); // use cors server mux to serve http request, which will proxy to http_remux. if ((err = cors->serve_http(w, r)) != srs_success) { return srs_error_wrap(err, "mux serve"); } return err; }函数SrsHttpConn::process_request调用cors->serve_http(w,
r)对请求处理。cors是SrsHttpConn的成员变量,类型为SrsHttpCorsMux
srs_error_t SrsHttpCorsMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { int ret = ERROR_SUCCESS; // If CORS enabled, and there is a "Origin" header, it's CORS. if (enabled) { for (int i = 0; i < r->request_header_count(); i++) { string k = r->request_header_key_at(i); if (k == "Origin" || k == "origin") { required = true; break; } } } // When CORS required, set the CORS headers. if (required) { SrsHttpHeader* h = w->header(); h->set("Access-Control-Allow-Origin", "*"); h->set("Access-Control-Allow-Methods", "GET, POST, HEAD, PUT, DELETE, OPTIONS"); h->set("Access-Control-Expose-Headers", "Server,range,Content-Length,Content-Range"); h->set("Access-Control-Allow-Headers", "origin,range,accept-encoding,referer,Cache-Control,X-Proxy-Authorization,X-Requested-With,Content-Type"); } // handle the http options. if (r->is_http_options()) { w->header()->set_content_length(0); if (enabled) { w->write_header(SRS_CONSTS_HTTP_OK); } else { w->write_header(SRS_CONSTS_HTTP_MethodNotAllowed); } if ((ret = w->final_request()) != ERROR_SUCCESS) { return srs_error_new(ret, "final request"); } } srs_assert(next); return next->serve_http(w, r); }SrsHttpCorsMux::serve_http首选写入一些响应头部,接着调用next->serve_http,该方法为ISrsHttpServeMux类的
纯虚函数,next实际上是SrsHttpServer*
SrsServer::http_server。赋值过程如下:
fd2conn()-->new
SrsResponseOnlyHttpConn(this, stfd, http_server, ip)-->SrsHttpConn(this, stdfd, http_server, ip)
-->SrsHttpConn(this,
stdfd, http_server, ip)-->SrsHttpConn::http_mux = http_server;
SrsHttpConn::do_cycle()调用cors->initialize(http_mux,
crossdomain_enabled)完成初始化。
cors->initialize函数中会给next赋值,next的实际就是SrsServer::http_server
srs_error_t SrsHttpCorsMux::initialize(ISrsHttpServeMux* worker, bool cros_enabled) { next = worker; enabled = cros_enabled; return srs_success; }调用cors->serve_http(w, r)实际就是调用SrsHttpServer::serve_http,具体代码如下
srs_error_t SrsHttpServer::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; // try http stream first. ISrsHttpHandler* h = NULL; if ((err = http_stream->mux.find_handler(r, &h)) != srs_success) { return srs_error_wrap(err, "find handler"); } if (!h->is_not_found()) { return http_stream->mux.serve_http(w, r); } return http_static->mux.serve_http(w, r); }
SrsHttpServer::serve_http首先判断HttpStreamServer中是否注册了url请求对应的handler。
srs_error_t SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { srs_error_t err = srs_success; ISrsHttpHandler* h = NULL; if ((err = find_handler(r, &h)) != srs_success) { return srs_error_wrap(err, "find handler"); } srs_assert(h); if ((err = h->serve_http(w, r)) != srs_success) { return srs_error_wrap(err, "serve http"); } return err; }
这里分析的是直播流通过http-flv的方式发送给客户端的场景。对于直播流实际调用的是SrsLiveStream的serve_http。
SrsLiveStream与url的建立关联过程如下。
serve_http调用find_handler
然后会调用SrsHttpStreamServer::hijack接着调用SrsHttpStreamServer::http_mount,
http_mount函数中首先根据请求调用SrsSource::fetch_or_create创建或者获取一个SrsSource对象用来从源端获取流数据,
接着根据url判断是否已经创建了handler,如果没有创建一个新的,如果有返回已经存在的。
int SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r) { ...... // remove the default vhost mount mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/"); entry = new SrsLiveEntry(mount); entry->cache = new SrsBufferCache(s, r); entry->stream = new SrsLiveStream(s, r, entry->cache); ...... }
find_handler调用后会得到handler,接着调用handler的serve_http。在SrsLiveStream::serve_http循环中通过Cosumer从源站不断的获取数据,再通过FlvEncoder将数据写入http响应中。
srs_error_t SrsLiveStream::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r) { int ret = ERROR_SUCCESS; srs_error_t err = srs_success; ISrsBufferEncoder* enc = NULL; srs_assert(entry); if (srs_string_ends_with(entry->pattern, ".flv")) { w->header()->set_content_type("video/x-flv"); #ifdef SRS_PERF_FAST_FLV_ENCODER enc = new SrsFastFlvStreamEncoder(); #else enc = new SrsFlvStreamEncoder(); #endif } else if (srs_string_ends_with(entry->pattern, ".aac")) { w->header()->set_content_type("audio/x-aac"); enc = new SrsAacStreamEncoder(); } else if (srs_string_ends_with(entry->pattern, ".mp3")) { w->header()->set_content_type("audio/mpeg"); enc = new SrsMp3StreamEncoder(); } else if (srs_string_ends_with(entry->pattern, ".ts")) { w->header()->set_content_type("video/MP2T"); enc = new SrsTsStreamEncoder(); } else { return srs_error_new(ERROR_HTTP_LIVE_STREAM_EXT, "invalid pattern=%s", entry->pattern.c_str()); } SrsAutoFree(ISrsBufferEncoder, enc); // create consumer of souce, ignore gop cache, use the audio gop cache. SrsConsumer* consumer = NULL; if ((ret = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != ERROR_SUCCESS) { return srs_error_new(ret, "create consumer"); } SrsAutoFree(SrsConsumer, consumer); srs_verbose("http: consumer created success."); SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream(); SrsAutoFree(SrsPithyPrint, pprint); SrsMessageArray msgs(SRS_PERF_MW_MSGS); // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); if ((ret = stat->on_client(_srs_context->get_id(), req, NULL, SrsRtmpConnPlay)) != ERROR_SUCCESS) { return srs_error_new(ret, "stat on client"); } // the memory writer. SrsBufferWriter writer(w); if ((ret = enc->initialize(&writer, cache)) != ERROR_SUCCESS) { return srs_error_new(ret, "init encoder"); } // if gop cache enabled for encoder, dump to consumer. if (enc->has_cache()) { if ((ret = enc->dump_cache(consumer, source->jitter())) != ERROR_SUCCESS) { return srs_error_new(ret, "encoder dump cache"); } } #ifdef SRS_PERF_FAST_FLV_ENCODER SrsFastFlvStreamEncoder* ffe = dynamic_cast<SrsFastFlvStreamEncoder*>(enc); #endif // Use receive thread to accept the close event to avoid FD leak. // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427 SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r); SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection()); SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); SrsAutoFree(SrsHttpRecvThread, trd); if ((err = trd->start()) != srs_success) { return srs_error_wrap(err, "start recv thread"); } // TODO: free and erase the disabled entry after all related connections is closed. while (entry->enabled) { pprint->elapse(); // Whether client closed the FD. if ((err = trd->pull()) != srs_success) { return srs_error_wrap(err, "recv thread"); } // get messages from consumer. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them. int count = 0; if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) { return srs_error_new(ret, "consumer dump packets"); } if (count <= 0) { srs_info("http: sleep %dms for no msg", SRS_CONSTS_RTMP_PULSE_TMMS); // directly use sleep, donot use consumer wait. srs_usleep(SRS_CONSTS_RTMP_PULSE_TMMS * 1000); // ignore when nothing got. continue; } if (pprint->can_print()) { srs_info("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d", count, pprint->age(), SRS_PERF_MW_MIN_MSGS, SRS_CONSTS_RTMP_PULSE_TMMS); } // sendout all messages. #ifdef SRS_PERF_FAST_FLV_ENCODER if (ffe) { ret = ffe->write_tags(msgs.msgs, count); } else { ret = streaming_send_messages(enc, msgs.msgs, count); } #else ret = streaming_send_messages(enc, msgs.msgs, count); #endif // free the messages. for (int i = 0; i < count; i++) { SrsSharedPtrMessage* msg = msgs.msgs[i]; srs_freep(msg); } // check send error code. if (ret != ERROR_SUCCESS) { return srs_error_new(ret, "send messages"); } } return err; }首先该函数会调用source->create_consumer创建一个SrsConsumer对象,用来接收SrsSource中获取的源站数据。
然后该函数会开启一个新的协程来处理客户端的请求。
/**
* The HTTP receive thread, try to read messages util EOF.
* For example, the HTTP FLV serving thread will use the receive thread to break
* when client closed the request, to avoid FD leak.
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r); SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection()); SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc);
然后程序进入到循环中通过cosumer不断的从源站获取数据,再通过SrsFlvStreamEncoder::write_tags将拉取的数据写入http响应中。
当处理客户端的协程发现客户端关闭连接时,该循环退出
相关文章推荐
- HTTP严格安全传输(HTTP Strict Transport Security, HSTS)chromuim实现源码分析(二)
- HTTP代理实现请求报文的拦截与篡改10--大结局 篡改部分的代码分析
- HTTP严格安全传输(HTTP Strict Transport Security, HSTS)chromuim实现源码分析(一)
- HTTP严格安全传输(HTTP Strict Transport Security, HSTS)chromuim实现源码分析(一)
- SRS 代码分析【转发流实现】
- SRS 代码分析【DVR录像实现】
- SRS 代码分析【RTMP握手实现】
- SRS 代码分析【FLV文件解析】
- Linux Signal实现代码分析http://blog.csdn.net/suqin0802/article/details/8093004
- HTTP严格安全传输(HTTP Strict Transport Security, HSTS)chromuim实现源码分析(一)
- http报文中chunked分块编码传输格式分析及c语言解压实现
- Linux Signal实现代码分析
- 直播协议HTTP-FLV标准解读与技术实现
- 归并排序算法原理分析与代码实现
- 区块链教程Fabric1.0源代码分析Peer peer chaincode命令及子命令实现
- 基于visual c++之windows核心编程代码分析(48)编程实现远程登录3389
- Java NIO原理图文分析及代码实现
- [导入]ASP.NET 2.0 HttpHandler实现生成图片验证码(示例代码下载)
- 日志分析代码实现(字符串切割)
- 【转】https,https的本地测试环境搭建,asp.net结合https的代码实现,http网站转换成https网站之后遇到的问题