您的位置:首页 > 其它

高级IO复用应用:聊天室程序

2013-10-09 20:50 423 查看


高级IO复用应用:聊天室程序

分类: 网络编程2013-10-09
13:55 103人阅读 评论(0) 收藏 举报

高级IO复用应用聊天室程序

简单的聊天室程序:客户端从标准输入输入数据后发送给服务端,服务端将用户发送来的数据转发给其它用户。这里采用IO复用poll技术。客户端采用了splice零拷贝。服务端采用了空间换时间(分配超大的用户数据数组,然后通过用户连接的文件描述符即可以索引到用户数据)

客户端程序:

[cpp] view
plaincopy

#define _GNU_SOURCE 1//为了支持POLLRDHUP事件

#include<sys/types.h>

#include<sys/socket.h>

#include<netinet/in.h>

#include<arpa/inet.h>

#include<assert.h>

#include<stdio.h>

#include<unistd.h>

#include<string.h>

#include<stdlib.h>

#include<poll.h>

#include<fcntl.h>

#include<iostream>

#define BUFFER_SIZE 64

using namespace std;

int main(int argc,char* argv[]){

if(argc<=2){

cout<<"argc<=2"<<endl;

return 1;

}

const char* ip=argv[1];//服务端地址

int port=atoi(argv[2]);

struct sockaddr_in address;

bzero(&address,sizeof(address));

address.sin_family=AF_INET;

inet_pton(AF_INET,ip,&address.sin_addr);

address.sin_port=htons(port);

int sockfd=socket(PF_INET,SOCK_STREAM,0);

assert(sockfd>=0);

if(connect(sockfd,(struct sockaddr*)&address,sizeof(address))<0){

cout<<"connect error"<<endl;

close(sockfd);

return 1;

}

pollfd fds[2];//pollfd结构体数组

fds[0].fd=0;//fds[0]是标准输入

fds[0].events=POLLIN;//注册可写事件

fds[0].revents=0;

fds[1].fd=sockfd;//fds[1]是socket描述符

fds[1].events=POLLIN|POLLRDHUP;//注册可写和挂起事件

fds[1].revents=0;

char read_buf[BUFFER_SIZE];

int pipefd[2];

int ret=pipe(pipefd);//创建一个管道,splice函数的参数必须有个是管道描述符(实现零拷贝)

assert(ret!=-1);

while(1){

ret=poll(fds,2,-1);//无限期等待注册事件发生

if(ret<0){

cout<<"poll error"<<endl;

break;

}

if(fds[1].revents&POLLRDHUP){//若是socket描述符挂起事件则代表服务器关闭了连接

cout<<"server close the connection"<<endl;

break;

}

else if(fds[1].revents&POLLIN){//sokect描述符可读事件

memset(read_buf,'\0',BUFFER_SIZE);

recv(fds[1].fd,read_buf,BUFFER_SIZE-1,0);//接收服务端发送来的数据(服务端的数据是其它用户发送给它的数据)

cout<<read_buf<<endl;

}

if(fds[0].revents&POLLIN){//标准输入端可写事件发生(该用户有数据输入并需要发送给服务端)

ret=splice(0,NULL,pipefd[1],NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);//将标准输入的数据零拷贝到管道的写端

ret=splice(pipefd[0],NULL,sockfd,NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);//将管道的读端数据零拷贝到socket描述符

}

}

close(sockfd);

return 0;

}

服务端程序:

[cpp] view
plaincopy

#define _GNU_SOURCE 1//支持POLLRDHUP事件

#include<sys/types.h>

#include<sys/socket.h>

#include<netinet/in.h>

#include<arpa/inet.h>

#include<assert.h>

#include<stdio.h>

#include<unistd.h>

#include<errno.h>

#include<string.h>

#include<fcntl.h>

#include<stdlib.h>

#include<poll.h>

#include<iostream>

#define USER_LIMIT 5//最大用户数,限制用户是为了提高poll性能

#define BUFFER_SIZE 64//缓冲区大小

#define FD_LIMIT 65535//文件描述符限制

using namespace std;

struct client_data{//客户数据:客户端socket地址、待写到客户端的数据位置、从客户端读入的数据。用于服务端与客户端高速交互

sockaddr_in address;

char* write_buf;

char buf[BUFFER_SIZE];

};

int setnonblocking(int fd){//设置文件描述符为非阻塞

int old_option=fcntl(fd,F_GETFL);

int new_option=old_option|O_NONBLOCK;

fcntl(fd,F_SETFL,new_option);

return old_option;

}

int main(int argc,char* argv[]){

if(argc<=2){

cout<<"argc<=2"<<endl;

return 1;

}

const char* ip=argv[1];

int port=atoi(argv[2]);

int ret=0;

struct sockaddr_in address;//服务器地址

bzero(&address,sizeof(address));

address.sin_family=AF_INET;

inet_pton(AF_INET,ip,&address.sin_addr);

address.sin_port=htons(port);

int listenfd=socket(PF_INET,SOCK_STREAM,0);

assert(listenfd>=0);

ret=bind(listenfd,(struct sockaddr*)&address,sizeof(address));

assert(ret!=-1);

ret=listen(listenfd,5);

assert(listenfd!=-1);

client_data* users=new client_data[FD_LIMIT];//分配一个超大的用户数据数组,当用户与服务器连接产生的连接描述符可以作为数组下标迅速索引到用户数据

pollfd fds[USER_LIMIT+1];//pollfd结构体数组,每个用户注册一组poll事件

int user_count=0;

for(int i=0;i<=USER_LIMIT;i++){//初始化poll事件

fds[i].fd=-1;//不可能的描述符

fds[i].events=0;

}

fds[0].fd=listenfd;

fds[0].events=POLLIN|POLLERR;//监听端口注册可读和错误事件

fds[0].revents=0;

while(1){

ret=poll(fds,user_count+1,-1);//无限期等待注册事件就绪

if(ret<0){

cout<<"poll error"<<endl;

break;

}

for(int i=0;i<user_count+1;i++){//这是poll的特征,遍历全部注册文件描述符(+1是由于多了监听描述符的缘故)

if((fds[i].fd==listenfd)&&(fds[i].revents&POLLIN)){//监听端口若可读事件发生说明有新用户请求连接

struct sockaddr_in client_address;

socklen_t client_addrlength=sizeof(client_address);

int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);//新用户的连接

if(connfd<0){

cout<<"accept error "<<strerror(errno)<<endl;

continue;

}

if(user_count>=USER_LIMIT){//已连接的用户数大于最大用户数,则不允许连接

const char* info="too many users,you can't connect\n";

cout<<info<<endl;

send(connfd,info,strlen(info),0);

close(connfd);

continue;

}

user_count++;

users[connfd].address=client_address;//新用户的数据更新

setnonblocking(connfd);

fds[user_count].fd=connfd;

fds[user_count].events=POLLIN|POLLRDHUP|POLLERR;

fds[user_count].revents=0;

cout<<"a new user come id:"<<user_count<<endl;

}

else if(fds[i].revents&POLLERR){//用户连接出错

cout<<"poll error in:"<<fds[i].fd<<endl;

char errors[100];

memset(errors,'\0',100);

socklen_t length=sizeof(errors);

if(getsockopt(fds[i].fd,SOL_SOCKET,SO_ERROR,&errors,&length)<0){

cout<<"get socket option error"<<endl;

}

continue;

}

else if(fds[i].revents&POLLRDHUP){//用户连接挂起则断开连接

users[fds[i].fd]=users[fds[user_count].fd];

close(fds[i].fd);

fds[i]=fds[user_count];

i--;

user_count--;

cout<<"a user leave"<<endl;

}

else if(fds[i].revents&POLLIN){//用户连接可读事件发生,表示用户有数据发送到来

int connfd=fds[i].fd;

memset(users[connfd].buf,'\0',BUFFER_SIZE-1);

ret=recv(connfd,users[connfd].buf,BUFFER_SIZE-1,0);

cout<<"get data:"<<users[connfd].buf<<" from user:"<<connfd<<" bytes:"<<ret<<endl;

if(ret<0){

if(errno!=EAGAIN){//若是EAGAIN不是网络错误,非阻塞情形下可能是设备不可用,这里EAGAIN表示数据读取完毕可以进行期待下次事件发生

close(connfd);//否则断开用户连接

users[fds[i].fd]=users[fds[user_count].fd];

fds[i]=fds[user_count];

i--;

user_count--;

}

}

else if(ret==0){}//这里的原因是由于下面###1###处强行将其它用户连接事件置为POLLIN但是若其它用户连接其实没有数据可读的情形

else{

for(int j=1;j<=user_count;j++){//除去该用户外的其它用户事件置为POLLOUT

if(fds[j].fd==connfd){

continue;

}

fds[j].events|=~POLLIN;

fds[j].events|=POLLOUT;

users[fds[j].fd].write_buf=users[connfd].buf;//共享缓冲区数据

}

}

}

else if(fds[i].revents&POLLOUT){//被置为POLLOUT事件后意味着有某个用户的数据需要发送给当前用户

int connfd=fds[i].fd;

if(!users[connfd].write_buf){

continue;

}

ret=send(connfd,users[connfd].write_buf,strlen(users[connfd].write_buf),0);

users[connfd].write_buf=NULL;//恢复当前用户数据

fds[i].events|=~POLLOUT;

fds[i].events|=POLLIN;//###1###

}

}

}

delete[] users;

close(listenfd);

return 0;

}//这里会出现一个问题就是若某个用户A频繁发送消息给服务端服务端,再转发给其它用户,此期间其它用户也有话要说,则服务端可读事件全被A用户占领了,其它用户全是POLLOUT事件


IO复用高级应用:同时处理TCP和UDP服务

分类: 网络编程2013-10-09
15:03 73人阅读 评论(0) 收藏 举报

IO复用高级应用同时处理TCP和UDP服

一个socket只能与一个socket地址绑定即一个socket只能监听一个端口,服务器如果要同时监听多个端口就必须创建多个socket,若在同一个端口监听多个服务也要创建多个socket绑定到这个端口上。现在服务器监听一个端口上的TCP和UDP请求,并将发送来的数据回射到客户端。

服务端程序:

[cpp] view
plaincopy

#include <sys/types.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <assert.h>

#include <stdio.h>

#include <unistd.h>

#include <errno.h>

#include <string.h>

#include <fcntl.h>

#include <stdlib.h>

#include <sys/epoll.h>

#include <pthread.h>

#define MAX_EVENT_NUMBER 1024 //最大事件数目

#define TCP_BUFFER_SIZE 512//TCP缓冲区

#define UDP_BUFFER_SIZE 1024//UDP缓冲区

int setnonblocking( int fd )//设置为非阻塞描述符

{

int old_option = fcntl( fd, F_GETFL );

int new_option = old_option | O_NONBLOCK;

fcntl( fd, F_SETFL, new_option );

return old_option;

}

void addfd( int epollfd, int fd )//注册事件

{

epoll_event event;

event.data.fd = fd;

//event.events = EPOLLIN | EPOLLET;

event.events = EPOLLIN;//可读事件

epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );

setnonblocking( fd );

}

int main( int argc, char* argv[] )

{

if( argc <= 2 )

{

printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );

return 1;

}

const char* ip = argv[1];

int port = atoi( argv[2] );

int ret = 0;

struct sockaddr_in address;//绑定TCP端口

bzero( &address, sizeof( address ) );

address.sin_family = AF_INET;

inet_pton( AF_INET, ip, &address.sin_addr );

address.sin_port = htons( port );

int listenfd = socket( PF_INET, SOCK_STREAM, 0 );

assert( listenfd >= 0 );

ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );

assert( ret != -1 );

ret = listen( listenfd, 5 );

assert( ret != -1 );

bzero( &address, sizeof( address ) );//绑定UDP端口

address.sin_family = AF_INET;

inet_pton( AF_INET, ip, &address.sin_addr );

address.sin_port = htons( port );

int udpfd = socket( PF_INET, SOCK_DGRAM, 0 );

assert( udpfd >= 0 );

ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) );

assert( ret != -1 );

epoll_event events[ MAX_EVENT_NUMBER ];

int epollfd = epoll_create( 5 );

assert( epollfd != -1 );

addfd( epollfd, listenfd );//TCP端口注册事件

addfd( epollfd, udpfd );//UDP端口注册事件

while( 1 )

{

int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );//无限期等待事件发生

if ( number < 0 )

{

printf( "epoll failure\n" );

break;

}

for ( int i = 0; i < number; i++ )//EPOLL就绪事件

{

int sockfd = events[i].data.fd;

if ( sockfd == listenfd )//监听端口监听TCP连接事件

{

struct sockaddr_in client_address;

socklen_t client_addrlength = sizeof( client_address );

int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );

addfd( epollfd, connfd );

}

else if ( sockfd == udpfd )//UDP连接

{

char buf[ UDP_BUFFER_SIZE ];

memset( buf, '\0', UDP_BUFFER_SIZE );

struct sockaddr_in client_address;

socklen_t client_addrlength = sizeof( client_address );//客户端地址

//UDP专用接收数据

ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength );

if( ret > 0 )

{//UDP专用发送数据(回射数据)

sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength );

}

}

else if ( events[i].events & EPOLLIN )//TCP连接

{

char buf[ TCP_BUFFER_SIZE ];

while( 1 )

{

memset( buf, '\0', TCP_BUFFER_SIZE );

ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 );

if( ret < 0 )

{

if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )//非阻塞出现这种errrno是读取数据完毕

{

break;

}

close( sockfd );

break;

}

else if( ret == 0 )//关闭连接

{

close( sockfd );

}

else

{

send( sockfd, buf, ret, 0 );//回射数据

}

}

}

else

{

printf( "something else happened \n" );

}

}

}

close( listenfd );

return 0;

}

客户端程序:

[cpp] view
plaincopy

#include<sys/socket.h>

#include<netinet/in.h>

#include<arpa/inet.h>

#include<assert.h>

#include<stdio.h>

#include<unistd.h>

#include<string.h>

#include<stdlib.h>

#include<iostream>

#define BUF_SIZE 1024

using namespace std;

int main(int argc,char* argv[]){

if(argc<=2){

cout<<"argc<=2"<<endl;

return 1;

}

const char* ip=argv[1];

int port=atoi(argv[2]);

struct sockaddr_in server_address;

bzero(&server_address,sizeof(server_address));

server_address.sin_family=AF_INET;

inet_pton(AF_INET,ip,&server_address.sin_addr);

server_address.sin_port=htons(port);

int sockfd=socket(PF_INET,SOCK_STREAM,0);

int sockudp=socket(PF_INET,SOCK_DGRAM,0);

assert(sockfd>=0);

if(connect(sockfd,(struct sockaddr*)&server_address,sizeof(server_address))<0){//TCP数据发送与接收

cout<<"connect error"<<endl;

return 1;

}

else{

const char* tcp="this is TCP data\n";

send(sockfd,tcp,strlen(tcp),0);

char buf[BUF_SIZE];

int ret=recv(sockfd,buf,BUF_SIZE-1,0);

if(ret<0){

cout<<"recv tcp error"<<endl;

}

else{

buf[ret+1]='\0';

cout<<ret<<" "<<buf<<endl;

}

}

if(connect(sockudp,(struct sockaddr*)&server_address,sizeof(server_address))<0){//UDP数据发送与接收

cout<<"connect error"<<endl;

return 1;

}

else{

const char* udp="this is UDP data\n";

send(sockudp,udp,strlen(udp),0);

char buf[BUF_SIZE];

int ret=recv(sockudp,buf,BUF_SIZE-1,0);

if(ret<0){

cout<<"recv udp error"<<endl;

}

else{

buf[ret+1]='\0';

cout<<ret<<" "<<buf<<endl;

}

}

close(sockfd);

return 0;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: