您的位置:首页 > 编程语言 > C语言/C++

C++爱好者博客

2013-01-06 09:15 1116 查看
上一篇我们讲了epoll的函数和性能。这一篇就用这些函数,给出一个最简单的epoll例子。

C++

//
// a simple echo server using epoll in linux
//
// 2009-11-05
// by sparkling
//
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <iostream>
using namespace std;
#define MAX_EVENTS 500
// Per-descriptor event record. One record exists per slot of g_Events, and
// EventAdd stores the record's address in epoll_event.data.ptr so the event
// loop can get back to it.
struct myevent_s
{
int fd; // descriptor this record is bound to (assigned by EventSet)
void (*call_back)(int fd, int events, void *arg); // handler invoked by the event loop
int events; // epoll event mask last requested through EventAdd
void *arg; // opaque argument forwarded to call_back
int status; // 1: in epoll wait list, 0 not in
char buff[128]; // recv data buffer
int len; // number of bytes in buff waiting to be echoed back
long last_active; // last active time
};
// Bind an event record to a descriptor and a handler.
// Clears the requested event mask and the registration flag, and stamps the
// record with the current time. Does not talk to epoll and leaves buff/len
// untouched.
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
    const long stamp = time(NULL);

    ev->status = 0;
    ev->events = 0;
    ev->last_active = stamp;
    ev->arg = arg;
    ev->call_back = call_back;
    ev->fd = fd;
}
// Register ev with epoll for the given event mask, or modify the mask when the
// record is already registered (ev->status == 1).
//   epollFd - epoll instance to operate on
//   events  - epoll event mask, e.g. EPOLLIN|EPOLLET; also stored in ev->events
//   ev      - record to register; its address is stored in epoll_event.data.ptr
// The outcome is only logged; the function returns nothing.
void EventAdd(int epollFd, int events, myevent_s *ev)
{
    struct epoll_event epv = {0, {0}};
    epv.data.ptr = ev;
    epv.events = ev->events = events;

    int op = EPOLL_CTL_MOD;
    if (ev->status != 1) {
        op = EPOLL_CTL_ADD;
        // The flag is raised before the call and is kept even if the call
        // fails: the record stays marked as in use.
        ev->status = 1;
    }

    if (epoll_ctl(epollFd, op, ev->fd, &epv) < 0) {
        printf("Event Add failed[fd=%d][errno=%d]\n", ev->fd, errno);
    } else {
        printf("Event Add OK[fd=%d]\n", ev->fd);
    }
}
// Remove a registered record from the epoll instance.
// A record whose status is not 1 is left alone. The result of epoll_ctl is
// intentionally not inspected.
void EventDel(int epollFd, myevent_s *ev)
{
    if (ev->status == 1) {
        struct epoll_event scratch = {0, {0}};
        scratch.data.ptr = ev;
        ev->status = 0;
        epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &scratch);
    }
}
// epoll instance shared by all handlers; assigned in main from epoll_create().
int g_epollFd;
myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
// Forward declarations: AcceptConn installs RecvData, and RecvData and
// SendData install each other.
void RecvData(int fd, int events, void *arg);
void SendData(int fd, int events, void *arg);
// Accept new connections from clients.
//   fd     - listening socket (expected to be non-blocking)
//   events - epoll events that fired (unused)
//   arg    - event record of the listening socket (unused)
// The listening socket is registered edge-triggered, so every pending
// connection is accepted here until accept() reports that the queue is empty;
// stopping after one would leave the rest waiting for an edge that never comes.
// Each accepted socket is made non-blocking, bound to the first free slot of
// g_Events and registered for EPOLLIN with RecvData as its handler. A socket
// that cannot be served is closed rather than leaked.
void AcceptConn(int fd, int events, void *arg)
{
    (void)events;
    (void)arg;

    for (;;) {
        struct sockaddr_in sin;
        socklen_t len = sizeof(sin);
        const int nfd = accept(fd, (struct sockaddr*)&sin, &len);
        if (nfd == -1) {
            if (errno == EINTR) {
                continue; // interrupted by a signal: just retry
            }
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                printf("%s: bad accept[errno=%d]\n", __func__, errno);
            }
            return; // queue drained, or a real error
        }

        // find the first slot that is not in use
        int i = 0;
        while (i < MAX_EVENTS && g_Events[i].status != 0) {
            i++;
        }
        if (i == MAX_EVENTS) {
            printf("%s:max connection limit[%d].\n", __func__, MAX_EVENTS);
            close(nfd);
            continue;
        }

        // set nonblocking
        if (fcntl(nfd, F_SETFL, O_NONBLOCK) < 0) {
            printf("%s: fcntl failed[fd=%d][errno=%d]\n", __func__, nfd, errno);
            close(nfd);
            continue;
        }

        // add a read event for receive data
        EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
        EventAdd(g_epollFd, EPOLLIN|EPOLLET, &g_Events[i]);
        printf("new conn[%s:%d][time:%ld]\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), g_Events[i].last_active);
    }
}
// Receive data from a client connection.
//   fd     - connected socket that became readable
//   events - epoll events that fired (unused)
//   arg    - the connection's myevent_s record
// Reads at most sizeof(buff)-1 bytes so the data can be NUL-terminated for
// printing. On success the record is switched to SendData/EPOLLOUT so the
// bytes are echoed back. On end-of-stream or a real error the socket is
// closed; the record has already been removed from epoll at that point, so
// its slot is free again.
void RecvData(int fd, int events, void *arg)
{
    struct myevent_s *ev = (struct myevent_s*)arg;
    (void)events;

    // receive data
    const int len = recv(fd, ev->buff, sizeof(ev->buff)-1, 0);
    const int err = errno; // keep recv's errno; the calls below may overwrite it
    EventDel(g_epollFd, ev);

    if (len > 0) {
        ev->len = len;
        ev->buff[len] = '\0';
        printf("C[%d]:%s\n", fd, ev->buff);
        // change to send event
        EventSet(ev, fd, SendData, ev);
        EventAdd(g_epollFd, EPOLLOUT|EPOLLET, ev);
    } else if (len == 0) {
        close(ev->fd);
        printf("[fd=%d] closed gracefully.\n", fd);
    } else if (err == EAGAIN || err == EWOULDBLOCK || err == EINTR) {
        // Nothing was readable after all: keep the connection and wait for
        // the next readable event. EventSet is skipped on purpose so that
        // last_active is not refreshed by a read that delivered nothing.
        EventAdd(g_epollFd, EPOLLIN|EPOLLET, ev);
    } else {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, err, strerror(err));
    }
}
// Send the buffered data back to the client.
//   fd     - connected socket that became writable
//   events - epoll events that fired (unused)
//   arg    - the connection's myevent_s record (buff/len hold the pending data)
// When everything was written the record goes back to RecvData/EPOLLIN. When
// only part was written, the unsent tail is moved to the front of buff and the
// record stays on EPOLLOUT, so no data is dropped. Any other failure closes
// the socket.
void SendData(int fd, int events, void *arg)
{
    struct myevent_s *ev = (struct myevent_s*)arg;
    (void)events;

    // send data; MSG_NOSIGNAL keeps a vanished peer from raising SIGPIPE
    const int pending = ev->len;
    const int sent = send(fd, ev->buff, pending, MSG_NOSIGNAL);
    const int err = errno; // keep send's errno; the calls below may overwrite it
    EventDel(g_epollFd, ev);

    if (sent > 0 && sent < pending) {
        // partial write: keep the remainder and wait until the socket is writable again
        for (int i = sent; i < pending; i++) {
            ev->buff[i - sent] = ev->buff[i];
        }
        ev->len = pending - sent;
        EventSet(ev, fd, SendData, ev);
        EventAdd(g_epollFd, EPOLLOUT|EPOLLET, ev);
    } else if (sent > 0) {
        ev->len = 0;
        // change to receive event
        EventSet(ev, fd, RecvData, ev);
        EventAdd(g_epollFd, EPOLLIN|EPOLLET, ev);
    } else if (sent < 0 && (err == EAGAIN || err == EWOULDBLOCK || err == EINTR)) {
        // nothing was written; buff/len are intact, so simply wait for the next writable event
        EventAdd(g_epollFd, EPOLLOUT|EPOLLET, ev);
    } else {
        ev->len = 0;
        close(ev->fd);
        printf("send[fd=%d] error[%d]\n", fd, err);
    }
}
// Create the listening socket, bind it to INADDR_ANY:port and register it with
// epoll, using g_Events[MAX_EVENTS] as its record and AcceptConn as handler.
//   epollFd - epoll instance to register with
//   port    - TCP port in host byte order
// The socket is registered only after it is non-blocking, bound and listening.
// If any step fails the error is logged, the socket is closed and nothing is
// registered.
void InitListenSocket(int epollFd, short port)
{
    const int listenFd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenFd < 0) {
        printf("create listen socket failed[errno=%d]\n", errno);
        return;
    }
    printf("server listen fd=%d\n", listenFd);

    // bind & listen
    sockaddr_in sin = sockaddr_in(); // value-initialised, i.e. all bytes zero
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons(port);

    if (fcntl(listenFd, F_SETFL, O_NONBLOCK) < 0 // set non-blocking
        || bind(listenFd, (const sockaddr*)&sin, sizeof(sin)) < 0
        || listen(listenFd, 5) < 0) {
        printf("listen socket setup failed[fd=%d][errno=%d]\n", listenFd, errno);
        close(listenFd);
        return;
    }

    // add listen socket
    EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
    EventAdd(epollFd, EPOLLIN|EPOLLET, &g_Events[MAX_EVENTS]);
}
int main(int argc, char **argv)
{
short port = 12345; // default port
if(argc == 2){
port = atoi(argv[1]);
}
// create epoll
g_epollFd = epoll_create(MAX_EVENTS);
if(g_epollFd <= 0) printf("create epoll failed.%d/n", g_epollFd);
// create & bind listen socket, and add to epoll, set non-blocking
InitListenSocket(g_epollFd, port);
// event loop
struct epoll_event events[MAX_EVENTS];
printf("server running:port[%d]/n", port);
int checkPos = 0;
while(1){
// a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
long now = time(NULL);
for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd
{
if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
if(g_Events[checkPos].status != 1) continue;
long duration = now - g_Events[checkPos].last_active;
if(duration >= 60) // 60s timeout
{
close(g_Events[checkPos].fd);
printf("[fd=%d] timeout[%d--%d]./n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
EventDel(g_epollFd, &g_Events[checkPos]);
}
}
// wait for events to happen
int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
if(fds < 0){
printf("epoll_wait error, exit/n");
break;
}
for(int i = 0; i < fds; i++){
myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
// free resource
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
//  
// a simple echo server using epoll in linux  
//  
// 2009-11-05  
// by sparkling  
//  
#include <sys/socket.h>  
#include <sys/epoll.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <fcntl.h>  
#include <unistd.h>  
#include <stdio.h>  
#include <errno.h>  
#include <iostream>  

using namespace std;  

#define MAX_EVENTS 500  

// Per-descriptor event record. One record exists per slot of g_Events, and
// EventAdd stores the record's address in epoll_event.data.ptr so the event
// loop can get back to it.
struct myevent_s  

{  

    int fd;  // descriptor this record is bound to (assigned by EventSet)

    void (*call_back)(int fd, int events, void *arg);  // handler invoked by the event loop

    int events;  // epoll event mask last requested through EventAdd

    void *arg;  // opaque argument forwarded to call_back

    int status; // 1: in epoll wait list, 0 not in  

    char buff[128]; // recv data buffer  

    int len;  // number of bytes in buff waiting to be echoed back

    long last_active; // last active time  

};  

// Bind an event record to a descriptor and a handler.
// Clears the requested event mask and the registration flag, and stamps the
// record with the current time. Does not talk to epoll and leaves buff/len
// untouched.
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
    const long stamp = time(NULL);

    ev->status = 0;
    ev->events = 0;
    ev->last_active = stamp;
    ev->arg = arg;
    ev->call_back = call_back;
    ev->fd = fd;
}

// add/mod an event to epoll  

void EventAdd(int epollFd, int events, myevent_s *ev)  

{  

    struct epoll_event epv = {0, {0}};  

    int op;  

    epv.data.ptr = ev;  

    epv.events = ev->events = events;  

    if(ev->status == 1){  

        op = EPOLL_CTL_MOD;  

    }  

    else{  

        op = EPOLL_CTL_ADD;  

        ev->status = 1;  

    }  

    if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)  

        printf("Event Add failed[fd=%d]/n", ev->fd);  

    else  

        printf("Event Add OK[fd=%d]/n", ev->fd);  

}  

// delete an event from epoll  

void EventDel(int epollFd, myevent_s *ev)  

{  

    struct epoll_event epv = {0, {0}};  

    if(ev->status != 1) return;  

    epv.data.ptr = ev;  

    ev->status = 0;  

    epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);  

}  

// epoll instance shared by all handlers; assigned in main from epoll_create().
int g_epollFd;  

myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd  

// Forward declarations: AcceptConn installs RecvData, and RecvData and
// SendData install each other.
void RecvData(int fd, int events, void *arg);  

void SendData(int fd, int events, void *arg);  

// accept new connections from clients  

void AcceptConn(int fd, int events, void *arg)  

{  

    struct sockaddr_in sin;  

    socklen_t len = sizeof(struct sockaddr_in);  

    int nfd, i;  

    // accept  

    if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)  

    {  

        if(errno != EAGAIN && errno != EINTR)  

        {  

            printf("%s: bad accept", __func__);  

        }  

        return;  

    }  

    do  

    {  

        for(i = 0; i < MAX_EVENTS; i++)  

        {  

            if(g_Events[i].status == 0)  

            {  

                break;  

            }  

        }  

        if(i == MAX_EVENTS)  

        {  

            printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);  

            break;  

        }  

        // set nonblocking  

        if(fcntl(nfd, F_SETFL, O_NONBLOCK) < 0) break;  

        // add a read event for receive data  

        EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);  

        EventAdd(g_epollFd, EPOLLIN|EPOLLET, &g_Events[i]);  

        printf("new conn[%s:%d][time:%d]/n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), g_Events[i].last_active);  

    }while(0);  

}  

// receive data  

void RecvData(int fd, int events, void *arg)  

{  

    struct myevent_s *ev = (struct myevent_s*)arg;  

    int len;  

    // receive data  

    len = recv(fd, ev->buff, sizeof(ev->buff)-1, 0);    

    EventDel(g_epollFd, ev);  

    if(len > 0)  

    {  

        ev->len = len;  

        ev->buff[len] = '/0';  

        printf("C[%d]:%s/n", fd, ev->buff);  

        // change to send event  

        EventSet(ev, fd, SendData, ev);  

        EventAdd(g_epollFd, EPOLLOUT|EPOLLET, ev);  

    }  

    else if(len == 0)  

    {  

        close(ev->fd);  

        printf("[fd=%d] closed gracefully./n", fd);  

    }  

    else  

    {  

        close(ev->fd);  

        printf("recv[fd=%d] error[%d]:%s/n", fd, errno, strerror(errno));  

    }  

}  

// send data  

void SendData(int fd, int events, void *arg)  

{  

    struct myevent_s *ev = (struct myevent_s*)arg;  

    int len;  

    // send data  

    len = send(fd, ev->buff, ev->len, 0);  

    ev->len = 0;  

    EventDel(g_epollFd, ev);  

    if(len > 0)  

    {  

        // change to receive event  

        EventSet(ev, fd, RecvData, ev);  

        EventAdd(g_epollFd, EPOLLIN|EPOLLET, ev);  

    }  

    else  

    {  

        close(ev->fd);  

        printf("recv[fd=%d] error[%d]/n", fd, errno);  

    }  

}  

void InitListenSocket(int epollFd, short port)  

{  

    int listenFd = socket(AF_INET, SOCK_STREAM, 0);  

    fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking  

    printf("server listen fd=%d/n", listenFd);  

    EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);  

    // add listen socket  

    EventAdd(epollFd, EPOLLIN|EPOLLET, &g_Events[MAX_EVENTS]);  

    // bind & listen  

    sockaddr_in sin;  

    bzero(&sin, sizeof(sin));  

    sin.sin_family = AF_INET;  

    sin.sin_addr.s_addr = INADDR_ANY;  

    sin.sin_port = htons(port);  

    bind(listenFd, (const sockaddr*)&sin, sizeof(sin));  

    listen(listenFd, 5);  

}  

int main(int argc, char **argv)  

{  

    short port = 12345; // default port  

    if(argc == 2){  

        port = atoi(argv[1]);  

    }  

    // create epoll  

    g_epollFd = epoll_create(MAX_EVENTS);  

    if(g_epollFd <= 0) printf("create epoll failed.%d/n", g_epollFd);  

    // create & bind listen socket, and add to epoll, set non-blocking  

    InitListenSocket(g_epollFd, port);  

    // event loop  

    struct epoll_event events[MAX_EVENTS];  

    printf("server running:port[%d]/n", port);  

    int checkPos = 0;  

    while(1){  

        // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event  

        long now = time(NULL);  

        for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd  

        {  

            if(checkPos == MAX_EVENTS) checkPos = 0; // recycle  

            if(g_Events[checkPos].status != 1) continue;  

            long duration = now - g_Events[checkPos].last_active;  

            if(duration >= 60) // 60s timeout  

            {  

                close(g_Events[checkPos].fd);  

                printf("[fd=%d] timeout[%d--%d]./n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);  

                EventDel(g_epollFd, &g_Events[checkPos]);  

            }  

        }  

        // wait for events to happen  

        int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);  

        if(fds < 0){  

            printf("epoll_wait error, exit/n");  

            break;  

        }  

        for(int i = 0; i < fds; i++){  

            myevent_s *ev = (struct myevent_s*)events[i].data.ptr;  

            if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event  

            {  

                ev->call_back(ev->fd, events[i].events, ev->arg);  

            }  

            if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event  

            {  

                ev->call_back(ev->fd, events[i].events, ev->arg);  

            }  

        }  

    }  

    // free resource  

    return 0;  

}
上面是一个echo server的例子,接下来我们看一下echo client的例子

C++

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#define MAXDATASIZE 100 // max number of bytes we can get at once
/*
 * Echo client entry point.
 * Usage: client <hostname> <port>
 * Resolves the host, connects over TCP, then once per second sends a fixed
 * greeting and prints whatever the server sends back. The loop ends when the
 * server closes the connection or when send/recv fails.
 * Exits with status 1 on a usage, lookup, socket or connect error; returns 0
 * otherwise.
 */
int main(int argc, char *argv[])
{
    const char *greeting = "hello, this is client message!";
    int sockfd, numbytes;
    char buf[MAXDATASIZE];
    struct hostent *he;
    struct sockaddr_in their_addr; /* connector's address information */

    if (argc < 3) {
        fprintf(stderr,"usage: client hostname\nEx:\n$./client01 ip port\n");
        exit(1);
    }

    if ((he = gethostbyname(argv[1])) == NULL) { /* get the host info */
        herror("gethostbyname");
        exit(1);
    }

    if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
        perror("socket");
        exit(1);
    }

    their_addr.sin_family = AF_INET; /* host byte order */
    their_addr.sin_port = htons(atoi(argv[2])); /* short, network byte order */
    their_addr.sin_addr = *((struct in_addr *)he->h_addr);
    memset(their_addr.sin_zero, '\0', sizeof their_addr.sin_zero);

    if (connect(sockfd, (struct sockaddr *)&their_addr, sizeof their_addr) == -1) {
        perror("connect");
        close(sockfd);
        exit(1);
    }

    while (1) {
        if (send(sockfd, greeting, strlen(greeting), 0) == -1) {
            perror("send");
            break;
        }

        numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0);
        if (numbytes == -1) {
            /* stop here: numbytes must not be used as an index when it is -1 */
            perror("recv");
            break;
        }
        if (numbytes == 0) {
            printf("Remote server has shutdown!\n");
            break;
        }

        buf[numbytes] = '\0';
        printf("Received: %s \n",buf);
        sleep(1);
    }

    close(sockfd);
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
 
#define MAXDATASIZE 100 // max number of bytes we can get at once
 

int main(int argc, char *argv[])

{

    int sockfd, numbytes;

    char buf[MAXDATASIZE];

    struct hostent *he;

    struct sockaddr_in their_addr; // connector'saddress information

 

    if (( argc == 1) || (argc==2) )

    {

        fprintf(stderr,"usage: client hostname\nEx:\n$./client01 ip port\n");

        exit(1);

    }

 

    if ((he=gethostbyname(argv[1])) == NULL)

    { // get the host info

        herror("gethostbyname");

        exit(1);

    }

 

    if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1)

    {

        perror("socket");

        exit(1);

    }

 

    their_addr.sin_family = AF_INET; // host byte order

    their_addr.sin_port = htons(atoi(argv[2])); // short, network byte order

    their_addr.sin_addr = *((struct in_addr *)he->h_addr);

    memset(their_addr.sin_zero, '', sizeof their_addr.sin_zero);

 

    if (connect(sockfd, (struct sockaddr *)&their_addr, sizeof their_addr) == -1)

    {

        perror("connect");

        exit(1);

    }

 

    while( 1 )

    {

        if(send(sockfd, "hello, this is client message!", strlen("hello, this is client message!"), 0 ) == -1)

        {

            perror("send");

        }

 

        if ((numbytes=recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1)

        {

            perror("recv");

        }

        else if( numbytes == 0 )

        {

            printf("Remote server has shutdown!\n");

            break;

        }

 

        buf[numbytes] = '';

 

        printf("Received: %s \n",buf);

        sleep(1);

    }

 

    close(sockfd);

    return 0;

}
以上客户端和服务器的例子均来自网络,我没有单独再写,因为觉得自己写出来也跟这个差不多,还浪费时间。大家有兴趣可以去研究一下libevent和libev,这都是很好用的C风格的epoll网络库,或者直接看ASIO吧。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息