您的位置:首页 > 运维架构 > Nginx

NGINX-RTMP复杂度分析

2013-09-27 20:30 363 查看
 很好奇nginx如何处理异步请求,我看nginx-rtmp在处理异步时状态也还行,所以调试下看看。

纯异步做rtmp协议,真是非常复杂,特别是需要做回源。无数的回调和处理逻辑。NGINX-RTMP几个简化问题的方法:

1. RTMP-CHUNK协议解析直接在一个函数里做,避免添加状态。

2. CHUNK的长度可以计算得出,所以收到要求的长度的数据后,才开始协议解析。

3. 收发数据,协议解析,包逻辑处理,三部分分离。

4. 包收到后通过各种handler回调,包发送时只是发送到chain,异步的发送和包发送分离。

5. 使用ATM模型来收发数据。

6. 使用chain作为收发数据的缓存。收取时,chain达到长度才开始处理;发送包时,缓存到chain,然后启动异步发送即可。

整个机制虽然有所简化,比起同步的调用,要复杂至少10倍。

首先,下载和解压nginx:

     tar xf nginx-1.5.0.tar.gz && tar xf nginx-rtmp-module-1.0.4.tar.gz

     cd nginx-1.5.0 && ./configure --add-module=/home/winlin/nginx-rtmp-module-1.0.4 --with-http_ssl_module --prefix=`pwd`/release

修改编译参数,将优化去掉:

     sed -i "s/-O /-O0 /g" objs/Makefile

编译:

     make && make install

配置RTMP:

     vi conf/nginx.conf

添加如下:

    daemon off;

    master_process off;

    rtmp {

        server {

            listen 19352;

            application live {

                live on;

                allow publish all;

                allow play all;

            }

        }

    }

配置可以在nginx.c中看到:

    { ngx_string("daemon"),

      NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_FLAG,

      ngx_conf_set_flag_slot,

      0,

      offsetof(ngx_core_conf_t, daemon),

      NULL },

跳到函数:ngx_conf_set_flag_slot,可以看到应该配置为on或off。

开始调试。

ultimate IDE的project如附件:

目录结构如下:

E:\Chnvideo\research\nginx

main

nginx-1.5.0

nginx-rtmp-module-1.0.4

只要将nginx和rtmp模块解压到这个目录,然后将当前目录(E:\Chnvideo\research\nginx)在IDE打开即可。

IDE=>New assembly => package nests 输入:E:\Chnvideo\research\nginx

Rtmp模块初始化

rtmp模块初始化函数调用如下:

(gdb) bt

#0  ngx_rtmp_optimize_servers (cf=0x7fffffffe1d0, ports=0x7fffffffddb0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:605

#1  0x00000000004ae362 in ngx_rtmp_block (cf=0x7fffffffe1d0, cmd=0x711680, conf=0xa4da78) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:312

#2  0x000000000041f173 in ngx_conf_handler (cf=0x7fffffffe1d0, last=1) at src/core/ngx_conf_file.c:387

#3  0x000000000041ed1e in ngx_conf_parse (cf=0x7fffffffe1d0, filename=0xa4ce00) at src/core/ngx_conf_file.c:243

#4  0x000000000041b9cc in ngx_init_cycle (old_cycle=0x7fffffffe310) at src/core/ngx_cycle.c:268

#5  0x0000000000406230 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:333

(gdb) f

#0  ngx_rtmp_optimize_servers (cf=0x7fffffffe1d0, ports=0x7fffffffddb0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp.c:605

605                 ls->handler = ngx_rtmp_init_connection;

默认将accept之后的处理函数设置为了ngx_rtmp_init_connection。

Nginx的回调函数

在ngx_rtmp_init_connection(ngx_rtmp_init.c:132)中设置断点,可以看到接受连接和处理handshake的过程。

#0  ngx_rtmp_init_connection (c=0x7ffff7fad190) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_init.c:31

#1  0x000000000042befe in ngx_event_accept (ev=0xa7b370) at src/event/ngx_event_accept.c:357

#2  0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=18446744073709551615, flags=1) at src/event/modules/ngx_epoll_module.c:683

#3  0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249

#4  0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315

#5  0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409

在ngx_event_accept(ngx_event_accept.c:206)中,接受到连接后进行处理,其中设置了recvchain:

        c->recv_chain = ngx_recv_chain;

        c->send_chain = ngx_send_chain;

这两个值是宏定义,在ngx_event.h:458:

extern ngx_os_io_t  ngx_io;

#define ngx_recv_chain       ngx_io.recv_chain

#define ngx_send_chain       ngx_io.send_chain

这个ngx_io是个extern的变量,grep搜索,发现定义在:

[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_io\;"

src/core/ngx_connection.c:13:ngx_os_io_t  ngx_io;

[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_io ="|grep epoll

src/event/modules/ngx_epoll_module.c:329:    ngx_io = ngx_os_io;

所以会在epoll这个模块被重置,可以设置断点。

b ngx_epoll_module.c:329

Breakpoint 6, ngx_epoll_init (cycle=0xa4cca0, timer=0) at src/event/modules/ngx_epoll_module.c:329

329         ngx_io = ngx_os_io;

(gdb) p &ngx_io

$32 = (ngx_os_io_t *) 0xa222a0

(gdb) p &ngx_os_io

$33 = (ngx_os_io_t *) 0x703440

(gdb) p ngx_os_io

$34 = {recv = 0x42f8b0 <ngx_unix_recv>, recv_chain = 0x42f9c8 <ngx_readv_chain>, udp_recv = 0x42fc74 <ngx_udp_unix_recv>, send = 0x42fd34 <ngx_unix_send>, send_chain = 0x435ee4 <ngx_linux_sendfile_chain>, flags = 1}

(gdb) p ngx_io

$35 = {recv = 0, recv_chain = 0, udp_recv = 0, send = 0, send_chain = 0, flags = 0}

在connection.c中的初始化只是默认初始化为0,只有在ngx_os_io这个里面才是真正用到的。

这个ngx_os_io是在这里定义的:

[winlin@dev6 nginx-1.5.0]$ find src -name "*.c"|xargs grep -in "ngx_os_io \="|grep linux

src/os/unix/ngx_linux_init.c:75:    ngx_os_io = ngx_linux_io;

这个结构体定义为:

static ngx_os_io_t ngx_linux_io = {

    ngx_unix_recv,

    ngx_readv_chain,

    ngx_unix_send,

    ngx_linux_sendfile_chain,

};

函数定义在ngx_readv_chain.c:19和ngx_linux_sendfile_chain.c:38(对于rtmp还是用的writev,不是用的sendfile)。

处理函数的改变

ngx_rtmp_init_connection里面调用了ngx_rtmp_init_session和ngx_rtmp_handshake。

ngx_rtmp_init_session先调用ngx_rtmp_set_chunk_size,然后调用ngx_rtmp_fire_event激发了事件NGX_RTMP_CONNECT。

ngx_rtmp_set_chunk_size,第一次将chunk-size设为128(NGX_RTMP_DEFAULT_CHUNK_SIZE),不需要发包。

ngx_rtmp_fire_event的handler可以搜索:NGX_RTMP_CONNECT,定义在:ngx_rtmp_limit_postconfiguration

h = ngx_array_push(&cmcf->events[NGX_RTMP_CONNECT]);

*h = ngx_rtmp_limit_connect;

所以NGX_RTMP_CONNECT的handler应该是ngx_rtmp_limit_connect,这个函数检查一个共享的变量,然后判断是否超过连接数,返回错误或者OK:

      rc = n > (ngx_uint_t) lmcf->max_conn ? NGX_ERROR : NGX_OK;

若可以接受这个连接,则开始握手,调用函数ngx_rtmp_handshake如下:

#0  ngx_rtmp_handshake (s=0xa665b0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c:576

#1  0x00000000004af3ab in ngx_rtmp_init_connection (c=0x7ffff7fad190) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_init.c:132

#2  0x000000000042befe in ngx_event_accept (ev=0xa7b370) at src/event/ngx_event_accept.c:357

#3  0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=18446744073709551615, flags=1) at src/event/modules/ngx_epoll_module.c:683

#4  0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249

#5  0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315

#6  0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409

在ngx_rtmp_handshake中,初始化rtmp的读写函数为:

c->read->handler =  ngx_rtmp_handshake_recv;

c->write->handler = ngx_rtmp_handshake_send;

然后,设置状态机:

      s->hs_stage = NGX_RTMP_HANDSHAKE_SERVER_RECV_CHALLENGE;

并直接进入下一个状态:

      ngx_rtmp_handshake_recv(c->read);

可见,还是使用了状态机,只是将状态组合成了几个大的阶段,通过设置不同的c->read/write->hander来处理。

在handshake完毕的函数ngx_rtmp_cycle(s)里面,将handler设置成了ngx_rtmp_recv/ngx_rtmp_send。

HandshakeBuffer

设置断点,看状态如何改变:

(gdb) b ngx_rtmp_handshake_recv

Breakpoint 15 at 0x4b0526: file /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c, line 375.

(gdb) b ngx_rtmp_handshake_send

Breakpoint 16 at 0x4b09a6: file /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handshake.c, line 490.

ngx_rtmp_handshake_recv,接受消息,处理时,先接受指定长度的数据,然后再处理。

b = s->hs_buf;

while (b->last != b->end) {

    n = c->recv(c, b->last, b->end - b->last);

    if (n == NGX_AGAIN) {

        ngx_add_timer(rev, s->timeout);

        if (ngx_handle_read_event(c->read, 0) != NGX_OK) {

            ngx_rtmp_finalize_session(s);

        }

        return;

    }

    b->last += n;

}

第一次时,会收1537字节然后处理,:

     (gdb) p b->end-b->last

      $76 = 1537

所以这个s->hs_buf应该初始化为1537字节,即handshake的字节。在调用handshake时初始化为NGX_RTMP_HANDSHAKE_BUFSIZE长度。

获取时间:

ngx_time_update这个函数会更新全局变量ngx_current_msec,调用的是ngx_gettimeofday,如果过多频繁的调用会有性能问题。

所以应该不会随意调用它,很多时候只是直接用,相当于cache了这个时间。

timer的处理,超时:

假设收取失败,返回的是NGX_AGAIN(-2),则会使用定时器:ngx_add_timer设置超时,然后调用ngx_handle_read_event侦听READ事件。

ngx_add_timer会添加到rbtree里面去:

      timer = {key = 1380262079704, left = 0x719700, right = 0x719700, parent = 0x0, color = 0 '\000', data = 0 '\000'}

NGX_EAGAIN返回后,会调用ngx_event_expire_timers函数,这个会取全局的rbtree的root:ngx_event_timer_rbtree

      (gdb) p ngx_event_timer_rbtree 

      $98 = {root = 0xa7b468, sentinel = 0x719700, insert = 0x414ded <ngx_rbtree_insert_timer_value>}

     (gdb) p *ngx_event_timer_rbtree.root

      $101 = {key = 1380262079704, left = 0x719700, right = 0x719700, parent = 0x0, color = 0 '\000', data = 0 '\000'}

这个root就是ngx_add_timer添加的那个选项,rev就是handshake的那个结构。

rbtree的key是超时时间,所以最小的超时时间会放在前面,ngx_event_add_timer函数里面:

      key = ngx_current_msec + timer;

rbtree的节点是直接指向evt对象的,所以ngx_event_expire_timers可以直接从rbtree的node获取evt:

      ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));

然后设置为timeout后,直接就调用handler:

     ev->timedout = 1;

     ev->handler(ev); // ngx_rtmp_handshake_recv

因为timeout,所以就关掉连接了。

EAGAIN的处理:

假若没有timeout,则进入到EAGAIN的处理。

ngx_event_find_timer会返回一个timeout,共epoll_wait来处理超时事件。

     timer = ngx_event_find_timer(); //60000,handshake的超时为60秒。

     events = epoll_wait(ep, event_list, (int) nevents, timer);

若fd可读,则调用read的handler:

     if ((revents & EPOLLIN) && rev->active) {

          rev = c->read;

          rev->handler(rev);

     (gdb) p *c->read->handler

      $125 = {void (ngx_event_t *)} 0x4b0519 <ngx_rtmp_handshake_recv>

所以直接就调用到了这个处理函数ngx_rtmp_handshake_recv,先清除timer,然后读取到handshake需要的长度的数据。

     (gdb) p b->end-b->last

      $126 = 1437

还需要读取这些数据。若EAGAIN,还是在可读时调用这个函数。

可见nginx来设置不同的handler,相当于将大状态机切割成小状态机,小状态机里面直接switch。

若c++用类表示大状态机,用switch表示小状态机,是可以简化的。设置不同的handler,就设置不同的状态子类表示。

ATM模型:

ATM就是读-写-读这种分离的模型,避免又读又写。即同一时刻,只能读或者写,做完了才能做下一个。

ngx_rtmp_handshake_recv中,会先读取,若EAGAIN则侦听读事件,若读取完毕则删除读事件。

     // start read, if not completed, focus read event.

     n = c->recv(c, b->last, b->end - b->last);

     if (n == NGX_AGAIN) {

          ngx_handle_read_event(c->read, 0);

          return;

     }

     // read completed, donot focus read event.

     if (rev->active) {

          ngx_del_event(rev, NGX_READ_EVENT, 0);

     }

若读取C0C1成功,则发送S0S1S2。调用的是:

     ngx_rtmp_handshake_send(c->write);

这个函数会尝试发送,也是ATM模型:

     // start write, if not completed, focus write event.

     n = c->send(c, b->pos, b->last - b->pos);

     if (n == NGX_AGAIN || n == 0) {

          ngx_handle_write_event(c->write, 0)

          return;

     }

     // write completed, donot focus write event.

     if (wev->active) {

          ngx_del_event(wev, NGX_WRITE_EVENT, 0);

     }

发送S0S1完毕,还是要收C2.等C0C1C2和S0S1S2都处理完毕,则进入ngx_rtmp_handshake_done,调用ngx_rtmp_cycle函数,进入包处理逻辑。

RTMP收包

这个应该是状态巨多的一个场景,每个包都是一个大状态。

首先把handler设置成收发的函数。

     c->read->handler =  ngx_rtmp_recv;

     c->write->handler = ngx_rtmp_send;

并先开始收数据并处理:

     ngx_rtmp_recv(c->read);

ngx_rtmp_recv的主要逻辑就是解析chunk为RTMP包,并调用处理包的函数。

ngx_rtmp_session_t* s中有个成员是ngx_rtmp_stream_t* in_streams,这个就是chunk streams。

     st = &s->in_streams[s->in_csid]; // 获取或创建一个chunk_stream,直接将数组转换为map,即预先开辟in_streams的空间。

其中:

typedef struct {

    ngx_rtmp_header_t       hdr;

    uint32_t                dtime;

    uint32_t                len;        /* current fragment length */

    uint8_t                 ext;

    ngx_chain_t            *in;

} ngx_rtmp_stream_t;

在每个ngx_rtmp_stream_t中都定义了一个chain:in。每个chain其实就是一个缓存,就是说给每个chunk_stream新建了一个可以写的buffer。

     n = c->recv(c, b->last, b->end - b->last); // 先收146字节的数据。调试时可以改为10,看如何解析。

b的初始化在ngx_rtmp_init_session,设置了in_streams里面:

     s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) * cscf->max_streams);

收到数据后,开始解析chunk。若只读取了10字节,那么basic_header当fmt为0时需要12字节,会在这个地方再读取:

     if (fmt == 0) {

          if (b->last - p < 4)

               continue;

    (gdb) p b->last - p

    $173 = 2

再次循环调用recv和解析:

     n = c->recv(c, b->last, b->end - b->last); // b->end - b->last=136,读取了10字节。

解析时,还是会重头开始解析一遍。除非header解析完了,才会改变pos位置:

     /* header done */

     b->pos = p; // p-b->start=12, fmt为0时的12字节basic header.

解析完的头如下:

     (gdb) p st->hdr

      $189 = {csid = 3, timestamp = 0, mlen = 161, type = 20 '\024', msid = 0}

有个地方验证了消息的最大长度,不能超过这个:cscf->max_message,默认1048576即1M,这个对大码率视频可能需要配置。

解析完头,就得看body是否足够了:

     size = b->last - b->pos; // buffer的数据,134

     fsize = h->mlen - st->len; // 消息还没有下载的长度。

这个时候,还需要更多的数据:

     if (fsize > s->in_chunk_size) {

          st->len += s->in_chunk_size;

          b->last = b->pos + s->in_chunk_size;

这样就算出了还需要收多少数据:

     (gdb) p b->end-b->last

      $198 = 6

然后会接着收,一直收完为止。

RTMP处理包

ngx_rtmp_recv收到一个包后,调用ngx_rtmp_receive_message处理,这个函数根据消息的类型来找到对应的handler:

      evhs = &cmcf->events[h->type];

     evh = evhs->elts;

     (*evh)(s, h, in)

消息处理的钩子函数的初始化是在ngx_rtmp_init_event_handlers,譬如AMF0的消息处理在:

    static size_t               amf_events[] = {

        NGX_RTMP_MSG_AMF_CMD,

        NGX_RTMP_MSG_AMF_META,

        NGX_RTMP_MSG_AMF_SHARED,

        NGX_RTMP_MSG_AMF3_CMD,

        NGX_RTMP_MSG_AMF3_META,

        NGX_RTMP_MSG_AMF3_SHARED

    };

    /* init amf events */

    for(n = 0; n < sizeof(amf_events) / sizeof(amf_events[0]); ++n) {

        eh = ngx_array_push(&cmcf->events[amf_events
]);

        *eh = ngx_rtmp_amf_message_handler;

    }

在amf的处理函数中,还得解析body内容后再调用其他钩子,来处理特定的amf0函数:

     ph = ch->elts;

     (*ph)(s, h, in) // ngx_rtmp_cmd_connect_init

也就是说,这个处理实际上是最难的,虽然一个消息能被一个处理函数处理,可是这个函数不知道很多状态信息。

不过nginx-rtmp这样还是处理的很漂亮,非常Nice。

其实同步的逻辑是这样写:

          MessagePacket* packet = NULL;

          if((ret = ctx_core->rtmp->RecvMessage(&packet)) != ErrorCode::Success){

               return ret;

          }

          if(packet is play){

               return process_play();

          }

          if(packet is publish){

               return process_publish();

          }

纯异步的方式是超级复杂,特别是加上了回源pull和push两种方式后。

能简化纯异步的技术有:

1. 将收包和解析处理分开,确认消息收到了才开始解析。

2. 异步的ATM模型,读写分开,读完才写,写完才读。

3. 处理时使用handler,譬如handshake有handler,amf0有handler,amf0-connect有handler。

同步方式是要简单很多。

Edge-relay-pull模式

nginx-rtmp的边缘叫做relay,即中继模式。有pull和push,即上行推流和下行播放。

实际上FMS是不区分这两个的,中继时只需要指定上游服务器地址即可。

收到请求前的调用:

ngx_rtmp_relay_create_app_conf,初始化app的配置。

ngx_rtmp_relay_push_pull,解析pull的命令和参数。

ngx_rtmp_relay_merge_app_conf

ngx_rtmp_relay_postconfiguration

ngx_rtmp_relay_init_process

连接上游的调用:

ngx_rtmp_relay_pull

ngx_rtmp_relay_create_local_ctx

ngx_rtmp_relay_create_remote_ctx

ngx_rtmp_relay_create_connection

ngx_event_connect_peer(堆栈如下)

#0  ngx_event_connect_peer (pc=0xa53ed8) at src/event/ngx_event_connect.c:25

#1  0x00000000004cb815 in ngx_rtmp_relay_create_connection (cctx=0x7fffffffdd60, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:468

#2  0x00000000004cba2a in ngx_rtmp_relay_create_remote_ctx (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:527

#3  0x00000000004cbce3 in ngx_rtmp_relay_create (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0, create_publish_ctx=0x4cb9d5 <ngx_rtmp_relay_create_remote_ctx>, create_play_ctx=0x4cba2c <ngx_rtmp_relay_create_local_ctx>)

    at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:607

#4  0x00000000004cbde9 in ngx_rtmp_relay_pull (s=0xa665b0, name=0x7fffffffde80, target=0xa4fef0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:630

#5  0x00000000004cc226 in ngx_rtmp_relay_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_relay_module.c:738

#6  0x00000000004d0df5 in ngx_rtmp_enotify_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_enotify_module.c:412

#7  0x00000000004d4045 in ngx_rtmp_notify_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_notify_module.c:1407

#8  0x00000000004d5b6d in ngx_rtmp_log_play (s=0xa665b0, v=0x719da0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_log_module.c:844

#9  0x00000000004b8f7c in ngx_rtmp_cmd_play_init (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_cmd_module.c:569

#10 0x00000000004b6d2b in ngx_rtmp_amf_message_handler (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_receive.c:436

#11 0x00000000004b28f3 in ngx_rtmp_receive_message (s=0xa665b0, h=0xa50dc0, in=0xa51cc0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:791

#12 0x00000000004b1d19 in ngx_rtmp_recv (rev=0xa7b440) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:456

#13 0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=59762, flags=1) at src/event/modules/ngx_epoll_module.c:683

#14 0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249

#15 0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315

#16 0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409

连接后,进行握手:

     ngx_rtmp_relay_create_connection

     ngx_event_connect_peer

     ngx_rtmp_init_session

     ngx_rtmp_client_handshake

握手完成时,回调relay的函数:

     ngx_epoll_process_events

     ngx_rtmp_handshake_recv

     ngx_rtmp_handshake_recv

     ngx_rtmp_handshake_send

     ngx_rtmp_handshake_done

     ngx_rtmp_fire_event

     ngx_rtmp_relay_handshake_done

     ngx_rtmp_relay_send_connect

正常的握手是没有回调的,就进入了ngx_rtmp_cycle,相当于relay在这个地方进行了hook:

ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)

     ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE)

     ngx_rtmp_cycle(s);

异步发送多个包

回源连接connect时,连续发送了几个包,对于异步的socket,如何做到的呢?

ngx_rtmp_relay_send_connect

    return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK

        || ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK

        || ngx_rtmp_send_amf(s, &h, out_elts,

            sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK

        ? NGX_ERROR

        : NGX_OK;

分析一个:ngx_rtmp_send_chunk_size

     ngx_rtmp_relay_send_connect

     ngx_rtmp_send_chunk_size

     ngx_rtmp_send_shared_packet

     ngx_rtmp_send_message

     ngx_rtmp_send

这个函数是先将包写到buffer:

ngx_int_t

ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)

{

    return ngx_rtmp_send_shared_packet(s,

           ngx_rtmp_create_chunk_size(s, chunk_size));

}

ngx_chain_t *

ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) 

其中,ngx_rtmp_create_chunk_size就是将包写到ngx_chain_t*,调用ngx_rtmp_prepare_message。

发送多包的逻辑是:

     if (!s->connection->write->active) {

          ngx_rtmp_send(s->connection->write);

     }

其中,ngx_rtmp_send就是一个没有返回值的函数,它主要是启动ATM到写模式,若写不完,在可写时会自动回调这个函数。

所以发送多包,实际上就是把包发送到ngx_rtmp_send(ngx_event_t *wev)的wev的chain里面,相当于缓冲区,它会保证将它发送完。

至于ngx_rtmp_send_message,根本就不会处理这个发送错误,它打交道的是缓冲区。

不过这样比起同步来,也足够麻烦了。

简化的地方:将发送和组包分离,组包到chain,可以将多个包进入队列,发送只负责发就好了。

同样,发送成功后,需要调用特殊的回调函数。

譬如处理amf的_result消息时:

ngx_rtmp_relay_postconfiguration

    ch = ngx_array_push(&cmcf->amf);

    ngx_str_set(&ch->name, "_result");

    ch->handler = ngx_rtmp_relay_on_result;

这样可以回调:

     ngx_rtmp_recv

     ngx_rtmp_receive_message

     ngx_rtmp_amf_message_handler

     ngx_rtmp_relay_on_result

     ngx_rtmp_relay_send_create_stream

整个异步的状态,无处不在。

边缘数据

边缘处理数据包的调用是:

     ngx_epoll_process_events

     ngx_rtmp_recv

     ngx_rtmp_receive_message

     ngx_rtmp_codec_av

回调是在ngx_rtmp_codec_postconfiguration中设置:

     h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]);

     *h = ngx_rtmp_codec_av;

     h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);

     *h = ngx_rtmp_codec_av;

其中,回源连接的ngx_rtmp_receive_message会调用的handler包括:

     ngx_rtmp_codec_av //解码av。

     ngx_rtmp_live_av // 转发给所有客户端

ngx_rtmp_live_av的调用是:

#0  ngx_rtmp_send (wev=0xa95450) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:492

#1  0x00000000004b283f in ngx_rtmp_send_message (s=0xa665b0, out=0xab33e4, priority=0) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:736

#2  0x00000000004bef33 in ngx_rtmp_live_av (s=0xa53f88, h=0xaaf4a0, in=0xab1400) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_live_module.c:957

#3  0x00000000004b28f3 in ngx_rtmp_receive_message (s=0xa53f88, h=0xaaf4a0, in=0xab1400) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:791

#4  0x00000000004b1d19 in ngx_rtmp_recv (rev=0xa7b4a8) at /home/winlin/nginx-rtmp-module-1.0.4/ngx_rtmp_handler.c:456

#5  0x0000000000435d26 in ngx_epoll_process_events (cycle=0xa4cca0, timer=49151, flags=1) at src/event/modules/ngx_epoll_module.c:683

#6  0x0000000000428f02 in ngx_process_events_and_timers (cycle=0xa4cca0) at src/event/ngx_event.c:249

#7  0x0000000000433181 in ngx_single_process_cycle (cycle=0xa4cca0) at src/os/unix/ngx_process_cycle.c:315

#8  0x00000000004064c8 in main (argc=1, argv=0x7fffffffe5d8) at src/core/nginx.c:409

其中ngx_rtmp_live_av,也是调用ngx_rtmp_send_message,发送到缓冲区,然后发起发送请求而已,不必等到所有数据都发出去了。

     ngx_rtmp_live_av

     /* broadcast to all subscribers */

是在live模块中实现的这个转发,即subscribe模式。

也就是说,这个回源连接收到数据后,同时转发给了其他socket(准确讲是转发给了各个客户端的chain,然后异步发送)。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: