关于Libevent的快速可移植非阻塞网络编程(block-》select-》poll-》epoll-》libevent)
2017-11-08 16:40
453 查看
我们学习IO调用基本都是从阻塞IO开始的。如果调用在操作完成之前,或者足够的时间已经流逝使得网络栈放弃操作之前,不会返回,那么就是异步的。比如说,在TCP连接上调用connect()时,操作系统将一个SYN分组排队到TCP连接的另一端主机中。在收到来自对方主机的SYN ACK分组之前,或者直到足够的时间已经流逝而决定放弃操作之前,控制不会返回到应用程序。
这里有一个使用阻塞网络调用的简单客户端示例。它打开到www.google.com的连接,发送一个简单的HTTP请求,将响应打印到stdout。
上述代码中的所有网络调用都是阻塞的:在成功解析www.google.com,或者解析失败之前,gethostbyname不会返回;连接建立之前connect不会返回;收到数据或者关闭之前recv调用不会返回;至少在清空输出缓冲区到内核的写缓冲区之前,send调用不会返回。
这里,阻塞IO没有什么不好的。如果没有其他事情需要同时进行,阻塞IO会工作得很好。但是考虑需要同时处理多个连接的情形。考虑一个具体的例子:需要从两个连接读取输入,但是不知道哪个连接将先收到输入。程序可能是这样的:
即使fd[2]上最先有数据到达,对fd[0]和fd[1]的读取操作取得一些数据并且完成之前,程序不会试图从fd[2]进行读取。
有时候用多线程或者多进程服务器来解决此问题。最简单的方式是用一个单独的进程(或者线程)处理每个连接。因为每个连接拥有独立的进程,一个连接上阻塞的IO调用不会阻塞其他任何连接的进程。
这里有另一个示例程序。它是一个简单的服务器,在端口47013上监听TCP连接,每次从其输入缓冲区读取一行,写回其ROT13混淆结果。程序使用fork()调用为每个进入的连接创建一个新的进程。
是否有同时处理多个连接的完美解决方案?我可以停止编写本书,去做其他事情吗?不可以。首先,一些平台上进程创建(甚至线程创建)的开销是很大的。现实中你可能想用线程池代替创建新进程。然而,线程的扩展性根本达不到期望。如果需要同时处理成千上万个连接,处理上万个线程的效率并不比在每个CPU上使用少量线程高。
如果线程不是处理多个连接的答案,那么什么是呢?在Unix世界中,可以使用非阻塞套接字:
这里fd是套接字的文件描述符。将fd(套接字)设置为非阻塞之后,对fd进行网络调用时,调用要么立即完成操作,要么返回一个特定的错误号,指示“现在不能进行操作,请重试”。这样,示例程序可以写作:
使用非阻塞套接字,上述代码可以工作,但只是在很少的情况下。程序性能将很糟糕,原因有两个。首先,如果任何连接上都没有数据可读,循环还是会无限进行,消耗CPU时间。第二,如果用这种方式处理多于一两个连接,程序将为每个连接进行内核调用,不论连接上是否有数据。我们需要的是一种可以告诉内核“等待这些套接字中的某一个有数据可读,并且告知是哪一个”。
对于此问题,现在仍然使用的最老的解决方案是select()。select()调用要求三个fd集合(作为位数组实现):一个用于读取,一个用于写入,一个用于异常。select()将等待集合中的某个套接字就绪,并且修改集合,使之仅包含已经就绪的套接字。
这是使用select的相同示例:
这里是使用select重新实现的ROT13服务器:
事情还没完。因为生成和读取select位数组所需的时间与用于select的最大fd成比例,所以当套接字个数增加时,select调用的开销将急剧增加。
不同的操作系统为select提供了不同的替代功能,包括poll、epoll、kqueue、evports和/dev/poll。这些函数的性能都比select高,而且除了poll之外,添加、删除套接字和通知套接字已经准备好IO的性能都是O(1)。
不幸的是,这些接口都不是标准的。Linux有epoll、BSD(包括Darwin)有kqueue、Solaris有evports和/dev/poll……,然而没有哪个操作系统有其他系统所拥有的调用。所以,如果想编写可移植的高性能异步应用,就需要一个封装所有这些接口的抽象,提供这些调用中性能最高的一个供使用。
这就是Libevent API最底层所做的事情。Libevent为各种select替代提供了一致的接口,使用所运行在的计算机上的最高效版本。
下面是另一个版本的异步ROT13服务器。这次用Libevent 2代替了select。注意fd_sets已经被抛弃:替代的是,将事件与结构体event_base关联或者断开关联,这可能是用select、poll、epoll或者kqueue实现的。
(代码需要注意的其他地方:使用evutil_socket_t代替int来代表套接字;调用evutil_make_socket_nonblocking来将套接字设置为异步的,而不是调用fcntl(O_NONBLOCK)。这使得代码兼容于Win32网络API)
使用是否便捷?(还有Windows呢?)
你可能注意到代码效率更高了,但是也更复杂了。使用fork的时候,(1)不需要为每个连接管理缓冲区:仅对每个进程使用一个单独的在栈上分配的缓冲区。(2)不需要显式跟踪每个套接字是否在读取或者写入:这隐藏在代码中了。(3)也不需要跟踪每个操作是否完成的结构体:只需要循环和栈变量。
此外,如果对Windows网络有很深的体验,你将认识到用于上述示例的时候,Libevent并不能取得优化的性能。在Windows上进行快速异步IO的方法不是使用select接口:而是使用IOCP。与其他快速网络API不同的是,IOCP不是在套接字已经准备好某种操作时通知程序,然后程序可以进行相应的操作。替代的是,程序告知Windows网络栈启动某网络操作,IOCP在操作完成时通知程序。
幸运的是,Libevent 2 的“bufferevent”接口解决了所有这些问题:它提供了让Libevent在Windows和Unix上都能够有效实现的接口,让程序编写更简单。
这是最后一个版本的ROT13,使用bufferevent API:
这里有一个使用阻塞网络调用的简单客户端示例。它打开到www.google.com的连接,发送一个简单的HTTP请求,将响应打印到stdout。
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For gethostbyname */ #include <netdb.h> #include <unistd.h> #include <string.h> #include <stdio.h> int main(int c, char **v) { const char query[] = "GET / HTTP/1.0\r\n" "Host: www.google.com\r\n" "\r\n"; const char hostname[] = "www.google.com"; struct sockaddr_in sin; struct hostent *h; const char *cp; int fd; ssize_t n_written, remaining; char buf[1024]; /* Look up the IP address for the hostname. Watch out; this isn't threadsafe on most platforms. */ h = gethostbyname(hostname); if (!h) { fprintf(stderr, "Couldn't lookup %s: %s", hostname, hstrerror(h_errno)); return 1; } if (h->h_addrtype != AF_INET) { fprintf(stderr, "No ipv6 support, sorry."); return 1; } /* Allocate a new socket */ fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { perror("socket"); return 1; } /* Connect to the remote host. */ sin.sin_family = AF_INET; sin.sin_port = htons(80); sin.sin_addr = *(struct in_addr*)h->h_addr; if (connect(fd, (struct sockaddr*) &sin, sizeof(sin))) { perror("connect"); close(fd); return 1; } /* Write the query. */ /* XXX Can send succeed partially? */ cp = query; remaining = strlen(query); while (remaining) { n_written = send(fd, cp, remaining, 0); if (n_written <= 0) { perror("send"); return 1; } remaining -= n_written; cp += n_written; } /* Get an answer back. */ while (1) { ssize_t result = recv(fd, buf, sizeof(buf), 0); if (result == 0) { break; } else if (result < 0) { perror("recv"); close(fd); return 1; } fwrite(buf, 1, result, stdout); } close(fd); return 0; }
上述代码中的所有网络调用都是阻塞的:在成功解析www.google.com,或者解析失败之前,gethostbyname不会返回;连接建立之前connect不会返回;收到数据或者关闭之前recv调用不会返回;至少在清空输出缓冲区到内核的写缓冲区之前,send调用不会返回。
这里,阻塞IO没有什么不好的。如果没有其他事情需要同时进行,阻塞IO会工作得很好。但是考虑需要同时处理多个连接的情形。考虑一个具体的例子:需要从两个连接读取输入,但是不知道哪个连接将先收到输入。程序可能是这样的:
/* This won't work. */ char buf[1024]; int i, n; while (i_still_want_to_read()) { for (i=0; i<n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n==0) handle_close(fd[i]); else if (n<0) handle_error(fd[i], errno); else handle_input(fd[i], buf, n); } }
即使fd[2]上最先有数据到达,对fd[0]和fd[1]的读取操作取得一些数据并且完成之前,程序不会试图从fd[2]进行读取。
有时候用多线程或者多进程服务器来解决此问题。最简单的方式是用一个单独的进程(或者线程)处理每个连接。因为每个连接拥有独立的进程,一个连接上阻塞的IO调用不会阻塞其他任何连接的进程。
这里有另一个示例程序。它是一个简单的服务器,在端口47013上监听TCP连接,每次从其输入缓冲区读取一行,写回其ROT13混淆结果。程序使用fork()调用为每个进入的连接创建一个新的进程。
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> #include <unistd.h> #include <string.h> #include <stdio.h> #include <stdlib.h> #define MAX_LINE 16384 char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } void child(int fd) { char outbuf[MAX_LINE+1]; size_t outbuf_used = 0; ssize_t result; while (1) { char ch; result = recv(fd, &ch, 1, 0); if (result == 0) { break; } else if (result == -1) { perror("read"); break; } /* We do this test to keep the user from overflowing the buffer. */ if (outbuf_used < sizeof(outbuf)) { outbuf[outbuf_used++] = rot13_char(ch); } if (ch == '\n') { send(fd, outbuf, outbuf_used, 0); outbuf_used = 0; continue; } } } void run(void) { int listener; struct sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } while (1) { struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else { if (fork() == 0) { child(fd); exit(0); } } } } int main(int c, char **v) { run(); return 0; }
是否有同时处理多个连接的完美解决方案?我可以停止编写本书,去做其他事情吗?不可以。首先,一些平台上进程创建(甚至线程创建)的开销是很大的。现实中你可能想用线程池代替创建新进程。然而,线程的扩展性根本达不到期望。如果需要同时处理成千上万个连接,处理上万个线程的效率并不比在每个CPU上使用少量线程高。
如果线程不是处理多个连接的答案,那么什么是呢?在Unix世界中,可以使用非阻塞套接字:
fcntl(fd, F_SETFL, O_NONBLOCK);
这里fd是套接字的文件描述符。将fd(套接字)设置为非阻塞之后,对fd进行网络调用时,调用要么立即完成操作,要么返回一个特定的错误号,指示“现在不能进行操作,请重试”。这样,示例程序可以写作:
/* This will work, but the performance will be unforgivably bad. */ int i, n; char buf[1024]; for (i=0; i < n_sockets; ++i) fcntl(fd[i], F_SETFL, O_NONBLOCK); while (i_still_want_to_read()) { for (i=0; i < n_sockets; ++i) { n = recv(fd[i], buf, sizeof(buf), 0); if (n == 0) { handle_close(fd[i]); } else if (n < 0) { if (errno == EAGAIN) ; /* The kernel didn't have any data for us to read. */ else handle_error(fd[i], errno); } else { handle_input(fd[i], buf, n); } } }
使用非阻塞套接字,上述代码可以工作,但只是在很少的情况下。程序性能将很糟糕,原因有两个。首先,如果任何连接上都没有数据可读,循环还是会无限进行,消耗CPU时间。第二,如果用这种方式处理多于一两个连接,程序将为每个连接进行内核调用,不论连接上是否有数据。我们需要的是一种可以告诉内核“等待这些套接字中的某一个有数据可读,并且告知是哪一个”。
对于此问题,现在仍然使用的最老的解决方案是select()。select()调用要求三个fd集合(作为位数组实现):一个用于读取,一个用于写入,一个用于异常。select()将等待集合中的某个套接字就绪,并且修改集合,使之仅包含已经就绪的套接字。
这是使用select的相同示例:
/* If you only have a couple dozen fds, this version won't be awful */ fd_set readset; int i, n; char buf[1024]; while (i_still_want_to_read()) { int maxfd = -1; FD_ZERO(&readset); /* Add all of the interesting fds to readset */ for (i=0; i < n_sockets; ++i) { if (fd[i]>maxfd) maxfd = fd[i]; FD_SET(fd[i], &readset); } /* Wait until one or more fds are ready to read */ select(maxfd+1, &readset, NULL, NULL, NULL); /* Process all of the fds that are still set in readset */ for (i=0; i < n_sockets; ++i) { if (FD_ISSET(fd[i], &readset)) { n = recv(fd[i], buf, sizeof(buf), 0); if (n == 0) { handle_close(fd[i]); } else if (n < 0) { if (errno == EAGAIN) ; /* The kernel didn't have any data for us to read. */ else handle_error(fd[i], errno); } else { handle_input(fd[i], buf, n); } } } }
这里是使用select重新实现的ROT13服务器:
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> /* for select */ #include <sys/select.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ( 11796 (c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; int writing; size_t n_written; size_t write_upto; }; struct fd_state * alloc_fd_state(void) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (!state) return NULL; state->buffer_used = state->n_written = state->writing = state->write_upto = 0; return state; } void free_fd_state(struct fd_state *state) { free(state); } void make_nonblocking(int fd) { fcntl(fd, F_SETFL, O_NONBLOCK); } int do_read(int fd, struct fd_state *state) { char buf[1024]; int i; ssize_t result; while (1) { result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = rot13_char(buf[i]); if (buf[i] == '\n') { state->writing = 1; state->write_upto = state->buffer_used; } } } if (result == 0) { return 1; } else if (result < 0) { if (errno == EAGAIN) return 0; return -1; } return 0; } int do_write(int fd, struct fd_state *state) { while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) return 0; return -1; } assert(result != 0); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 0; state->writing = 0; return 0; } void run(void) { int listener; struct fd_state *state[FD_SETSIZE]; struct sockaddr_in sin; int i, maxfd; fd_set readset, writeset, exset; sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); for (i = 0; i < FD_SETSIZE; ++i) state[i] = NULL; listener = socket(AF_INET, SOCK_STREAM, 0); make_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); while (1) { maxfd = listener; FD_ZERO(&readset); FD_ZERO(&writeset); FD_ZERO(&exset); FD_SET(listener, &readset); for (i=0; i < FD_SETSIZE; ++i) { if (state[i]) { if (i > maxfd) maxfd = i; FD_SET(i, &readset); if (state[i]->writing) { FD_SET(i, &writeset); } } } if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) { perror("select"); return; } if (FD_ISSET(listener, &readset)) { struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { make_nonblocking(fd); state[fd] = alloc_fd_state(); assert(state[fd]);/*XXX*/ } } for (i=0; i < maxfd+1; ++i) { int r = 0; if (i == listener) continue; if (FD_ISSET(i, &readset)) { r = do_read(i, state[i]); } if (r == 0 && FD_ISSET(i, &writeset)) { r = do_write(i, state[i]); } if (r) { free_fd_state(state[i]); state[i] = NULL; close(i); } } } } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }
事情还没完。因为生成和读取select位数组所需的时间与用于select的最大fd成比例,所以当套接字个数增加时,select调用的开销将急剧增加。
不同的操作系统为select提供了不同的替代功能,包括poll、epoll、kqueue、evports和/dev/poll。这些函数的性能都比select高,而且除了poll之外,添加、删除套接字和通知套接字已经准备好IO的性能都是O(1)。
不幸的是,这些接口都不是标准的。Linux有epoll、BSD(包括Darwin)有kqueue、Solaris有evports和/dev/poll……,然而没有哪个操作系统有其他系统所拥有的调用。所以,如果想编写可移植的高性能异步应用,就需要一个封装所有这些接口的抽象,提供这些调用中性能最高的一个供使用。
这就是Libevent API最底层所做的事情。Libevent为各种select替代提供了一致的接口,使用所运行在的计算机上的最高效版本。
下面是另一个版本的异步ROT13服务器。这次用Libevent 2代替了select。注意fd_sets已经被抛弃:替代的是,将事件与结构体event_base关联或者断开关联,这可能是用select、poll、epoll或者kqueue实现的。
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } struct fd_state { char buffer[MAX_LINE]; size_t buffer_used; size_t n_written; size_t write_upto; struct event *read_event; struct event *write_event; }; struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd) { struct fd_state *state = malloc(sizeof(struct fd_state)); if (!state) return NULL; state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state); if (!state->read_event) { free(state); return NULL; } state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state); if (!state->write_event) { event_free(state->read_event); free(state); return NULL; } state->buffer_used = state->n_written = state->write_upto = 0; assert(state->write_event); return state; } void free_fd_state(struct fd_state *state) { event_free(state->read_event); event_free(state->write_event); free(state); } void do_read(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; char buf[1024]; int i; ssize_t result; while (1) { assert(state->write_event); result = recv(fd, buf, sizeof(buf), 0); if (result <= 0) break; for (i=0; i < result; ++i) { if (state->buffer_used < sizeof(state->buffer)) state->buffer[state->buffer_used++] = rot13_char(buf[i]); if (buf[i] == '\n') { assert(state->write_event); event_add(state->write_event, NULL); state->write_upto = state->buffer_used; } } } if (result == 0) { free_fd_state(state); } else if (result < 0) { if (errno == EAGAIN) // XXXX use evutil macro return; perror("recv"); free_fd_state(state); } } void do_write(evutil_socket_t fd, short events, void *arg) { struct fd_state *state = arg; while (state->n_written < state->write_upto) { ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0); if (result < 0) { if (errno == EAGAIN) // XXX use evutil macro return; free_fd_state(state); return; } assert(result != 0); state->n_written += result; } if (state->n_written == state->buffer_used) state->n_written = state->write_upto = state->buffer_used = 1; event_del(state->write_event); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { // XXXX eagain?? perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */ } else { struct fd_state *state; evutil_make_socket_nonblocking(fd); state = alloc_fd_state(base, fd); assert(state); /*XXX err*/ assert(state->write_event); event_add(state->read_event, NULL); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }
(代码需要注意的其他地方:使用evutil_socket_t代替int来代表套接字;调用evutil_make_socket_nonblocking来将套接字设置为异步的,而不是调用fcntl(O_NONBLOCK)。这使得代码兼容于Win32网络API)
使用是否便捷?(还有Windows呢?)
你可能注意到代码效率更高了,但是也更复杂了。使用fork的时候,(1)不需要为每个连接管理缓冲区:仅对每个进程使用一个单独的在栈上分配的缓冲区。(2)不需要显式跟踪每个套接字是否在读取或者写入:这隐藏在代码中了。(3)也不需要跟踪每个操作是否完成的结构体:只需要循环和栈变量。
此外,如果对Windows网络有很深的体验,你将认识到用于上述示例的时候,Libevent并不能取得优化的性能。在Windows上进行快速异步IO的方法不是使用select接口:而是使用IOCP。与其他快速网络API不同的是,IOCP不是在套接字已经准备好某种操作时通知程序,然后程序可以进行相应的操作。替代的是,程序告知Windows网络栈启动某网络操作,IOCP在操作完成时通知程序。
幸运的是,Libevent 2 的“bufferevent”接口解决了所有这些问题:它提供了让Libevent在Windows和Unix上都能够有效实现的接口,让程序编写更简单。
这是最后一个版本的ROT13,使用bufferevent API:
/* For sockaddr_in */ #include <netinet/in.h> /* For socket functions */ #include <sys/socket.h> /* For fcntl */ #include <fcntl.h> #include <event2/event.h> #include <event2/buffer.h> #include <event2/bufferevent.h> #include <assert.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <errno.h> #define MAX_LINE 16384 void do_read(evutil_socket_t fd, short events, void *arg); void do_write(evutil_socket_t fd, short events, void *arg); char rot13_char(char c) { /* We don't want to use isalpha here; setting the locale would change * which characters are considered alphabetical. */ if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) return c + 13; else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) return c - 13; else return c; } void readcb(struct bufferevent *bev, void *ctx) { struct evbuffer *input, *output; char *line; size_t n; int i; input = bufferevent_get_input(bev); output = bufferevent_get_output(bev); while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) { for (i = 0; i < n; ++i) line[i] = rot13_char(line[i]); evbuffer_add(output, line, n); evbuffer_add(output, "\n", 1); free(line); } if (evbuffer_get_length(input) >= MAX_LINE) { /* Too long; just process what there is and go on so that the buffer * doesn't grow infinitely long. */ char buf[1024]; while (evbuffer_get_length(input)) { int n = evbuffer_remove(input, buf, sizeof(buf)); for (i = 0; i < n; ++i) buf[i] = rot13_char(buf[i]); evbuffer_add(output, buf, n); } evbuffer_add(output, "\n", 1); } } void errorcb(struct bufferevent *bev, short error, void *ctx) { if (error & BEV_EVENT_EOF) { /* connection has been closed, do any clean up here */ /* */ } else if (error & BEV_EVENT_ERROR) { /* check errno to see what error occurred */ /* */ } else if (error & BEV_EVENT_TIMEOUT) { /* must be a timeout event handle, handle it */ /* */ } bufferevent_free(bev); } void do_accept(evutil_socket_t listener, short event, void *arg) { struct event_base *base = arg; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); int fd = accept(listener, (struct sockaddr*)&ss, &slen); if (fd < 0) { perror("accept"); } else if (fd > FD_SETSIZE) { close(fd); } else { struct bufferevent *bev; evutil_make_socket_nonblocking(fd); bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, readcb, NULL, errorcb, NULL); bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE); bufferevent_enable(bev, EV_READ|EV_WRITE); } } void run(void) { evutil_socket_t listener; struct sockaddr_in sin; struct event_base *base; struct event *listener_event; base = event_base_new(); if (!base) return; /*XXXerr*/ sin.sin_family = AF_INET; sin.sin_addr.s_addr = 0; sin.sin_port = htons(40713); listener = socket(AF_INET, SOCK_STREAM, 0); evutil_make_socket_nonblocking(listener); #ifndef WIN32 { int one = 1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); } #endif if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) { perror("bind"); return; } if (listen(listener, 16)<0) { perror("listen"); return; } listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base); /*XXX check it */ event_add(listener_event, NULL); event_base_dispatch(base); } int main(int c, char **v) { setvbuf(stdout, NULL, _IONBF, 0); run(); return 0; }
相关文章推荐
- 使用Libevent的快速可移植非阻塞网络编程:异步IO简介
- 关于网络(同步、异步、阻塞、非阻塞,select/poll/epoll,rpc/msgqueue,tcpip常见面试题)
- 翻译:使用Libevent的快速可移植非阻塞网络编程:异步IO简介
- 使用Libevent的快速可移植非阻塞网络编程:异步IO简介
- 使用Libevent的快速可移植非阻塞网络编程:异步IO简介 (一)
- 翻译:使用Libevent的快速可移植非阻塞网络编程:异步IO简介 (一) (转
- 翻译:使用Libevent的快速可移植非阻塞网络编程:异步IO简介 (一)
- Linux后台网络编程中select/poll/epoll的比较分析
- 几种典型的服务器网络编程模型归纳( select poll epoll)
- Linux网络编程--select,poll和epoll的区别
- 关于同步,异步,阻塞,非阻塞,IOCP/epoll,select/poll,AIO ,NIO ,BIO的总结
- 阻塞与非阻塞、select关于深入网络编程
- 关于同步,异步,阻塞,非阻塞,IOCP/epoll,select/poll,AIO ,NIO ,BIO的总结
- 网络编程 - select、poll、epoll比较
- linux网络编程之socket(十三):epoll 系列函数简介、与select、poll 的区别
- linux 网络编程 I/O复用 select,poll ,epoll
- Linux网络通信编程(套接字模型TCP\UDP与IO多路复用模型select\poll\epoll)
- 几种典型的服务器网络编程模型归纳(select poll epoll)
- Linux网络通信编程(套接字模型TCP\UDP与IO多路复用模型select\poll\epoll)
- 朴素、Select、Poll和Epoll网络编程模型实现和分析——朴素模型