您的位置:首页 > 其它

Libevent源码分析(七)--- IOCP

2016-04-02 20:00 429 查看
关于iocp模型,网上有很多资料,这里不详细分析,下面这篇文章写的非常详细:

完成端口(CompletionPort)详解 - 手把手教你玩转网络编程系列之三

event_base中有一个iocp变量,event_base的初始化函数中会调用event_base_start_iocp开启iocp功能,event_base_start_iocp又会调用event_iocp_port_launch来初始化IOCP:

#ifdef WIN32
if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
event_base_start_iocp(base, cfg->n_cpus_hint);
#endif
int
event_base_start_iocp(struct event_base *base, int n_cpus)
{
#ifdef WIN32
if (base->iocp)
return 0;
base->iocp = event_iocp_port_launch(n_cpus);
if (!base->iocp) {
event_warnx("%s: Couldn't launch IOCP", __func__);
return -1;
}
return 0;
#else
return -1;
#endif


iocp指向一个event_iocp_port结构体:

struct event_iocp_port {
/** The port itself */
HANDLE port;
/* A lock to cover internal structures. */
CRITICAL_SECTION lock;
/** Number of threads ever open on the port. */
short n_threads;
/** True iff we're shutting down all the threads on this port */
short shutdown;
/** How often the threads on this port check for shutdown and other
* conditions */
long ms;
/* The threads that are waiting for events. */
HANDLE *threads;
/** Number of threads currently open on this port. */
short n_live_threads;
/** A semaphore to signal when we are done shutting down. */
HANDLE *shutdownSemaphore;
};


其中port使iocp的端口,shutdown,lock和shutdownSemaphore用于关闭iocp。ms是GetQueuedCompletionStatus的等待时间,threads是线程句柄,n_threads代表线程数量,n_live_threads代表当前没有关闭的线程。

event_iocp_port的初始化在event_iocp_port_launch函数中进行:

struct event_iocp_port *event_iocp_port_launch(int n_cpus)
{
struct event_iocp_port *port;
int i;

if (!extension_fns_initialized)
init_extension_functions(&the_extension_fns);

if (!(port = mm_calloc(1, sizeof(struct event_iocp_port))))
return NULL;

if (n_cpus <= 0)
n_cpus = N_CPUS_DEFAULT;
port->n_threads = n_cpus * 2;
port->threads = mm_calloc(port->n_threads, sizeof(HANDLE));
if (!port->threads)
goto err;

port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
n_cpus);
port->ms = -1;
if (!port->port)
goto err;

port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL);
if (!port->shutdownSemaphore)
goto err;

for (i=0; i<port->n_threads; ++i) {
ev_uintptr_t th = _beginthread(loop, 0, port);
if (th == (ev_uintptr_t)-1)
goto err;
port->threads[i] = (HANDLE)th;
++port->n_live_threads;
}

InitializeCriticalSectionAndSpinCount(&port->lock, 1000);

return port;
err:
if (port->port)
CloseHandle(port->port);
if (port->threads)
mm_free(port->threads);
if (port->shutdownSemaphore)
CloseHandle(port->shutdownSemaphore);
mm_free(port);
return NULL;
}


event_iocp_port_launch会获取iocp的扩展函数库,然后创建iocp的端口和用于关闭iocp的信号量,iocp还需要一个线程池,线程池大小为cpu数量的2倍。

void
event_overlapped_init(struct event_overlapped *o, iocp_callback cb)
{
memset(o, 0, sizeof(struct event_overlapped));
o->cb = cb;
}

static void
handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok)
{
struct event_overlapped *eo =
EVUTIL_UPCAST(o, struct event_overlapped, overlapped);
eo->cb(eo, completion_key, nBytes, ok);
}

static void
loop(void *_port)
{
struct event_iocp_port *port = _port;
long ms = port->ms;
HANDLE p = port->port;

if (ms <= 0)
ms = INFINITE;

while (1) {
OVERLAPPED *overlapped=NULL;
ULONG_PTR key=0;
DWORD bytes=0;
int ok = GetQueuedCompletionStatus(p, &bytes, &key,
&overlapped, ms);
EnterCriticalSection(&port->lock);
if (port->shutdown) {
if (--port->n_live_threads == 0)
ReleaseSemaphore(port->shutdownSemaphore, 1,
NULL);
LeaveCriticalSection(&port->lock);
return;
}
LeaveCriticalSection(&port->lock);

if (key != NOTIFICATION_KEY && overlapped)
handle_entry(overlapped, key, bytes, ok);
else if (!overlapped)
break;
}
event_warnx("GetQueuedCompletionStatus exited with no event.");
EnterCriticalSection(&port->lock);
if (--port->n_live_threads == 0)
ReleaseSemaphore(port->shutdownSemaphore, 1, NULL);
LeaveCriticalSection(&port->lock);
}


loop是线程池运行的函数,他通过调用GetQueuedCompletionStatus监听iocp中的事件。ms在初始化中被设置为-1,无限等待。正常轻快下GetQueuedCompletionStatus中的key会返回0,如果返回-1即NOTIFICATION_KEY证明调用了event_iocp_notify_all函数从而调用了PostQueuedCompletionStatus导致GetQueuedCompletionStatus立刻返回。这时需要关闭线程,结束iocp。当loop检测到事件之后会调用handle_entry来调用对应的回调。

event_overlapped的定义如下:

struct event_overlapped {
OVERLAPPED overlapped;
iocp_callback cb;
};


该结构体的第一个变量OVERLAPPED就是投入读写事件时传入的OVERLAPPED,根据GetQueuedCompletionStatus的返回可以获得对应的OVERLAPPED,从而可以根据EVUTIL_UPCAST获得event_overlapped结构体,调用对应的回调。

看完iocp的工作流程,接下来看bufferevent_asyn是如何使用iocp的。想要试用bufferevent_asyn,首先要调用bufferevent_async_new:

struct bufferevent *
bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options)
{
struct bufferevent_async *bev_a;
struct bufferevent *bev;
struct event_iocp_port *iocp;

options |= BEV_OPT_THREADSAFE;

if (!(iocp = event_base_get_iocp(base)))
return NULL;

if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
int err = GetLastError();
/* We may have alrady associated this fd with a port.
* Let's hope it's this port, and that the error code
* for doing this neer changes. */
if (err != ERROR_INVALID_PARAMETER)
return NULL;
}

if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
return NULL;

bev = &bev_a->bev.bev;
if (!(bev->input = evbuffer_overlapped_new(fd))) {
mm_free(bev_a);
return NULL;
}
if (!(bev->output = evbuffer_overlapped_new(fd))) {
evbuffer_free(bev->input);
mm_free(bev_a);
return NULL;
}

if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
options)<0)
goto err;

evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
event_overlapped_init(&bev_a->read_overlapped, read_complete);
event_overlapped_init(&bev_a->write_overlapped, write_complete);

bev_a->ok = fd >= 0;
if (bev_a->ok)
_bufferevent_init_generic_timeout_cbs(bev);

return bev;
err:
bufferevent_free(&bev_a->bev.bev);
return NULL;
}


参数中的fd可以是有效的套接字,也可以是-1(之后在be_async_ctrl中指定),event_iocp_port_associate用于把套接字关联到iocp的端口上。evbuffer_overlapped_new创建一个evbuffer_overlapped结构体并且返回一个evbuffer:

struct evbuffer_overlapped {
struct evbuffer buffer;
/** The socket that we're doing overlapped IO on. */
evutil_socket_t fd;

/** pending I/O type */
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;

/** The first pinned chain in the buffer. */
struct evbuffer_chain *first_pinned;

/** How many chains are pinned; how many of the fields in buffers
* are we using. */
int n_buffers;
WSABUF buffers[MAX_WSABUFS];
};

struct evbuffer *
evbuffer_overlapped_new(evutil_socket_t fd)
{
struct evbuffer_overlapped *evo;

evo = mm_calloc(1, sizeof(struct evbuffer_overlapped));
if (!evo)
return NULL;

TAILQ_INIT(&evo->buffer.callbacks);
evo->buffer.refcnt = 1;
evo->buffer.last_with_datap = &evo->buffer.first;

evo->buffer.is_overlapped = 1;
evo->fd = fd;

return &evo->buffer;
}


evbuffer_overlapped结构体可以由evbuffer通过upcast_evbuffer函数得到。libevent大量使用这种方法隐藏复杂实现,只给开发者简单的通用的结构,内部则通过类型转换获得真正的结构体。evbuffer_overlapped结构体中read_in_progress和write_in_progress用来标记读写事件是否已经投递。buffers是iocp投递读写时用到的数据缓存。

继续看bufferevent_async_new函数,be_async_inbuf_callback和be_async_outbuf_callback函数被设置为读写evbuffer的callback。connect_complete,read_complete和write_complete分别被设置为connect_overlapped,read_overlapped和write_overlapped的回调。接下来就看一下这些回调:

static void
be_async_outbuf_callback(struct evbuffer *buf,
const struct evbuffer_cb_info *cbinfo,
void *arg)
{
struct bufferevent *bev = arg;
struct bufferevent_async *bev_async = upcast(bev);

/* If we added data to the outbuf and were not writing before,
* we may want to write now. */

_bufferevent_incref_and_lock(bev);

if (cbinfo->n_added)
bev_async_consider_writing(bev_async);

_bufferevent_decref_and_unlock(bev);
}
be_async_outbuf_callback是output的回调,当有数据写入output(如果设置阀值需要达到阀值)时会调用此函数。该函数会调用bev_async_consider_writing进行写事件的投递:


static void

bev_async_consider_writing(struct bufferevent_async *beva)

{

size_t at_most;

int limit;

struct bufferevent *bev = &beva->bev.bev;

/* Don't write if there's a write in progress, or we do not
* want to write, or when there's nothing left to write. */
if (beva->write_in_progress || beva->bev.connecting)
return;
if (!beva->ok || !(bev->enabled&EV_WRITE) ||
!evbuffer_get_length(bev->output)) {
bev_async_del_write(beva);
return;
}

at_most = evbuffer_get_length(bev->output);

/* This is safe so long as bufferevent_get_write_max never returns
* more than INT_MAX.  That's true for now. XXXX */
limit = (int)_bufferevent_get_write_max(&beva->bev);
if (at_most >= (size_t)limit && limit >= 0)
at_most = limit;

if (beva->bev.write_suspended) {
bev_async_del_write(beva);
return;
}

/*  XXXX doesn't respect low-water mark very well. */
bufferevent_incref(bev);
if (evbuffer_launch_write(bev->output, at_most,
&beva->write_overlapped)) {
bufferevent_decref(bev);
beva->ok = 0;
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
beva->write_in_progress = at_most;
_bufferevent_decrement_write_buckets(&beva->bev, at_most);
bev_async_add_write(beva);
}


}

bev_async_consider_writing会判断当前是否已经有数据在投递,如果没有则设置一个合理的at_most值然后调用evbuffer_launch_write:


int

evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,

struct event_overlapped *ol)

{

struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);

int r = -1;

int i;

struct evbuffer_chain *chain;

DWORD bytesSent;

if (!buf) {
/* No buffer, or it isn't overlapped */
return -1;
}

EVBUFFER_LOCK(buf);
EVUTIL_ASSERT(!buf_o->read_in_progress);
if (buf->freeze_start || buf_o->write_in_progress)
goto done;
if (!buf->total_len) {
/* Nothing to write */
r = 0;
goto done;
} else if (at_most < 0 || (size_t)at_most > buf->total_len) {
at_most = buf->total_len;
}
evbuffer_freeze(buf, 1);

buf_o->first_pinned = NULL;
buf_o->n_buffers = 0;
memset(buf_o->buffers, 0, sizeof(buf_o->buffers));

chain = buf_o->first_pinned = buf->first;

for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
WSABUF *b = &buf_o->buffers[i];
b->buf = (char*)( chain->buffer + chain->misalign );
_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_W);

if ((size_t)at_most > chain->off) {
/* XXXX Cast is safe for now, since win32 has no
mmaped chains.  But later, we need to have this
add more WSAbufs if chain->off is greater than
ULONG_MAX */
b->len = (unsigned long)chain->off;
at_most -= chain->off;
} else {
b->len = (unsigned long)at_most;
++i;
break;
}
}

buf_o->n_buffers = i;
_evbuffer_incref(buf);
if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
&ol->overlapped, NULL)) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
/* An actual error. */
pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
evbuffer_unfreeze(buf, 1);
evbuffer_free(buf); /* decref */
goto done;
}
}

buf_o->write_in_progress = 1;
r = 0;


done:

EVBUFFER_UNLOCK(buf);

return r;

}

evbuffer_launch_write将evbuffer中的数据和WSABUF做一个映射,之后调用WSASend进行发送,次函数的参数ol是bufferevent_async的write_overlapped结构体,WSASend函数中传入的就是write_overlapped结构体中的overlapped变量:


static void

handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok)

{

struct event_overlapped *eo =

EVUTIL_UPCAST(o, struct event_overlapped, overlapped);

eo->cb(eo, completion_key, nBytes, ok);

}

这样当handle_entry处理回调时就会找到write_overlapped结构体,从而找到write_complete回调:


static void

write_complete(struct event_overlapped *eo, ev_uintptr_t key,

ev_ssize_t nbytes, int ok)

{

struct bufferevent_async *bev_a = upcast_write(eo);

struct bufferevent *bev = &bev_a->bev.bev;

short what = BEV_EVENT_WRITING;

ev_ssize_t amount_unwritten;

BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->write_in_progress);

amount_unwritten = bev_a->write_in_progress - nbytes;
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;

if (amount_unwritten)
_bufferevent_decrement_write_buckets(&bev_a->bev,
-amount_unwritten);

if (!ok)
bev_async_set_wsa_error(bev, eo);

if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
if (evbuffer_get_length(bev->output) <=
bev->wm_write.low)
_bufferevent_run_writecb(bev);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
} else if (!nbytes) {
what |= BEV_EVENT_EOF;
bev_a->ok = 0;
_bufferevent_run_eventcb(bev, what);
}
}

_bufferevent_decref_and_unlock(bev);


}

write_complete首先调用evbuffer_commit_write来处理evbuffer中的数据,然后判断是否需要调用bufferevent的写事件回调活着事件回调,同时如果evbuffer中还有数据需要再次投递。

读事件和连接事件的流程和写事件的流程比较类似,这里不在分析,注意的是当使用IOCP模式时,bufferevent的读写事件不再又event_base管理,而是直接使用iocp。当投递读写事件活着连接事件时会调用event_base_add_virtual,当事件完成时则会调用event_base_del_virtual,这些事件对event_base来说是虚拟的,因为它们是iocp负责管理的。但是当iocp统计事件时同样会将他们计算在内。


static void

bev_async_del_write(struct bufferevent_async *beva)

{

struct bufferevent *bev = &beva->bev.bev;

if (beva->write_added) {
beva->write_added = 0;
event_base_del_virtual(bev->ev_base);
}


}

static void

bev_async_del_read(struct bufferevent_async *beva)

{

struct bufferevent *bev = &beva->bev.bev;

if (beva->read_added) {
beva->read_added = 0;
event_base_del_virtual(bev->ev_base);
}


}

static void

bev_async_add_write(struct bufferevent_async *beva)

{

struct bufferevent *bev = &beva->bev.bev;

if (!beva->write_added) {
beva->write_added = 1;
event_base_add_virtual(bev->ev_base);
}


}

static void

bev_async_add_read(struct bufferevent_async *beva)

{

struct bufferevent *bev = &beva->bev.bev;

if (!beva->read_added) {
beva->read_added = 1;
event_base_add_virtual(bev->ev_base);
}


}

“`

libevent的源码分析至此告一段落,另外libevent的http和dns在这里不在分析,它们都是建立在之前分析的源码基础上的应用,如果后面有时间会补上这部分的内容。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: