您的位置:首页 > Web前端 > React

[ZeroMQ] libzmq 源码阅读 之 Reactor机制(mailbox, event)

2016-07-20 20:23 477 查看

[ZeroMQ] libzmq 源码阅读 之 Reactor机制(mailbox, event)

ZeroMQ libzmq 源码阅读 之 Reactor机制mailbox event
信号员 signaler
进程间通信

signaler 实现

多路复用器poller 监听 socket

mailbox

IO Thread

总结

zmq在创建的时候回启动两类线程,一是应用线程(Application thread),二是I/O线程。那么这些线程之间是如何协作,如何通信的呢?

信号员 signaler

signaler 负责在进程间传递信号。由于Mac属于BSD系统,所以进程间通信使用
socketpair
来实现。

进程间通信

在UNIX中,socket 一开始是用于网络通信的,后来也应用于进程间通信。由于是本地环回,所以就不需要绑定IP地址不需要监听连接了。直接设定一个套接字对儿。

#include <socket.h>
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);


可以认为这是一个全双工的管道。在端口
sv[0]
发送,在
sv[1]
就会收到;反之,
sv[1]
发,
sv[0]
收。

signaler 实现

signaler_t
中有两个fd成员
w
r
,他们代表两个socket。实际并没有用上全双工,而是一个读,一个写。

class signaler_t
{
public:
...
fd_t get_fd () const;
void send ();
int wait (int timeout_);
void recv ();
int recv_failable ();

private:
//  Creates a pair of file descriptors that will be used
//  to pass the signals.
static int make_fdpair (fd_t *r_, fd_t *w_);

//  Underlying write & read file descriptor
//  Will be -1 if we exceeded number of available handles
fd_t w;
fd_t r;
...
};

// 文件描述符返回的是 r,也就是接收端。
zmq::fd_t zmq::signaler_t::get_fd () const
{
return r;
}

void zmq::signaler_t::send() {
...
// 用端口 w 发送信号
::send(w, ...);
...
}

void zmq::signaler_t::recv() {
...
// 用端口 r 接收信号
::recv(r, ...);
...
}

int zmq::signaler_t::make_fdpair(fd_t *r, fd_t *w){
...
int sv [2];
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
...
*w_ = sv [0];
*r_ = sv [1];
...
}


多路复用器poller 监听 socket

在zmq中,端口监听其实是由**I/O多路复用器**poller来实现的。
poller_t
是I/O多路复用的一个抽象类,具体实现的的话,zmq根据系统平台选择poll, select, epoll, kqueue等。在此,我们暂且不管这个I/O多路复用是什么鬼。简而言之,如果把socket fd 注册到poller中,poller 就可以监听该socket 是否有数据进来(poll in),或者发出(poll out)。

以kqueue为例,在loop中监听:

void zmq::kqueue_t::loop ()
{
while (!stopping) {
//  Wait for events.
struct kevent ev_buf [max_io_events];
int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, ...);
...
for (int i = 0; i < n; i ++) {
poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
...
if (ev_buf [i].filter == EVFILT_READ)
pe->reactor->in_event ();   // 触发事件
}
...
}
}


mailbox

mailbox_t有主要两个成员,一个是管道,另一个是信号员。管道是真正用于收发数据的,而信号员只是负责监视邮箱是否有来信,发信的时候也要发射信号。

class mailbox_t : public i_mailbox
{
public:
...
// 返回相应文件描述符,用于注册到poller中,监听是否有来信。
fd_t get_fd () const;
// 发送cmd
void send (const command_t &cmd_);
// 接收cmd
int recv (command_t *cmd_, int timeout_);
...
private:
...
// 管道,用于收发cmd
cpipe_t cpipe;
//  Signaler to pass signals from writer thread to reader thread.
signaler_t signaler;
...
};


I/O Thread

io_thread_t
中有一个邮箱mailbox和一个事件监听poller。

class io_thread_t : public object_t, public i_poll_events
{
public:
...
//  Launch the physical thread.
void start ();
//  Ask underlying thread to stop.
void stop ();

//  Returns mailbox associated with this I/O thread.
mailbox_t *get_mailbox ();

//  i_poll_events implementation.
void in_event ();
void out_event ();

//  Used by io_objects to retrieve the associated poller object.
poller_t *get_poller ();
...

private:
//  I/O thread accesses incoming commands via this mailbox.
mailbox_t mailbox;
//  Handle associated with mailbox' file descriptor.
poller_t::handle_t mailbox_handle;
//  I/O multiplexing is performed using a poller object.
poller_t *poller;
...
};


io_thread
在创建的时候,需要创建一个
poller
,用于监听事件。

注意到
io_thread_t
继承了
i_poll_events
,所以他自己就可以作为一个Reactor。在构造时,需要向
poller
注册自己的mailbox和自己本身(this):
poller->add_fd (mailbox.get_fd (), this);


我觉得这里非常微妙,在
poller->loop()
中,可以监听到邮箱mailbox是否有消息进来,然后通过透传数据即可触发
in_event()
。正是因为上述
add_fd
中注册了
this
指针,所以可以很容易地回调自己的成员函数
in_event()


构造函数如下:

zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);

mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
}


总结

io_thread
创建时向
poller
注册自己的mailbox fd,以及
this
指针,用于事件回调。

ctx_t
中,通过
slots [i] = io_thread->get_mailbox ();
注册I/O线程的邮箱。然后通过
slots [tid_]->send (command_);
可以向该邮箱发送指令。

上述
send()
调用的是mailbox 的
send()
,他会向管道
cpipe
写入数据,并发送信号
signaler->send()


signaler_t
发送信号其实就是用自己的socket
w
发送。因为
w
r
是一对儿,所以
r
会接收到数据。

当有指令进入mailbox时,
poller_t
就会监听到。
poller
监听的是
mailbox->get_fd()
,这个fd正正是上述的
r
。因为
mailbox->get_fd()
返回的是
signaler->get_fd()
,而
signaler->get_fd()
返回的就是
r


监听到邮箱有指令进入后,通过
reactor->in_event()
触发事件。因为
io_thread
在创建时向
poller
注册了自己的
this
指针,而
reactor
正是这个
this
指针,于是实现事件触发。

io_thread
in_event()
中,调用
mailbox->recv()
。于是mailbox向管道cpipe读取指令。至此,io_thread成功收到指令!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息