您的位置:首页 > 理论基础 > 计算机网络

【网络】(八)I/O复用--Select(二)

2015-09-14 11:01 447 查看
第七篇文章中简介了几种IO模型,也使用select函数改进客户端的代码,本篇文章用它来改进一下服务端代码!

用select函数实现的服务器程序,叫做并发服务器,它还是在排队处理任务,无法并行处理这些事件!

1、select事件发生条件

0.1 可读事件

套接口缓冲区有数据可读;

连接的读一半关闭,即接收到FIN段时,可读,并且读操作会返回0

如果是监听套接口,已完成队列不为空时,可读;即有新的连接请求

套接口发生一个错误待处理,该错误可通过getsockopt函数指定SO_ERROR选项来获取

0.2 可写事件

套接口发送缓冲区有空间容纳数据

连接的写一半关闭,即接收到RST段之后,可写,再次调用write操作可导致SIGPIPE信号的产生

套接口发生一个错误待处理,该错误可通过getsockopt函数指定SO_ERROR选项来获取

0.3 异常事件

套接口存在带外数据

2、select并发服务器实现

使用select函数,采用单进程来处理所有事件,实现并发服务器!

客户端程序与《第七篇》相同,编译命令为:

gcc -Wall -g -std=gnu99 client.c -o client


服务端完整程序

编译命令:gcc -Wall -g -std=c99 server.c -o server

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <string.h>
#include <sys/select.h>

#define handle_error(msg)   \
do{perror(msg);exit(EXIT_FAILURE);}while(0)

ssize_t readn(int fd, void *buf, size_t count)
{
if((fd < 0) || (buf == NULL) || (count < 0))
return -1;
size_t nleft = count;   //剩余字节数
ssize_t nread = 0;      //已读字节数
char *pbuf = (char*)buf;
while(nleft > 0)
{
if((nread = read(fd, pbuf, nleft)) < 0)
{
if(errno == EINTR)
continue;
return -1;
}
else if (nread == 0)
return count - nleft;
pbuf += nread;
nleft -= nread;
}
return count;
}

ssize_t writen(int fd, const void *buf, size_t count)
{
if((fd < 0) || (buf == NULL) || (count < 0))
return -1;
size_t nleft = count;   //剩余字节数
ssize_t nwritten = 0;      //已发送字节数
char *pbuf = (char*)buf;
while (nleft > 0)
{
if((nwritten = write(fd, pbuf, nleft)) < 0)
{
if(errno == EINTR)
continue;
return -1;
}
else if(nwritten == 0)
continue;
pbuf += nwritten;
nleft -= nwritten;
}
return count;
}

//使用recv函数从套接字接收缓冲区中接收数据,但并不从缓冲去中清除数据
ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while(true)
{
int iret = recv(sockfd, buf, len, MSG_PEEK);
if(iret == -1 && errno == EINTR)    //如果失败是因为信号中端,那么就重新再试
continue;
return iret;
}
}

//如果读到的数据包含\n,则返回,表示一条完整的消息读取完毕
ssize_t recvline(int sockfd, void *buf, size_t maxlen)
{
int iret = 0;
int nread = 0;  //已读数据
char *pbuf = (char*)buf;
int nleft = maxlen; //剩余字符
while(true)
{
iret = recv_peek(sockfd, pbuf, nleft);
if(iret < 0)
return iret;    //读取失败
else if(iret == 0)
return iret;    //对方关闭套接口

nread = iret;
if(nread > nleft)       //已经读取到的数据只可能小于或者等于剩余的数据
exit(EXIT_FAILURE);

for(int i = 0; i < nread; i++)
{
if(pbuf[i] == '\n')
{
iret = readn(sockfd, pbuf, i+1);    //从缓冲区中读走包括\n在内的数据
if(iret != i+1)
exit(EXIT_FAILURE);     //没有读取都i+1个数据,说明失败
return iret;                //读取都\n返回
}
}

//在当前读到的数据中没有发现\n,那么先将这部分数据从缓冲区中读走,然后接着偷窥后面的数据
nleft -= nread;
iret = readn(sockfd, pbuf, nread);
if(iret != nread)
exit(EXIT_FAILURE);
pbuf += nread;
}
return -1;
}

int main(void)
{
int sk_fd = socket(AF_INET, SOCK_STREAM , IPPROTO_TCP);
if(sk_fd < 0)
handle_error("socket");

//使用REUSEADDR,不必等待TIME_WAIT 状态消失,就可以重新使用端口
int on = 1;
if(setsockopt(sk_fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
{
close(sk_fd);
handle_error("setsockopt");
}

struct sockaddr_in sr_addr;
memset(&sr_addr,0,sizeof(sr_addr));
sr_addr.sin_family = AF_INET;
sr_addr.sin_port = htons(5188);
sr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
//sr_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
//inet_aton("127.0.0.1",&sr_addr.sin_addr);

if(bind(sk_fd, (struct sockaddr*)&sr_addr, sizeof(sr_addr)) < 0)
{
close(sk_fd);
handle_error("bind");
}

//被动套接字
if(listen(sk_fd, SOMAXCONN) < 0)     //内核为此套接字排队的最大连接数由SOMAXCONN宏指定
{
close(sk_fd);
handle_error("listen");
}

//使用select函数实现单进程并发服务器
fd_set readset;
fd_set allset;
FD_ZERO(&readset);
FD_ZERO(&allset);
FD_SET(sk_fd, &allset);     //初始状态下,只关心监听套接字

int maxfd = sk_fd;      //假设最大文件描述符
int maxfd_last = maxfd; //保存一个maxfd记录,上一次的最大值
int nready = 0;
int maxindex = 0;       //client_sk数组最大已用位置
int maxindex_last = 0;  //maxindex上一次的值
int client_sk[FD_SETSIZE];      //存储客户端连接信息
for(int i = 0; i < FD_SETSIZE; i++)
{
client_sk[i] = -1;
}

while (true)
{
readset = allset;
nready = select(maxfd + 1, &readset, NULL, NULL, NULL);
if(nready == -1)
{
if(errno == EINTR)      //被信号中断
continue;
handle_error("select");
}
if(nready == 0)     //time out
continue;
if(FD_ISSET(sk_fd, &readset))   //检测到监听套接字事件
{
//调用accept建立连接
struct sockaddr_in cl_addr;
socklen_t cl_length = sizeof(cl_addr);
memset(&cl_addr,0,sizeof(cl_addr));

int ac_sk = accept(sk_fd, (struct sockaddr *)&cl_addr, &cl_length);
if(ac_sk < 0)
{
if(errno == EINTR)
continue;
handle_error("accept");
}
//加入到监听集合中
FD_SET(ac_sk, &allset);
if(ac_sk > maxfd)
{
maxfd_last = maxfd; //保存历史记录
maxfd = ac_sk;      //更新最大文件描述符
}
int i;
for(i = 0; i < FD_SETSIZE; i++)
{
if(client_sk[i] < 0)
{
client_sk[i] = ac_sk;
if(maxindex < i)
{
maxindex_last = maxindex;   //保存历史记录
maxindex = i;       //更新最大已用位置
}
break;
}
}
if(i == FD_SETSIZE)     //client_sk数组空间已满,客户端太多
{
fprintf(stderr, "Client too many!");
exit(EXIT_FAILURE);
}
printf("Connect ip = %s\tport = %d\n",inet_ntoa(cl_addr.sin_addr),ntohs(cl_addr.sin_port));

if(--nready <= 0)   //如果除了监听套接字外没有其他套接字发生事件
continue;
}
//客户端连接套接字发生事件
for(int i = 0; i <= maxindex; i++)
{
int conn_sk = client_sk[i];
if(FD_ISSET(conn_sk, &readset))
{
//有数据可以接收
char recvbuf[1024];
memset(recvbuf,0,sizeof(recvbuf));
int iret = recvline(conn_sk,recvbuf,sizeof(recvbuf));    //获取包数据长度
if(iret == -1)
handle_error("read");
else if(iret == 0)
{
//客户端关闭,删除对应的连接套接字
if(conn_sk == maxfd)
{
maxfd = maxfd_last;     //如果删除的时最大文件描述符
}
if(i == maxindex)
{
maxindex = maxindex_last;   //如果删除的描述符的位置是当前最大已用位置
}
client_sk[i] = -1;
FD_CLR(conn_sk, &allset);
printf("Client was closed!\n");
close(conn_sk);
}
fputs(recvbuf,stdout);
writen(conn_sk, recvbuf, strlen(recvbuf));    //回传数据

if(--nready <= 0)       //判断是否处理完
break;
}
}

}
for(int i = 0; i <= maxindex; i++)
{
close(client_sk[i]);
}
close(sk_fd);
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: