您的位置:首页 > 其它

Tor源码分析五 -- 客户端执行流程(libevent调度)

2013-05-09 21:46 495 查看
  上一源码分析的小节中,已经将系统的主要流程启动过程介绍完毕。实际上,系统在如此启动之后,就可以通过正确的流程将Tor系统完整的启动起来,为用户提供应用程序代理服务,以达到匿名通信的目标。本节的目标,不是为了详细介绍系统初始化后的正确启动流程,而是介绍系统是如何通过Libevent的调度实现成功启动的。也就是说,本节的重点在于说明Libevent调度机制在Tor系统之中的应用。

  上节当中也简要介绍了Libevent的三种应用方式:信号处理;定时处理;网络套接字处理。这三种处理内容对于Libevent来说均是事件,只是事件的属性与回调函数不同而已。Libevent根据自身的调度机制,优先级队列等,来进行事件的调度。事件大多在被创建之时就被加入到Libevent事件集之中,Libevent一旦发现事件被激活,就会执行事件相应的回调函数。而值得说明的是,激活事件的方式根据事件类别的不同而有所区别:

  1)signal - 信号处理事件:挂起的信号处理事件在进程接收到相应信号时被激活,信号的发送者可以是用户,也可以是另外一个进程等;

  2)time - 定时处理事件:挂起的定时处理事件在定时器到时时被激活,定时器的时间设定会在定时处理事件生成时就指定;

  3)socket - 网络套接字处理事件:挂起的网络套接字处理事件主要分两种,分别为读事件和写事件;读事件在相应套接字有数据可读时被激活;写事件比较特殊,它总是处于激活状态的,也就是说,只要套接字是正常可写的,写事件就会不断被激活;因为写事件的情况特殊,所以Tor系统对其的处理是动态地增删,而不是像读事件一样一直放置于事件池内部。利用Libevent的事件池输出调试函数event_base_dump_events可以看到事件池内事件的变动和简单状态,是调试Tor系统的重要手段。(我们讨论非阻塞的读写事件,若读写事件是阻塞的,则Tor系统的单线程执行很容易就会被阻塞,系统就无法正常运行)

0. 系统中的普通事件

  此处暂且介绍客户端系统的事件集,服务器端的事件集可以由此作为依据进行猜测。

  0.1. 监听事件(socket) -- 系统初始化阶段 tor_init()

  客户端系统中的监听事件主要分为两种:socket监听;control监听。

  以上两种监听事件的目的是不同的:socket监听是监听Tor提供给用户程序的应用端口,接收的是应用请求;control监听是监听Tor提供给用户的控制端口,通过向控制端口传送相应的命令,可以有效控制Tor系统的执行,该部分的控制规则可以在文件control-spec.txt中找到详细的说明。

  监听事件的添加是在监听连接创建之后马上执行的,主要处理代码在初始化阶段,入口函数为options_act_reversible。其后主要是通过connection_listener_new函数中的处理来完成事件的新建和添加。connection_start_reading函数完成了事件向事件集中添加的具体操作。在完成了添加之后,一旦进入到主循环的Libevent循环,就将控制权交给Libevent,通过Libevent的机制进行调度,执行提供的回调函数。

  此处值得先一提的是,对于所有连接的读事件和写事件,执行的回调函数分别相同:conn_read_callback,conn_write_callback。在后期的分析之中,会着重介绍这两个函数对网络套接字事件所执行的具体处理。

  0.2. 信号事件(signal) -- 系统无限循环开启前 do_main_loop()

  与大多数的Linux软件编程类似,Tor系统进程需要准确地控制进程以接收信号完成相应操作。信号的处理是件相对繁琐的事情,我们此处略去信号的具体处理细节,而把注意力集中在Tor系统是如何利用Libevent来进行信号控制。其实其过程也相对简单,只利用了一个函数:

/** Set up the signal handlers for either parent or child. */
void
handle_signals(int is_parent)
{
#ifndef _WIN32 /* do signal stuff only on Unix */
  int i;

  // 定义需要处理的信号
  static const int signals[] = {
    SIGINT,  /* do a controlled slow shutdown */
    SIGTERM, /* to terminate now */
    SIGPIPE, /* otherwise SIGPIPE kills us */
    SIGUSR1, /* dump stats */
    SIGUSR2, /* go to loglevel debug */
    SIGHUP,  /* to reload config, retry conns, etc */
#ifdef SIGXFSZ
    SIGXFSZ, /* handle file-too-big resource exhaustion */
#endif
    SIGCHLD, /* handle dns/cpu workers that exit */
    -1 };

  //信号事件集
  static struct event *signal_events[16]; /* bigger than it has to be. */

  if (is_parent) { // 父进程信号处理交由Libevent进行
    for (i = 0; signals[i] >= 0; ++i) {
      signal_events[i] = tor_evsignal_new(
                       tor_libevent_get_base(), signals[i], signal_callback,
                       (void*)(uintptr_t)signals[i]);
      if (event_add(signal_events[i], NULL))
        log_warn(LD_BUG, "Error from libevent when adding event for signal %d",
                 signals[i]);
    }
  } else { // 子进程信号处理由系统默认信号处理机制sigaction完成
    struct sigaction action;
    action.sa_flags = 0;
    sigemptyset(&action.sa_mask);
    action.sa_handler = SIG_IGN;
    sigaction(SIGINT,  &action, NULL);
    sigaction(SIGTERM, &action, NULL);
    sigaction(SIGPIPE, &action, NULL);
    sigaction(SIGUSR1, &action, NULL);
    sigaction(SIGUSR2, &action, NULL);
    sigaction(SIGHUP,  &action, NULL);
#ifdef SIGXFSZ
    sigaction(SIGXFSZ, &action, NULL);
#endif
  }
#else /* MS windows */
  (void)is_parent;
#endif /* signal stuff */
}
  总的说来,上述函数执行完毕之后,系统会在Libevent事件列表中挂入信号处理事件。信号的处理从此就交由Libevent自动执行。具体是如何处理的,则需要进一步研究回调函数signal_callback。而这并不是本文的重点,所以此处略去。

  0.3.定时事件(time) -- 系统无限循环开启前 do_main_loop()

  Tor系统为了维护系统重要模块和组件,需要进行每秒一次的系统维护工作。同时,系统中的流量控制模块也需要进行定期的流量审查和令牌填充等操作。所以,定时事件是Tor系统必须要处理的重要事件。定时事件根据上面的描述,有主要分为两个:每秒一次的系统定时维护事件;定期的流量控制事件。

/* set up once-a-second callback. */
  if (! second_timer) { //每秒执行的系统维护定时器事件
    struct timeval one_second;
    one_second.tv_sec = 1;
    one_second.tv_usec = 0;

    second_timer = periodic_timer_new(tor_libevent_get_base(),
                                      &one_second,
                                      second_elapsed_callback, // 回调处理函数
                                      NULL);
    tor_assert(second_timer);
  }

#ifndef USE_BUFFEREVENTS
  if (!refill_timer) { // 定期执行的系统流量控制定时器事件
    struct timeval refill_interval;
    int msecs = get_options()->TokenBucketRefillInterval; // 默认为100msecs

    refill_interval.tv_sec =  msecs/1000;
    refill_interval.tv_usec = (msecs%1000)*1000;

    refill_timer = periodic_timer_new(tor_libevent_get_base(),
                                      &refill_interval,
                                      refill_callback, // 回调处理函数
                                      NULL);
    tor_assert(refill_timer);
  }
#endif
  上述的代码已经将两个定时器事件的定义和创建过程描述清楚。periodic_timer_new函数内部会将新建的事件加入事件集,由Libevent进行调度处理。

  0.4. OR连接读写事件(socket) -- 系统无限循环开启后 do_main_loop()

  OR连接是Tor系统中结点间交互最重要的连接。其下层是结点之间形成的TLS连接。OR连接的最主要目标就是形成结点之间稳定的TLS链路,以保障后期通信的安全性和稳定性。所以,OR连接最主要的读写事件在非阻塞的TLS握手,以及OR握手的过程中发生。当然,还有其后所有数据流的流入和流出。但是,读写事件最复杂的处理则是处于TLS握手阶段。因为,我们需要进行的TLS握手是非阻塞形式的,也就是说在TLS握手的过程中,我们需要手动的控制OR连接的写事件。若写事件保持在事件集中,写事件会持续被激活,那么就无法顺利完成读写交替的握手过程。因为这样的种种原因,我们可以猜测,OR连接的读写事件回调函数中,一定有特殊的判断或相应机制:读事件处理函数中,当发现读完需要写的时候,就将写事件添加到事件集中;写事件处理函数中,当发现写完需要读的时候,就将写事件从事件集中删除。事实上,Tor系统也是这么做的。在读写回调函数中的握手控制函数内,我们可以分别很容易地找到connection_start_writing,connection_stop_writing函数。这两个函数就是用于控制写事件在事件集中的出入。当然相应的还有控制读事件在事件集中出入的函数,但是读事件的增删控制除OR连接建立和删除时候使用之外,一般更多的是被linked连接所使用,下文会详细介绍。

  这里猜测TLS握手过程中的写事件增删情况如下:

  1)握手初期:状态可读,可写;可写被激活,客户端发起TLS连接请求TLS_connect(),关闭可写;

  2)TLS握手中:状态可读;客户端接收到服务器响应,判断握手到达的阶段,开启可写;

  3)TLS握手中:状态可读,可写;可写被激活,客户端继续发出相应握手包TLS_connect(),关闭可写;

  4)TLS握手中:状态可读;客户端接收到服务器响应,判断握手到达的阶段,开启可写;

  …………

  n)TLS握手中:状态可读;客户端接收到服务器响应,TLS握手完成,开启OR握手,开启可写;

  n+1)OR握手中:状态可读,可写;可写被激活,客户端发起OR连接握手过程,发出VERSION CELL,关闭可写;

  n+2)OR握手中:状态可读;客户端接收到服务器响应:VERSION CELL, CERT CELL, AUTH-CHALLENGE CELL, NETINFO CELL,开启可写;

  n+3)OR握手中:状态可读,可写;可写被激活,客户端发起OR连接握手过程,发出NETINFO CELL,关闭可写;

  n+4)握手结束:状态可读;当有数据传向服务器端的时候开启可写。

  实际上,握手过程中的TLS握手过程中没有这么大费周折的开关写事件。TLS_connect()函数会在握手时写出握手数据而无需显式激活写事件。下面是握手信息:

[debug] {NET} conn_write_callback(): socket 552 wants to write. // TLS Handshake 1 - write
 [debug] {HANDSHAKE} connection_or_finished_connecting(): OR connect() to router at 154.35.32.5:443 finished.
 [notice] {CONTROL} Bootstrapped 10%: Finishing handshake with directory server.
 [debug] {NET} rectify_client_ciphers(): List was: ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA:DHE-RSA-CAMELLIA256-SHA:DHE-DSS-CAMELLIA256-SHA:DHE-RSA-AES256-SHA:DHE-DSS-AES256-SHA:ECDH-RSA-AES256-SHA:ECDH-ECDSA-AES256-SHA:CAMELLIA256-SHA:AES256-SHA:ECDHE-ECDSA-RC4-SHA:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-RC4-SHA:ECDHE-RSA-AES128-SHA:DHE-RSA-CAMELLIA128-SHA:DHE-DSS-CAMELLIA128-SHA:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA:ECDH-RSA-RC4-SHA:ECDH-RSA-AES128-SHA:ECDH-ECDSA-RC4-SHA:ECDH-ECDSA-AES128-SHA:SEED-SHA:CAMELLIA128-SHA:RC4-MD5:RC4-SHA:AES128-SHA:ECDHE-ECDSA-DES-CBC3-SHA:ECDHE-RSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:EDH-DSS-DES-CBC3-SHA:ECDH-RSA-DES-CBC3-SHA:ECDH-ECDSA-DES-CBC3-SHA:DES-CBC3-SHA:!SSLv2
 [debug] {NET} rectify_client_ciphers(): Cipher 0: 300c00a ECDHE-ECDSA-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Cipher 1: 300c014 ECDHE-RSA-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Cipher 2: 3000039 DHE-RSA-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Cipher 3: 3000038 DHE-DSS-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Cipher 4: 300c00f ECDH-RSA-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Cipher 5: 300c005 ECDH-ECDSA-AES256-SHA
 ......
 [debug] {NET} rectify_client_ciphers(): Found cipher ECDHE-ECDSA-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Found cipher ECDHE-RSA-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Completely omitting unsupported cipher DHE-RSA-CAMELLIA256-SHA
 [debug] {NET} rectify_client_ciphers(): Completely omitting unsupported cipher DHE-DSS-CAMELLIA256-SHA
 [debug] {NET} rectify_client_ciphers(): Found cipher DHE-RSA-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Found cipher DHE-DSS-AES256-SHA
 [debug] {NET} rectify_client_ciphers(): Found cipher ECDH-RSA-AES256-SHA
 ......
 [notice] {NET} We weren't able to find support for all of the TLS ciphersuites that we wanted to advertise. This won't hurt security, but it might make your Tor (if run as a client) more easy for censors to block.
 [notice] {NET} To correct this, use a more recent OpenSSL, built without disabling any secure ciphers or features.
 ......
 [debug] {HANDSHAKE} connection_tls_start_handshake(): starting TLS handshake on fd 552
 [debug] {HANDSHAKE} tor_tls_handshake(): About to call SSL_connect on 022530A8 (before/accept initialization)
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state before/connect initialization [type=16,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state before/connect initialization [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv2/v3 write client hello A [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv2/v3 read server hello A [type=4098,val=-1].
 [debug] {HANDSHAKE} tor_tls_handshake(): After call, 022530A8 was in state SSLv2/v3 read server hello A
 [debug] {OR} connection_tls_continue_handshake(): wanted read
 [warn] {GENERAL} Notice : connection_stop_writing !!!!!
 [debug] {HANDSHAKE} tor_tls_handshake(): About to call SSL_connect on 022530A8 (SSLv2/v3 read server hello A)
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv2/v3 read server hello A [type=4098,val=-1].
 [debug] {OR} connection_tls_continue_handshake(): wanted read

 [debug] {NET} conn_read_callback(): socket 552 wants to read. // TLS Handshake 2 - read
 [debug] {HANDSHAKE} tor_tls_handshake(): About to call SSL_connect on 022530A8 (SSLv2/v3 read server hello A)
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 read server hello A [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 read server certificate A [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 read server key exchange A [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 read server done A [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 write client key exchange A [type=4097,val=1]. // write
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 write change cipher spec A [type=4097,val=1].  // write
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 write finished A [type=4097,val=1].            // write
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 flush data [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 read finished A [type=4098,val=-1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 read finished A [type=4098,val=-1].
 [debug] {HANDSHAKE} tor_tls_handshake(): After call, 022530A8 was in state SSLv3 read finished A
 [debug] {OR} connection_tls_continue_handshake(): wanted read

 [debug] {NET} conn_read_callback(): socket 552 wants to read. // TLS Handshake 2 - read
 [debug] {HANDSHAKE} tor_tls_handshake(): About to call SSL_connect on 022530A8 (SSLv3 read finished A)
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSLv3 read finished A [type=4097,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSL negotiation finished successfully [type=32,val=1].
 [debug] {HANDSHAKE} tor_tls_debug_state_callback(): SSL 0233E6C8 is now in state SSL negotiation finished successfully [type=4098,val=1].
 [debug] {HANDSHAKE} tor_tls_handshake(): After call, 022530A8 was in state SSL negotiation finished successfully
 [debug] {HANDSHAKE} tor_tls_finish_handshake(): Server sent back a single certificate; looks like a v2 handshake on 022530A8.
 [info] {OR} connection_tls_continue_handshake(): Client got a v3 cert!  Moving on to v3 handshake.
 [debug] {OR} connection_or_process_cells_from_inbuf(): 552: starting, inbuf_datalen 0 (0 pending in tls object).

 [debug] {NET} conn_write_callback(): socket 552 wants to write. // OR Handshake write - VERSION CELL
 [debug] {NET} flush_chunk_tls(): flushed 7 bytes, 0 ready to flush, 0 remain.
 [debug] {GENERAL} connection_handle_write_impl(): After TLS write of 7: 979 read, 401 written
 [debug] {NET} global_read_bucket now 10485760.
 [debug] {NET} global_write_bucket now 10485760.
 [debug] {NET} global_relayed_read_bucket now 10485760.
 [debug] {NET} global_relayed_write_bucket now 10485760.

 [debug] {NET} conn_read_callback(): socket 552 wants to read. // OR Handshake read - VERSION CELL, CERT CELL, AUTH_CHALLENGE CELL, NETINFO CELL
 [debug] {NET} connection_read_to_buf(): 552: starting, inbuf_datalen 0 (0 pending in tls object). at_most 16384.
 [debug] {GENERAL} connection_read_to_buf(): After TLS read of 1459: 1530 read, 0 written
 [debug] {OR} connection_or_process_cells_from_inbuf(): 552: starting, inbuf_datalen 1459 (0 pending in tls object).
 [info] {OR} command_process_versions_cell(): Negotiated version 3 with 154.35.32.5:443; Waiting for CERTS cell
 [debug] {OR} connection_or_process_cells_from_inbuf(): 552: starting, inbuf_datalen 1450 (0 pending in tls object).
 [info] {OR} command_process_certs_cell(): Got some good certificates from 154.35.32.5:443: Authenticated it.
 [debug] {OR} connection_or_process_cells_from_inbuf(): 552: starting, inbuf_datalen 553 (0 pending in tls object).
 [debug] {OR} connection_or_process_cells_from_inbuf(): 552: starting, inbuf_datalen 512 (0 pending in tls object).
 [debug] {CIRC} circuit_n_conn_done(): or_conn to $CF6D0AAFB385BE71B8E111FC5CFF4B47923733BC/154.35.32.5, status=1
 [debug] {CIRC} circuit_n_conn_done(): Found circ, sending create cell.
 [debug] {CIRC} circuit_send_next_onion_skin(): First skin; sending create cell.
 [notice] {CONTROL} Bootstrapped 15%: Establishing an encrypted directory connection.
 [debug] {CIRC} circuit_deliver_create_cell(): Chosen circID 4372.
 [debug] {GENERAL} append_cell_to_circuit_queue(): Made a circuit active.
 [info] {CIRC} circuit_send_next_onion_skin(): First hop: finished sending CREATE_FAST cell to '<unnamed>'
 [info] {OR} command_process_netinfo_cell(): Got good NETINFO cell from 154.35.32.5:443; OR connection is now open, using protocol version 3. Its ID digest is CF6D0AAFB385BE71B8E111FC5CFF4B47923733BC
 [debug] {OR} connection_or_process_cells_from_inbuf(): 552: starting, inbuf_datalen 0 (0 pending in tls object).

 [debug] {NET} conn_write_callback(): socket 552 wants to write. // OR Handshake write - NETINFO CELL
 [debug] {NET} flush_chunk_tls(): flushed 512 bytes, 0 ready to flush, 0 remain.
 [debug] {GENERAL} connection_handle_write_impl(): After TLS write of 512: 0 read, 586 written
 [debug] {GENERAL} connection_or_flush_from_first_active_circuit(): Made a circuit inactive.

 [debug] {NET} conn_write_callback(): socket 552 wants to write. // Circuit Handshake write - CREATE CELL
 [debug] {NET} flush_chunk_tls(): flushed 512 bytes, 0 ready to flush, 0 remain.
 [debug] {GENERAL} connection_handle_write_impl(): After TLS write of 512: 0 read, 586 written
 [debug] {NET} global_read_bucket now 10485760.
 [debug] {NET} global_write_bucket now 10485760.
 [debug] {NET} global_relayed_read_bucket now 10485760.
 [debug] {NET} or_conn->write_bucket now 10485760.
  由以上所有调试信息就可以清晰地看到整个握手过程的全貌,此处也仅是概括性地描述握手的主要流程。关于握手过程中具体牵涉到的函数操作和细节处理,我们会在后面适当的时候给出。

1. 系统中的特殊事件

  1.1. Linked连接读写事件(linked) -- 系统无限循环开启后 do_main_loop()

  除了上述几种事件之外,还有一类重要连接所处理的事件 -- Linked连接读写事件。先谈谈Linked连接:系统中的连接类型非常之多,连接的一个重要职能就是相互交换数据。OR连接之间交换数据的方式是通过socket通信。socket通信可以实现在两个OR连接之间相互的交换两个结点之间的数据。但是,有的时候,我们并非需要socket通信这么繁琐的方式实现数据交换。我们可能仅仅需要在一个结点内部的不同结构之间交换数据。或者说,我们可能想要实现内部连接的功能。例如说系统中的DIR连接与AP连接之间的关系,便是Linked连接的关系。DIR与AP连接之间需要相互交换数据,而他们两者均是Tor系统自身产生的,难道还要通过socket方式来实现数据交换吗?这显然是不科学的。针对这样的情况,就出现了Linked连接的概念。将两个连接Link起来,直接用指针实现相互指向,很简单的就可以在有数据要交换之时找到数据流出的对象连接。但是,此时又出现一些问题:如何控制读写?数据流动的时机是什么呢?

  实际上,Tor系统的Linked连接是没有写事件的,只有读事件。例如,A,B两个连接是互连的连接。那么A连接的读,就是B连接的写;相反的,A连接的写,也就是B连接的读。所以系统中针对Linked连接的写事件处理,会转化为与其相应连接的读事件进行处理。而读事件处理就正常处理。具体源码操作如下:

/** Tell the main loop to start notifying <b>conn</b> of any write events. */
void
connection_start_writing(connection_t *conn) // 写事件
{
  ......

  if (conn->linked) { // Linked连接类的写事件激活处理
    conn->writing_to_linked_conn = 1;
    if (conn->linked_conn &&
        connection_should_read_from_linked_conn(conn->linked_conn))
      connection_start_reading_from_linked_conn(conn->linked_conn); // 激活相应连接的读事件
  } else { // 普通连接类的写事件激活处理
    if (event_add(conn->write_event, NULL)) // 激活本连接的写事件
      log_warn(LD_NET, "Error from libevent setting write event state for %d "
               "to watched: %s",
               (int)conn->s,
               tor_socket_strerror(tor_socket_errno(conn->s)));
  }
}

/** Tell the main loop to start notifying <b>conn</b> of any read events. */
void
connection_start_reading(connection_t *conn) // 读事件
{
  ......

  if (conn->linked) { // Linked连接类的读事件激活处理
    conn->reading_from_linked_conn = 1;
    if (connection_should_read_from_linked_conn(conn))
      connection_start_reading_from_linked_conn(conn); // 激活本连接的读事件
  } else { // 普通连接类的读事件激活处理
    if (event_add(conn->read_event, NULL)) // 激活本连接的读事件
      log_warn(LD_NET, "Error from libevent setting read event state for %d "
               "to watched: %s",
               (int)conn->s,
               tor_socket_strerror(tor_socket_errno(conn->s)));
  }
}
  从以上代码我们知道,Linked连接只有读事件。那么第一个问题就转化为如何控制读。Tor系统控制linked连接的读也是利用Libevent的调度系统。但是与普通的事件激活方式有较大区别。普通的事件,要么通过信号,定时器激活,要么socket可读可写时被激活;而Linked连接事件没有上述事件的任何特征,不能通过信号,定时器激活,更不需要用socket可读可写激活。事实上,Tor系统是显式激活Linked连接读事件的。我们回忆主循环中的代码:
/** Tor main loop. */
/* static */ int
do_main_loop(void)
{
  ......
  for (;;) {
    ......
    /* All active linked conns should get their read events activated. */
    SMARTLIST_FOREACH(active_linked_connection_lst, connection_t *, conn,
                      event_active(conn->read_event, EV_READ, 1));
    called_loop_once = smartlist_len(active_linked_connection_lst) ? 1 : 0;

    update_approx_time(time(NULL));

    /* poll until we have an event, or the second ends, or until we have
     * some active linked connections to trigger events for. */
    loop_result = event_base_loop(tor_libevent_get_base(),
                                  called_loop_once ? EVLOOP_ONCE : 0);
    ......
  }
}
  系统维护着Linked连接的一个全局链表,循环内一旦发现该链表中有连接,则会直接激活其读事件,然后开始事件池的一次不阻塞调度。看到这里,大致就可以了解系统处理Linked连接读事件的方法。那就是,将读事件直接插入被激活事件池的顶端,在下轮调度中直接执行。而相应的被激活的读事件在这次调度完之后,就会被删除,不会保留在等待事件池之中。可是,第一轮调度开始的时候,并没有被激活的Linked连接读事件,所以是非阻塞的无限调度。那么又是如何结束无限调度,进行后面的循环呢?我们在Linked连接的读开启函数中找到了答案:
/** Helper: Tell the main loop to begin reading bytes into <b>conn</b> from
 * its linked connection, if it is not doing so already.  Called by
 * connection_start_reading and connection_start_writing as appropriate. */
static void
connection_start_reading_from_linked_conn(connection_t *conn)
{
  tor_assert(conn);
  tor_assert(conn->linked == 1);

  if (!conn->active_on_link) { // 当前Linked连接是否不处于全局Linked激活列表中?
    conn->active_on_link = 1;
    smartlist_add(active_linked_connection_lst, conn);
    if (!called_loop_once) { // 当前Libevent调度是否是阻塞的?
      /* This is the first event on the list; we won't be in LOOP_ONCE mode,
       * so we need to make sure that the event_base_loop() actually exits at
       * the end of its run through the current connections and lets us
       * activate read events for linked connections. */
      struct timeval tv = { 0, 0 };
      tor_event_base_loopexit(tor_libevent_get_base(), &tv); // 结束本次Libevent调度!
    }
  } else {
    tor_assert(smartlist_isin(active_linked_connection_lst, conn));
  }
}
  这样,我们就分析出了整个Linked读事件的控制过程:读事件是如何被激活和调度的。值得注意的是,Linked连接也是连接的一种,所以其读回调函数与其他所有连接的读回调函数共用conn_read_callback函数。而关于读事件激活时机的问题,我们仅在此处做简单说明:当有某个操作要将数据放入到某Linked连接之时,该操作也应该开启此Linked连接的写,也就是对应连接的读事件。

未完待续
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: