您的位置:首页 > 运维架构 > Linux

Linux下select函数实现的聊天服务器

2012-10-23 01:02 218 查看
消息缓冲区类MessageBuffer,接收线程将收到的消息放入缓冲区,发送线程从缓冲区中取出消息

MessageBuffer.h

[cpp] view
plaincopy

//MessageBuffer.h

#ifndef _MESSAGE_BUF_INCLUDE_

#define _MESSAGE_BUF_INCLUDE_

#include <pthread.h>

#define MESSAGE_COUNT 16

#define MESSAGE_LENGTH 2048

// Fixed-capacity message queue guarded by a pthread mutex and condition
// variable. Only the declaration lives here; the member functions are
// defined in MessageBuffer.cpp.
class MessageBuffer{

private:

pthread_mutex_t mutex;// mutex guarding access to the buffer

pthread_cond_t condition;// condition variable used while waiting on the buffer

// message storage used as a circular queue: MESSAGE_COUNT slots of MESSAGE_LENGTH bytes each

char buf[MESSAGE_COUNT][MESSAGE_LENGTH];

int rear; // tail index of the circular queue

int front; // head index of the circular queue

public:

// Stop flag: once it is true, PutMessage and GetMessage return -1.
bool toStop;

// constructor

MessageBuffer();

// destructor

virtual ~MessageBuffer();

// Puts a message into the buffer; blocks while the buffer is full, returns -1 when toStop is true

int PutMessage(const char *message);

// Copies the next message into mbuf (capacity buflen); blocks while the buffer is empty, returns -1 when toStop is true

int GetMessage(char *mbuf, int buflen);

};

#endif



MessageBuffer.cpp

[cpp] view
plaincopy

//MessageBuffer.cpp

#include <stdio.h>

#include <string.h>

#include <time.h>

#include <pthread.h>

#include "MessageBuffer.h"

// Creates an empty buffer: clears the stop flag, initialises the mutex and
// the condition variable with default attributes and resets both queue
// indices to slot 0.
MessageBuffer::MessageBuffer() {
    toStop = false;

    pthread_mutex_init(&mutex, NULL);     // default mutex attributes
    pthread_cond_init(&condition, NULL);  // default condition attributes

    rear = 0;   // tail index
    front = 0;  // head index; rear == front means the queue is empty

    // Fixed: the format string ended in "/n", which printed a literal
    // slash-n instead of a newline; also corrected the "intance" typo.
    printf("A MessageBuffer instance created.\n");
}

// Releases the mutex and the condition variable owned by the buffer.
MessageBuffer::~MessageBuffer(){
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&condition);

    // Fixed: "/n" printed a literal slash-n instead of a newline.
    printf("A MessageBuffer instance destroyed.\n");
}

//将消息放入消息缓冲区

// Appends a message to the circular queue.
//
// Blocks while the queue is full, waking up at least once per second to
// re-check toStop.
//
// message: NUL-terminated string. At most MESSAGE_LENGTH-1 bytes are stored;
//          longer messages are truncated.
// Returns 0 when the message was stored, -1 when toStop is set.
int MessageBuffer::PutMessage(const char *message){
    pthread_mutex_lock(&mutex);

    // One slot is always left unused so that rear == front can only mean
    // "empty"; the queue is full when advancing rear would hit front.
    while(!toStop && (rear+1)%MESSAGE_COUNT==front){
        struct timespec deadline;
        deadline.tv_sec = time(NULL)+1;  // absolute wake-up time, 1s from now
        deadline.tv_nsec = 0;
        pthread_cond_timedwait(&condition,&mutex,&deadline);
    }

    if(toStop){
        // Wake every other waiter so that it can observe the stop flag too.
        pthread_cond_broadcast(&condition);
        pthread_mutex_unlock(&mutex);
        return -1;
    }

    const size_t messageLen = strlen(message);
    const size_t copyLen = messageLen>=MESSAGE_LENGTH ? MESSAGE_LENGTH-1 : messageLen;
    memcpy(buf[rear],message,copyLen);
    // Fixed: the terminator was written as '/0', a multi-character constant
    // that truncates to the character '0', leaving the slot unterminated.
    buf[rear][copyLen]='\0';
    rear = (rear+1)%MESSAGE_COUNT;

    pthread_cond_signal(&condition);
    pthread_mutex_unlock(&mutex);
    return 0;
}

//从消息缓冲区中获得消息

// Removes the oldest message from the circular queue and copies it to mbuf.
//
// Blocks while the queue is empty, waking up at least once per second to
// re-check toStop.
//
// mbuf:   destination buffer.
// buflen: capacity of mbuf in bytes; at most buflen-1 bytes are copied and
//         the result is always NUL-terminated.
// Returns 0 when a message was copied, -1 when toStop is set or when the
// destination is unusable (NULL or buflen <= 0); in the latter case the
// queue is left untouched.
int MessageBuffer::GetMessage(char *mbuf, int buflen){
    // Without this guard buflen <= 0 produced a negative copy length that
    // memcpy would interpret as a huge unsigned size.
    if(mbuf==NULL || buflen<=0){
        return -1;
    }

    pthread_mutex_lock(&mutex);

    while(!toStop && rear==front){
        struct timespec deadline;
        deadline.tv_sec = time(NULL)+1;  // absolute wake-up time, 1s from now
        deadline.tv_nsec = 0;
        pthread_cond_timedwait(&condition,&mutex,&deadline);
    }

    if(toStop){
        // Wake every other waiter so that it can observe the stop flag too.
        pthread_cond_broadcast(&condition);
        pthread_mutex_unlock(&mutex);
        return -1;
    }

    const int messageLen = strlen(buf[front]);
    const int copyLen = messageLen>=buflen ? buflen-1 : messageLen;
    memcpy(mbuf,buf[front],copyLen);
    // Fixed: the terminator was written as '/0', a multi-character constant
    // that truncates to the character '0', leaving mbuf unterminated.
    mbuf[copyLen]='\0';
    front = (front+1)%MESSAGE_COUNT;

    pthread_cond_signal(&condition);
    pthread_mutex_unlock(&mutex);
    return 0;
}



客户类Clients,用于维护套接字socket和套接字地址struct sockaddr_in之间的对应关系,并维护用户的姓名。

Clients.h

[cpp] view
plaincopy

//Clients.h

#ifndef _CLIENTS_INCLUDE_

#define _CLIENTS_INCLUDE_

#include <sys/types.h>

#include <netinet/in.h>

#include <pthread.h>

#define NAME_LEN 50

#define MAX_CLIENT 30

// Bookkeeping record for one connected client, stored in Clients::client[].
typedef struct client_info{

int sock; // socket descriptor; Clients::Search matches entries on this field

struct sockaddr_in clientAddr; // peer address as passed to Clients::PutClient

char name[NAME_LEN]; // display name: "ip:port" text written by PutClient, replaced by PutName

}CLIENT_INFO;

// Table of connected clients that maps a socket descriptor to its peer
// address and display name. Only the declaration lives here; the member
// functions are defined in Clients.cpp.
class Clients{

private:

pthread_mutex_t mutex; // locked by the public methods that read or modify the table

CLIENT_INFO client[MAX_CLIENT]; // entries [0, clientCount) are in use

int clientCount; // number of entries currently in use

// Formats an IPv4 address as dotted-decimal text into buf; returns the text length, or -1 if buflen < 16.
int IPtoString(unsigned long ip, char *buf, int buflen);

// Returns the index of the entry whose socket equals sock, or -1 if there is none.
int Search(int sock);

public:

Clients();// constructor

virtual ~Clients();// destructor



// Returns the number of entries currently in use.
int GetClientCount();



// Appends a client whose initial name is "ip:port"; returns false when the table already holds MAX_CLIENT entries.
bool PutClient(int sock,const struct sockaddr_in &clientAddr);

// Removes the entry for sock, if any, and closes the gap in the array.
void RemoveClient(int sock);

// Copies the stored address of sock into *addr; returns false if sock is not in the table.
bool GetAddrBySocket(int sock,struct sockaddr_in *addr);

// Replaces the name of sock with the first namelen bytes of name (at most NAME_LEN-1); returns false if sock is not in the table.
bool PutName(int sock,const char *name, int namelen);

// Copies the name of sock into name (capacity namelen); returns false if sock is not in the table.
bool GetName(int sock, char *name, int namelen);

// Copies up to arrayLen socket descriptors into sockArray and returns how many were copied.
int GetAllSocket(int* sockArray, int arrayLen );

};

#endif



Clients.cpp

[cpp] view
plaincopy

//Clients.cpp

#include <stdio.h>

#include <string.h>

#include <arpa/inet.h>

#include "Clients.h"

// Creates an empty client table and initialises its mutex with default
// attributes.
Clients::Clients() {
    pthread_mutex_init(&mutex, NULL);
    clientCount = 0;

    // Fixed: "/n" printed a literal slash-n instead of a newline.
    printf("Clients created.\n");
}

// Destroys the mutex owned by the table. The stored sockets are not closed
// here.
Clients::~Clients() {
    pthread_mutex_destroy(&mutex);

    // Fixed: "/n" printed a literal slash-n instead of a newline.
    printf("Clients destroyed.\n");
}



// Linear scan over the entries in use.
// Returns the index of the first entry whose socket equals sock, or -1 when
// no entry matches. Does not take the mutex itself.
int Clients::Search(int sock){
    int pos = 0;
    while(pos < clientCount && client[pos].sock != sock){
        ++pos;
    }
    // Running off the end of the used range means "not found".
    return pos < clientCount ? pos : -1;
}

// Formats an IPv4 address as dotted-decimal text.
//
// ip:     the address exactly as stored in sockaddr_in::sin_addr.s_addr
//         (network byte order), widened to unsigned long by the caller.
// buf:    destination buffer.
// buflen: capacity of buf; must be at least 16 ("255.255.255.255" + NUL).
// Returns the length of the written text, or -1 if buf is NULL or too small.
int Clients::IPtoString(unsigned long ip,char *buf,int buflen){
    if(buf==NULL || buflen<16){
        return -1;
    }

    // Narrow back to the 32-bit address type before looking at its bytes.
    // Reading the first four bytes of the unsigned long directly only works
    // where long is 32 bits or the machine is little-endian; on a big-endian
    // LP64 target those bytes are the zero padding.
    const in_addr_t addr = (in_addr_t)ip;
    const unsigned char *octet = (const unsigned char*)&addr;

    // snprintf keeps the write inside buf even if the size check above is
    // ever relaxed.
    snprintf(buf,buflen,"%u.%u.%u.%u",octet[0],octet[1],octet[2],octet[3]);
    return strlen(buf);
}

int Clients::GetClientCount(){

return clientCount;

}



bool Clients::PutClient(int sock,const struct sockaddr_in &clientAddr) {

if(clientCount==MAX_CLIENT){

return false;

}

pthread_mutex_lock(&mutex);

client[clientCount].sock = sock;

client[clientCount].clientAddr = clientAddr;

int buflen = sizeof(client[clientCount].name);

int pos = IPtoString(clientAddr.sin_addr.s_addr,client[clientCount].name,buflen);

sprintf(&client[clientCount].name[pos],":%d",ntohs(clientAddr.sin_port));



clientCount++;

pthread_mutex_unlock(&mutex);

return true;

}

// Deletes the entry that belongs to sock, if there is one, and shifts the
// entries behind it one slot forward so the used range stays contiguous.
// The socket itself is not closed here.
void Clients::RemoveClient(int sock){
    pthread_mutex_lock(&mutex);

    const int found = Search(sock);
    if(found >= 0){
        // Shrink first; the loop bound is then the new last index + 1.
        --clientCount;
        for(int k = found; k < clientCount; ++k){
            client[k] = client[k+1];
        }
    }

    pthread_mutex_unlock(&mutex);
}



// Looks up the peer address recorded for sock.
// On success the address is copied into *addr and true is returned; when
// sock is not in the table *addr is left untouched and false is returned.
bool Clients::GetAddrBySocket(int sock,struct sockaddr_in *addr){
    pthread_mutex_lock(&mutex);

    const int slot = Search(sock);
    const bool found = (slot != -1);
    if(found){
        memcpy(addr,&client[slot].clientAddr,sizeof(struct sockaddr_in));
    }

    pthread_mutex_unlock(&mutex);
    return found;
}

// Replaces the display name of the client that owns sock.
//
// name:    source bytes; does not have to be NUL-terminated.
// namelen: number of bytes to take from name; values of NAME_LEN or more are
//          truncated to NAME_LEN-1, negative values are treated as 0.
// Returns true when the name was stored, false when name is NULL or sock is
// not in the table.
bool Clients::PutName(int sock,const char *name,int namelen) {
    if(name==NULL){
        return false;
    }
    if(namelen<0){
        namelen = 0;  // a negative length would become a huge memcpy size
    }

    pthread_mutex_lock(&mutex);

    const int index = Search(sock);
    if(index!=-1){
        const int copyLen = namelen>=NAME_LEN ? NAME_LEN-1 : namelen;
        memcpy(client[index].name,name,copyLen);
        // Fixed: the terminator was written as '/0', a multi-character
        // constant that truncates to the character '0'.
        client[index].name[copyLen]='\0';
    }

    pthread_mutex_unlock(&mutex);
    return index!=-1;
}

// Copies the display name of the client that owns sock.
//
// name:    destination buffer.
// namelen: capacity of name; at most namelen-1 bytes are copied and the
//          result is always NUL-terminated.
// Returns true when a name was copied, false when the destination is
// unusable (NULL or namelen <= 0) or sock is not in the table.
bool Clients::GetName(int sock, char *name, int namelen) {
    // Without this guard namelen <= 0 produced a negative copy length.
    if(name==NULL || namelen<=0){
        return false;
    }

    pthread_mutex_lock(&mutex);

    const int index = Search(sock);
    if(index!=-1){
        const int storedLen = strlen(client[index].name);
        const int copyLen = (storedLen<namelen) ? storedLen : (namelen-1);
        memcpy(name,client[index].name,copyLen);
        // Fixed: the terminator was written as '/0', a multi-character
        // constant that truncates to the character '0'.
        name[copyLen]='\0';
    }

    pthread_mutex_unlock(&mutex);
    return index!=-1;
}

// Takes a snapshot of the stored socket descriptors.
// Writes the first min(arrayLen, clientCount) descriptors into sockArray and
// returns that number.
int Clients::GetAllSocket(int* sockArray, int arrayLen ) {
    pthread_mutex_lock(&mutex);

    // Start from the number of stored clients and cap it at the room the
    // caller provided.
    int copied = clientCount;
    if(arrayLen <= clientCount){
        copied = arrayLen;
    }

    int n = 0;
    while(n < copied){
        sockArray[n] = client[n].sock;
        ++n;
    }

    pthread_mutex_unlock(&mutex);
    return copied;
}

聊天室服务器主程序Server.cpp

[cpp] view
plaincopy

/*server.c*/

#include <stdio.h>

#include <string.h>

#include <stdlib.h>

#include <sys/types.h>

#include <netinet/in.h>

#include <sys/socket.h>

#include <sys/select.h>

#include <pthread.h>

#include <unistd.h>

#include <netdb.h>

#include <arpa/inet.h>

#include "MessageBuffer.h"

#include "Clients.h"

using namespace std;

// Default port constant. NOTE(review): the code in this file does not
// reference it; main() takes the port from argv[1].
#define SERVER_PORT 8000

// Size in bytes of the receive and send buffers in RecvThread and SendThread.
#define BUFFER_SIZE 4096

// Fallback definitions, used only if Clients.h did not already define them.
#ifndef MAX_CLIENT

#define MAX_CLIENT 30

#endif

#ifndef NAME_LEN

#define NAME_LEN 50

#endif

// Queue that ProcessMessage fills and SendThread drains.
MessageBuffer messageBuffer;

// Table of connected clients, filled by ListenThread.
Clients clients;

// Thread entry points; ListenThread receives a pointer to the listening socket.
void* ListenThread(void*);

void* RecvThread(void*);

void* SendThread(void*);

// Handles one received message of `bytes` bytes stored in buf (capacity bufsize).
void ProcessMessage(int sock,char buf[],int bufsize,int bytes);

// Shutdown request: set by main() and polled by the three threads.
// NOTE(review): plain bool shared across threads without synchronisation.
bool toStop=false;

// Entry point of the chat server.
//
// Usage: server PortNumber
//
// Creates a TCP listening socket on the given port, prints the local
// addresses, starts the listen/receive/send threads and then waits until
// the operator types 'q' on stdin. After that it asks the threads to stop,
// joins them and closes every socket.
// Returns 0 on a clean shutdown, -1 on any start-up error.
int main(int argc,char* argv[]) {
    // Every message below used "/n", which printed a literal slash-n instead
    // of a newline; they now use "\n".
    if(argc!=2){
        printf("Usage: %s PortNumber\n",argv[0]);
        return -1;
    }

    // Validate the range before narrowing: previously a value such as 73536
    // was silently truncated to a different 16-bit port.
    const int parsedPort = atoi(argv[1]);
    if(parsedPort<=0 || parsedPort>65535){
        printf("incorrect port number.\n");
        return -1;
    }
    const unsigned short port = (unsigned short)parsedPort;

    int s = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(s==-1){
        fprintf(stderr,"create socket failed.\n");
        return -1;
    }

    struct sockaddr_in serverAddr;
    bzero(&serverAddr,sizeof(struct sockaddr_in));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(port);
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);  // listen on all interfaces

    if(bind(s,(struct sockaddr*)&serverAddr,sizeof(serverAddr))==-1){
        fprintf(stderr,"bind socket to port %d failed.\n",port);
        close(s);
        return -1;
    }
    if(listen(s,SOMAXCONN)==-1){
        fprintf(stderr,"listen failed.\n");
        close(s);
        return -1;
    }

    printf("Server is listening on ");
    char hostname[255];
    if(gethostname(hostname,sizeof(hostname))){
        printf("gethostname() failed.\n");
        close(s);
        return -1;
    }
    // Print every address the local host name resolves to.
    struct hostent* pHost = gethostbyname(hostname);
    if(pHost){
        for(int i=0; pHost->h_addr_list[i]; i++){
            printf("%s ",inet_ntoa(*(in_addr*)pHost->h_addr_list[i]));
        }
    }
    printf("\nport: %d\n",port);

    pthread_t tListenId;
    if(pthread_create(&tListenId,NULL,ListenThread,&s)){
        printf("failed to create listen thread.\n");
        return -1;
    }
    pthread_t tRecvId;
    if(pthread_create(&tRecvId,NULL,RecvThread,NULL)){
        printf("failed to create recv thread.\n");
        return -1;
    }
    pthread_t tSendId;
    if(pthread_create(&tSendId,NULL,SendThread,NULL)){
        printf("failed to create send thread.\n");
        return -1;
    }

    // Wait for the operator to type 'q'. At end-of-file getchar() returns
    // EOF forever, which used to turn this loop into a 100% CPU spin; now the
    // server keeps running but polls stdin only once per second.
    int ch;
    while((ch=getchar())!='q'){
        if(ch==EOF){
            clearerr(stdin);
            sleep(1);
        }
    }

    // Ask the threads to stop; each of them polls one of these flags.
    toStop = true;
    messageBuffer.toStop = true;

    pthread_join(tListenId,NULL);
    pthread_join(tRecvId,NULL);
    pthread_join(tSendId,NULL);

    close(s);

    // Close whatever client connections are still registered.
    int sock[MAX_CLIENT];
    const int count = clients.GetAllSocket(sock,MAX_CLIENT);
    for(int i=0;i<count;i++){
        close(sock[i]);
    }

    printf("server stopped.\n");
    return 0;
}

// Thread entry point that accepts incoming connections.
//
// ps: pointer to the int holding the listening socket descriptor.
//
// Waits on the listening socket with select() using a 5 second timeout so
// that the global toStop flag is re-checked regularly. Every accepted
// connection is registered in `clients`; when the table rejects it the
// connection is closed again. Always returns NULL.
void* ListenThread(void*ps){
    const int s=*(int*)ps;

    while(!toStop){
        // select() modifies both the set and the timeout, so rebuild them on
        // every iteration.
        fd_set listenSet;
        FD_ZERO(&listenSet);
        FD_SET(s,&listenSet);

        struct timeval timeout;
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;

        const int ret = select(s+1,&listenSet,NULL,NULL,&timeout);
        if(toStop){
            printf("ListenThread: exit.\n");
            return NULL;
        }
        if(ret==-1){
            printf("ListenThread: select() failed!\n");
            continue;
        }
        if(ret==0){
            printf("ListenThread: select() time out.\n");
            continue;
        }
        if(!FD_ISSET(s,&listenSet)){
            continue;
        }

        struct sockaddr_in clientAddr;
        socklen_t addrlen = sizeof(struct sockaddr_in);
        memset(&clientAddr,0,sizeof(struct sockaddr_in));

        const int sock = accept(s,(struct sockaddr*)&clientAddr,&addrlen);
        if(sock==-1){
            // Fixed: the failure used to fall through and register -1 as a
            // client socket.
            fprintf(stderr,"accept failed.\n");
            continue;
        }

        if(!clients.PutClient(sock,clientAddr)){
            printf("max client limited. MAX_CLIENT=%d\n",MAX_CLIENT);
            close(sock);
            // Fixed: a rejected connection was still reported as accepted.
            continue;
        }

        printf("accept a connection from %s:%u\n",
               inet_ntoa(clientAddr.sin_addr),
               ntohs(clientAddr.sin_port));
        // Fixed: sock is an int, so the conversion is %d rather than %u.
        printf("new socket is: %d\n",sock);
    }

    return NULL;
}

// Thread entry point that receives data from every connected client.
//
// Each round takes a snapshot of the registered sockets, waits up to two
// seconds for any of them to become readable and then reads from the ready
// ones. A failed read or an orderly shutdown by the peer removes the client
// and closes its socket; received data is handed to ProcessMessage().
// The global toStop flag is checked after every wait. Always returns NULL.
void* RecvThread(void*){
    int sock[MAX_CLIENT];
    char buf[BUFFER_SIZE];

    // Every message below used "/n", which printed a literal slash-n instead
    // of a newline; they now use "\n".
    while(!toStop){
        const int count = clients.GetAllSocket(sock,MAX_CLIENT);
        if(count==0){
            // Nobody is connected yet; idle for a while and try again.
            sleep(2);
            if(toStop){
                printf("RecvThread: exit.\n");
                return NULL;
            }
            continue;
        }

        // Build the read set and track the highest descriptor for select().
        fd_set readSet;
        FD_ZERO(&readSet);
        int maxfd=0;
        for(int i=0;i<count;i++){
            printf("--%d",sock[i]);
            FD_SET(sock[i],&readSet);
            if(sock[i]>maxfd){
                maxfd = sock[i];
            }
        }
        printf("\n");

        struct timeval timeout;
        timeout.tv_sec = 2;
        timeout.tv_usec = 0;

        const int ret = select(maxfd+1,&readSet,NULL,NULL,&timeout);
        if(toStop){
            printf("RecvThread: exit.\n");
            return NULL;
        }
        if(ret==-1){
            printf("RecvThread: select() failed!\n");
            continue;
        }
        if(ret==0){
            printf("RecvThread: select() time out.\n");
            continue;
        }

        for(int i=0; i<count; i++){
            if(!FD_ISSET(sock[i],&readSet)){
                continue;
            }
            // One byte is kept in reserve for the terminator that
            // ProcessMessage() appends.
            const int bytes=recv(sock[i],buf,sizeof(buf)-1,0);
            if(bytes==-1){
                printf("RecvThread: recv failed.\n");
                clients.RemoveClient(sock[i]);
                close(sock[i]);
            }else if(bytes==0){
                printf("RecvThread: socket closed by the other side.\n");
                clients.RemoveClient(sock[i]);
                close(sock[i]);
            }else{
                ProcessMessage(sock[i],buf,sizeof(buf),bytes);
            }
        }
    }

    return NULL;
}

// Thread entry point that broadcasts queued messages.
//
// Takes one message at a time from messageBuffer, waits up to two seconds
// for the client sockets to become writable and sends the message to every
// socket that is ready. A message is dropped when no client is connected or
// when no socket becomes writable in time. The thread ends when
// GetMessage() reports a stop request or the global toStop flag is set.
// Always returns NULL.
void* SendThread(void*){
    int sock[MAX_CLIENT];
    char buf[BUFFER_SIZE];

    // Writing to a connection the peer has already closed raises SIGPIPE,
    // whose default action terminates the whole server. Where the platform
    // offers MSG_NOSIGNAL, ask send() to report the error instead.
#ifdef MSG_NOSIGNAL
    const int sendFlags = MSG_NOSIGNAL;
#else
    const int sendFlags = 0;
#endif

    // Every message below used "/n", which printed a literal slash-n instead
    // of a newline; they now use "\n".
    while(!toStop){
        int ret = messageBuffer.GetMessage(buf,sizeof(buf));
        if(ret==-1){
            printf("SendThread: exit.\n");
            return NULL;
        }
        // Logged only after the stop check: on shutdown no message was
        // actually retrieved.
        printf("get a message from buffer.\n");

        const int count = clients.GetAllSocket(sock,MAX_CLIENT);
        if(count==0){
            // Nobody to send to. select() on an empty set would only sleep
            // for the full timeout and stall the queue.
            continue;
        }

        fd_set writeSet;
        FD_ZERO(&writeSet);
        int maxfd = 0;
        for(int i=0;i<count;i++){
            FD_SET(sock[i],&writeSet);
            if(sock[i]>maxfd){
                maxfd = sock[i];
            }
        }

        struct timeval timeout;
        timeout.tv_sec = 2;
        timeout.tv_usec = 0;

        ret = select(maxfd+1,NULL,&writeSet,NULL,&timeout);
        if(toStop){
            printf("SendThread: exit.\n");
            return NULL;
        }
        if(ret==-1){
            printf("SendThread: select() failed!\n");
            continue;
        }
        if(ret==0){
            printf("SendThread: select() time out.\n");
            continue;
        }

        // The message does not change while it is broadcast, so measure it
        // once.
        const int messageLen = strlen(buf);
        for(int i=0;i<count;i++){
            if(!FD_ISSET(sock[i],&writeSet)){
                continue;
            }
            const int bytes = send(sock[i],buf,messageLen,sendFlags);
            if(bytes==-1){
                printf("SendThread: send() failed.\n");
            }else if(bytes!=messageLen){
                // Fixed: this log line had no line terminator (and said
                // "trunked").
                printf("SendThread: send message truncated.\n");
            }
        }
    }

    return NULL;
}

// Handles one message received from a client.
//
// sock:    socket the message arrived on.
// buf:     buffer holding the received bytes; a terminator is written at
//          buf[bytes], so the buffer is modified.
// bufsize: capacity of buf in bytes.
// bytes:   number of bytes received; values that leave no room for the
//          terminator are clamped to bufsize-1.
//
// Recognised commands:
//   "bye"         echoes "bye", unregisters the client and closes the socket.
//   "name" + text renames the client to whatever follows the fifth byte and
//                 queues a "<old> change name to <new>" notice.
// Anything else is queued for broadcast as "<name>: <text>".
void ProcessMessage(int sock,char buf[],int bufsize,int bytes){
    if(buf==NULL || bufsize<=0 || bytes<0){
        return;
    }
    if(bytes>=bufsize){
        bytes = bufsize-1;  // keep the terminator inside the buffer
    }

    struct sockaddr_in clientAddr;
    if(!clients.GetAddrBySocket(sock,&clientAddr)){
        printf("ProcessMessage: can not find socket address.\n");
        return;
    }

    // s_addr is in network byte order, so its bytes are the dotted-decimal
    // octets in order.
    char ipString[16];
    const unsigned char *ip = (const unsigned char*)&clientAddr.sin_addr.s_addr;
    snprintf(ipString,sizeof(ipString),"%u.%u.%u.%u",ip[0],ip[1],ip[2],ip[3]);
    const unsigned short port = ntohs(clientAddr.sin_port);

    // Fixed: every terminator in this function was written as '/0', a
    // multi-character constant that truncates to the character '0', and every
    // "/n" printed a literal slash-n instead of a newline.
    buf[bytes]='\0';
    printf("Message from %s:%d: %s\n",ipString,port,buf);

    const char* CMD_BYE="bye";
    if(strcmp(buf,CMD_BYE)==0){
        send(sock,CMD_BYE,strlen(CMD_BYE),0);
        clients.RemoveClient(sock);
        close(sock);
        printf("%s:%u disconnected.\n", ipString, port);
        return;
    }

    char bufWithName[BUFFER_SIZE+NAME_LEN];
    char name[NAME_LEN];
    name[0]='\0';  // stays empty if the client vanished and GetName() fails

    const char* CMD_NAME="name";
    // strncmp stops at the terminator, so a message shorter than the command
    // no longer compares against stale bytes left in the buffer.
    if(strncmp(buf,CMD_NAME,strlen(CMD_NAME))==0){
        // The new name starts after the command word and one separator byte.
        // A bare "name" has no payload; reading buf+5 there would step over
        // the terminator into leftover data.
        const char* payload = bytes>=5 ? buf+5 : "";

        char newname[NAME_LEN];
        snprintf(newname,sizeof(newname),"%s",payload);  // truncates to NAME_LEN-1

        clients.GetName(sock,name,sizeof(name));
        snprintf(bufWithName,sizeof(bufWithName),"%s change name to %s",name,newname);
        clients.PutName(sock,newname,strlen(newname));
        messageBuffer.PutMessage(bufWithName);
    }else{
        clients.GetName(sock,name,sizeof(name));
        // Bounded: a full-size message plus a full-size name plus ": " is one
        // byte larger than bufWithName.
        snprintf(bufWithName,sizeof(bufWithName),"%s: %s",name,buf);
        messageBuffer.PutMessage(bufWithName);
    }
}



编译脚本文件compile



# Compile each translation unit separately.
g++ -c MessageBuffer.cpp

g++ -c Clients.cpp

g++ -c Server.cpp

# Link. The library goes after the object files: the linker resolves symbols
# left to right, so a -lpthread placed before the objects that need it can
# be dropped and leave the pthread_* symbols undefined.
g++ -o server MessageBuffer.o Clients.o Server.o -lpthread



chmod +x compile



./compile 就可以编译并链接



运行服务器

./server 8000

注意Linux下的防火墙iptables服务是否已经启动,如果启动了,需要在/etc/sysconfig/iptables中加入例外端口8000,并重新启动防火墙

/etc/init.d/iptables restart
转载:点击打开链接
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: