[ZeroMQ] libzmq 源码阅读 之 Reactor机制(mailbox, event)
2016-07-20 20:23
477 查看
[ZeroMQ] libzmq 源码阅读 之 Reactor机制(mailbox, event)
ZeroMQ libzmq 源码阅读 之 Reactor机制mailbox event信号员 signaler
进程间通信
signaler 实现
多路复用器poller 监听 socket
mailbox
IO Thread
总结
zmq在创建的时候回启动两类线程,一是应用线程(Application thread),二是I/O线程。那么这些线程之间是如何协作,如何通信的呢?
信号员 signaler
signaler 负责在进程间传递信号。由于Mac属于BSD系统,所以进程间通信使用socketpair来实现。
进程间通信
在UNIX中,socket 一开始是用于网络通信的,后来也应用于进程间通信。由于是本地环回,所以就不需要绑定IP地址不需要监听连接了。直接设定一个套接字对儿。#include <socket.h> int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
可以认为这是一个全双工的管道。在端口
sv[0]发送,在
sv[1]就会收到;反之,
sv[1]发,
sv[0]收。
signaler 实现
在signaler_t中有两个fd成员
w和
r,他们代表两个socket。实际并没有用上全双工,而是一个读,一个写。
class signaler_t { public: ... fd_t get_fd () const; void send (); int wait (int timeout_); void recv (); int recv_failable (); private: // Creates a pair of file descriptors that will be used // to pass the signals. static int make_fdpair (fd_t *r_, fd_t *w_); // Underlying write & read file descriptor // Will be -1 if we exceeded number of available handles fd_t w; fd_t r; ... }; // 文件描述符返回的是 r,也就是接收端。 zmq::fd_t zmq::signaler_t::get_fd () const { return r; } void zmq::signaler_t::send() { ... // 用端口 w 发送信号 ::send(w, ...); ... } void zmq::signaler_t::recv() { ... // 用端口 r 接收信号 ::recv(r, ...); ... } int zmq::signaler_t::make_fdpair(fd_t *r, fd_t *w){ ... int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); ... *w_ = sv [0]; *r_ = sv [1]; ... }
多路复用器poller 监听 socket
在zmq中,端口监听其实是由**I/O多路复用器**poller来实现的。poller_t是I/O多路复用的一个抽象类,具体实现的的话,zmq根据系统平台选择poll, select, epoll, kqueue等。在此,我们暂且不管这个I/O多路复用是什么鬼。简而言之,如果把socket fd 注册到poller中,poller 就可以监听该socket 是否有数据进来(poll in),或者发出(poll out)。
以kqueue为例,在loop中监听:
void zmq::kqueue_t::loop () { while (!stopping) { // Wait for events. struct kevent ev_buf [max_io_events]; int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, ...); ... for (int i = 0; i < n; i ++) { poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata; ... if (ev_buf [i].filter == EVFILT_READ) pe->reactor->in_event (); // 触发事件 } ... } }
mailbox
mailbox_t有主要两个成员,一个是管道,另一个是信号员。管道是真正用于收发数据的,而信号员只是负责监视邮箱是否有来信,发信的时候也要发射信号。class mailbox_t : public i_mailbox { public: ... // 返回相应文件描述符,用于注册到poller中,监听是否有来信。 fd_t get_fd () const; // 发送cmd void send (const command_t &cmd_); // 接收cmd int recv (command_t *cmd_, int timeout_); ... private: ... // 管道,用于收发cmd cpipe_t cpipe; // Signaler to pass signals from writer thread to reader thread. signaler_t signaler; ... };
I/O Thread
io_thread_t中有一个邮箱mailbox和一个事件监听poller。
class io_thread_t : public object_t, public i_poll_events { public: ... // Launch the physical thread. void start (); // Ask underlying thread to stop. void stop (); // Returns mailbox associated with this I/O thread. mailbox_t *get_mailbox (); // i_poll_events implementation. void in_event (); void out_event (); // Used by io_objects to retrieve the associated poller object. poller_t *get_poller (); ... private: // I/O thread accesses incoming commands via this mailbox. mailbox_t mailbox; // Handle associated with mailbox' file descriptor. poller_t::handle_t mailbox_handle; // I/O multiplexing is performed using a poller object. poller_t *poller; ... };
io_thread在创建的时候,需要创建一个
poller,用于监听事件。
注意到
io_thread_t继承了
i_poll_events,所以他自己就可以作为一个Reactor。在构造时,需要向
poller注册自己的mailbox和自己本身(this):
poller->add_fd (mailbox.get_fd (), this);。
我觉得这里非常微妙,在
poller->loop()中,可以监听到邮箱mailbox是否有消息进来,然后通过透传数据即可触发
in_event()。正是因为上述
add_fd中注册了
this指针,所以可以很容易地回调自己的成员函数
in_event()。
构造函数如下:
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_) { poller = new (std::nothrow) poller_t (*ctx_); alloc_assert (poller); mailbox_handle = poller->add_fd (mailbox.get_fd (), this); poller->set_pollin (mailbox_handle); }
总结
io_thread创建时向
poller注册自己的mailbox fd,以及
this指针,用于事件回调。
在
ctx_t中,通过
slots [i] = io_thread->get_mailbox ();注册I/O线程的邮箱。然后通过
slots [tid_]->send (command_);可以向该邮箱发送指令。
上述
send()调用的是mailbox 的
send(),他会向管道
cpipe写入数据,并发送信号
signaler->send()。
signaler_t发送信号其实就是用自己的socket
w发送。因为
w和
r是一对儿,所以
r会接收到数据。
当有指令进入mailbox时,
poller_t就会监听到。
poller监听的是
mailbox->get_fd(),这个fd正正是上述的
r。因为
mailbox->get_fd()返回的是
signaler->get_fd(),而
signaler->get_fd()返回的就是
r。
监听到邮箱有指令进入后,通过
reactor->in_event()触发事件。因为
io_thread在创建时向
poller注册了自己的
this指针,而
reactor正是这个
this指针,于是实现事件触发。
在
io_thread的
in_event()中,调用
mailbox->recv()。于是mailbox向管道cpipe读取指令。至此,io_thread成功收到指令!
相关文章推荐
- java-模拟tomcat服务器
- Linux socket 初步
- 从源码安装Mysql/Percona 5.5
- spymemcached源码中Reactor模式分析
- apache mpm
- java socket 注意的地方
- java socket 注意的地方
- C#基于socket模拟http请求的方法
- 浅析Ruby的源代码布局及其编程风格
- 简单的Ruby中的Socket编程教程
- Socket不能选择本地IP连接问题如何解决
- C#之Socket操作类实例解析
- 使用C#来编写一个异步的Socket服务器
- C#使用Socket快速判断数据库连接是否正常的方法
- 科学知识:理解socket
- websocket++简单使用及实例分析
- Android聊天工具基于socket实现
- PHP socket 模拟POST 请求实例代码
- php与flash as3 socket通信传送文件实现代码
- 解决time_wait强制关闭socket