Linux下select函数实现的聊天服务器
2012-10-23 01:02
218 查看
消息缓冲区类MessageBuffer:接收线程将收到的消息放入缓冲区,发送线程从缓冲区中取出消息。
MessageBuffer.h
[cpp] view
plaincopy
//MessageBuffer.h
#ifndef _MESSAGE_BUF_INCLUDE_
#define _MESSAGE_BUF_INCLUDE_
#include <pthread.h>
#define MESSAGE_COUNT 16
#define MESSAGE_LENGTH 2048
// Mutex-protected, fixed-size message queue.
// Messages are stored as NUL-terminated strings in a circular array of
// MESSAGE_COUNT slots, each MESSAGE_LENGTH bytes long.
class MessageBuffer {
public:
    // When true, PutMessage()/GetMessage() stop waiting and return -1.
    bool toStop;

    // Constructor.
    MessageBuffer();
    // Destructor.
    virtual ~MessageBuffer();

    // Puts a message into the buffer; blocks while the buffer is full.
    // Returns -1 when toStop is true.
    int PutMessage(const char *message);

    // Takes a message out of the buffer into mbuf (capacity buflen bytes);
    // blocks while the buffer is empty. Returns -1 when toStop is true.
    int GetMessage(char *mbuf, int buflen);

private:
    // Mutex guarding access to the buffer.
    pthread_mutex_t mutex;
    // Condition variable used to wait on the buffer.
    pthread_cond_t condition;
    // Message storage, used as a circular queue.
    char buf[MESSAGE_COUNT][MESSAGE_LENGTH];
    // Tail index of the circular queue.
    int rear;
    // Head index of the circular queue.
    int front;
};
#endif
MessageBuffer.cpp
[cpp] view
plaincopy
//MessageBuffer.cpp
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include "MessageBuffer.h"
// Creates an empty buffer: clears the stop flag, initializes the mutex and
// the condition variable, and resets both queue indices to 0.
MessageBuffer::MessageBuffer() {
    toStop = false;
    pthread_mutex_init(&mutex, NULL);     // initialize the mutex
    pthread_cond_init(&condition, NULL);  // initialize the condition variable
    rear = 0;                             // tail index starts at 0
    front = 0;                            // head index starts at 0
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("A MessageBuffer intance created.\n");
}
// Releases the mutex and the condition variable.
MessageBuffer::~MessageBuffer() {
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&condition);
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("A MessageBuffer instance destroyed.\n");
}
// Puts a message into the message buffer.
//
// Blocks while the queue is full. The message is truncated to
// MESSAGE_LENGTH-1 characters and stored NUL-terminated.
// Returns 0 on success, or -1 if toStop is set.
int MessageBuffer::PutMessage(const char *message) {
    struct timespec deadline;

    pthread_mutex_lock(&mutex);
    // The queue is full when advancing rear would make it equal to front.
    while (!toStop && (rear + 1) % MESSAGE_COUNT == front) {
        // Timed wait: the loop condition (including toStop) is re-evaluated
        // about once per second even if nobody signals the condition.
        deadline.tv_sec = time(NULL) + 1;
        deadline.tv_nsec = 0;
        pthread_cond_timedwait(&condition, &mutex, &deadline);
    }
    if (toStop) {
        pthread_cond_broadcast(&condition);
        pthread_mutex_unlock(&mutex);
        return -1;
    }

    const int messageLen = static_cast<int>(strlen(message));
    const int copyLen = messageLen >= MESSAGE_LENGTH ? MESSAGE_LENGTH - 1 : messageLen;
    memcpy(buf[rear], message, copyLen);
    // '\0' (NUL terminator); the source had the multi-character constant '/0',
    // which does not terminate the string.
    buf[rear][copyLen] = '\0';
    rear = (rear + 1) % MESSAGE_COUNT;

    pthread_cond_signal(&condition);
    pthread_mutex_unlock(&mutex);
    return 0;
}
// Takes the oldest message out of the message buffer.
//
// Blocks while the queue is empty. At most buflen-1 characters are copied
// into mbuf, followed by a NUL terminator.
// Returns 0 on success, or -1 if toStop is set or the destination is
// unusable (mbuf is NULL or buflen <= 0).
int MessageBuffer::GetMessage(char *mbuf, int buflen) {
    // A non-positive buflen would make copyLen negative below and turn the
    // memcpy length into a huge unsigned value.
    if (mbuf == NULL || buflen <= 0) {
        return -1;
    }

    struct timespec deadline;

    pthread_mutex_lock(&mutex);
    // The queue is empty when both indices are equal.
    while (!toStop && rear == front) {
        // Timed wait: the loop condition (including toStop) is re-evaluated
        // about once per second even if nobody signals the condition.
        deadline.tv_sec = time(NULL) + 1;
        deadline.tv_nsec = 0;
        pthread_cond_timedwait(&condition, &mutex, &deadline);
    }
    if (toStop) {
        pthread_cond_broadcast(&condition);
        pthread_mutex_unlock(&mutex);
        return -1;
    }

    const int messageLen = static_cast<int>(strlen(buf[front]));
    const int copyLen = messageLen >= buflen ? buflen - 1 : messageLen;
    memcpy(mbuf, buf[front], copyLen);
    // '\0' (NUL terminator); the source had the multi-character constant '/0'.
    mbuf[copyLen] = '\0';
    front = (front + 1) % MESSAGE_COUNT;

    pthread_cond_signal(&condition);
    pthread_mutex_unlock(&mutex);
    return 0;
}
客户类Clients,用于维护套接字socket和套接字地址struct sockaddr_in之间的对应关系,并维护用户的姓名。
Clients.h
[cpp] view
plaincopy
//Clients.h
#ifndef _CLIENTS_INCLUDE_
#define _CLIENTS_INCLUDE_
#include <sys/types.h>
#include <netinet/in.h>
#include <pthread.h>
#define NAME_LEN 50
#define MAX_CLIENT 30
// Record kept for one client: its socket, its address and its name.
struct client_info {
    int sock;                       // socket descriptor of the client
    struct sockaddr_in clientAddr;  // socket address of the client
    char name[NAME_LEN];            // client name, stored as a C string
};
typedef struct client_info CLIENT_INFO;
// Table of up to MAX_CLIENT clients, mapping each socket to its address
// and name.
class Clients {
public:
    // Constructor.
    Clients();
    // Destructor.
    virtual ~Clients();

    // Number of clients currently stored.
    int GetClientCount();

    // Adds a client; returns false when the table already holds MAX_CLIENT entries.
    bool PutClient(int sock, const struct sockaddr_in &clientAddr);

    // Removes the entry for sock, if there is one.
    void RemoveClient(int sock);

    // Copies the address stored for sock into *addr; returns false if sock is unknown.
    bool GetAddrBySocket(int sock, struct sockaddr_in *addr);

    // Stores a new name (namelen characters) for sock; returns false if sock is unknown.
    bool PutName(int sock, const char *name, int namelen);

    // Copies the name stored for sock into name (capacity namelen);
    // returns false if sock is unknown.
    bool GetName(int sock, char *name, int namelen);

    // Copies up to arrayLen stored sockets into sockArray; returns how many were copied.
    int GetAllSocket(int* sockArray, int arrayLen);

private:
    pthread_mutex_t mutex;           // guards client[] and clientCount
    CLIENT_INFO client[MAX_CLIENT];  // entries 0..clientCount-1 are in use
    int clientCount;                 // number of entries in use

    // Formats an IPv4 address as dotted-decimal text into buf.
    int IPtoString(unsigned long ip, char *buf, int buflen);

    // Returns the index of the entry for sock, or -1 if not found.
    int Search(int sock);
};
#endif
Clients.cpp
[cpp] view
plaincopy
//Clients.cpp
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include "Clients.h"
// Creates an empty client table and initializes its mutex.
Clients::Clients() {
    pthread_mutex_init(&mutex, NULL);
    clientCount = 0;
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("Clients created.\n");
}
// Destroys the mutex that guards the client table.
Clients::~Clients() {
    pthread_mutex_destroy(&mutex);
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("Clients destroyed.\n");
}
// Linear search for the entry whose socket equals sock.
// Returns its index, or -1 when no entry matches.
// Takes no lock itself; the callers in this file hold the mutex while calling it.
int Clients::Search(int sock) {
    int pos = 0;
    while (pos < clientCount) {
        if (client[pos].sock == sock) {
            return pos;
        }
        ++pos;
    }
    return -1;
}
// Formats an IPv4 address as dotted-decimal text ("a.b.c.d").
//
// ip     - the address in network byte order (callers pass sin_addr.s_addr).
// buf    - destination buffer.
// buflen - capacity of buf; must be at least 16 ("255.255.255.255" plus NUL).
// Returns the number of characters written (excluding the NUL), or -1 if
// the buffer is missing or too small.
int Clients::IPtoString(unsigned long ip, char *buf, int buflen) {
    if (buf == NULL || buflen < 16) {
        return -1;
    }
    // Convert to host order and extract the octets arithmetically. Reading the
    // first four bytes of the unsigned long in memory is only correct on
    // little-endian machines or where unsigned long is 32 bits wide.
    const unsigned long host = ntohl(static_cast<in_addr_t>(ip));
    return snprintf(buf, buflen, "%lu.%lu.%lu.%lu",
                    (host >> 24) & 0xFFUL,
                    (host >> 16) & 0xFFUL,
                    (host >> 8) & 0xFFUL,
                    host & 0xFFUL);
}
int Clients::GetClientCount(){
return clientCount;
}
bool Clients::PutClient(int sock,const struct sockaddr_in &clientAddr) {
if(clientCount==MAX_CLIENT){
return false;
}
pthread_mutex_lock(&mutex);
client[clientCount].sock = sock;
client[clientCount].clientAddr = clientAddr;
int buflen = sizeof(client[clientCount].name);
int pos = IPtoString(clientAddr.sin_addr.s_addr,client[clientCount].name,buflen);
sprintf(&client[clientCount].name[pos],":%d",ntohs(clientAddr.sin_port));
clientCount++;
pthread_mutex_unlock(&mutex);
return true;
}
// Removes the entry for sock, if present, by shifting the later entries
// down one position. Does nothing when sock is not in the table.
void Clients::RemoveClient(int sock) {
    pthread_mutex_lock(&mutex);
    const int found = Search(sock);
    if (found != -1) {
        const int last = clientCount - 1;
        int dst = found;
        while (dst < last) {
            client[dst] = client[dst + 1];
            ++dst;
        }
        clientCount = last;
    }
    pthread_mutex_unlock(&mutex);
}
// Looks up the address stored for sock and copies it into *addr.
// Returns true if sock was found; *addr is left untouched otherwise.
bool Clients::GetAddrBySocket(int sock, struct sockaddr_in *addr) {
    pthread_mutex_lock(&mutex);
    const int found = Search(sock);
    const bool known = (found != -1);
    if (known) {
        *addr = client[found].clientAddr;
    }
    pthread_mutex_unlock(&mutex);
    return known;
}
// Replaces the name stored for sock.
//
// name    - new name; need not be NUL-terminated.
// namelen - number of characters to take from name; truncated to NAME_LEN-1.
// Returns true if sock was found and renamed, false if sock is unknown or
// the arguments are invalid (NULL name or negative length).
bool Clients::PutName(int sock, const char *name, int namelen) {
    if (name == NULL || namelen < 0) {
        return false;
    }
    pthread_mutex_lock(&mutex);
    const int index = Search(sock);
    if (index != -1) {
        const int copyLen = namelen >= NAME_LEN ? NAME_LEN - 1 : namelen;
        memcpy(client[index].name, name, copyLen);
        // '\0' (NUL terminator); the source had the multi-character constant '/0'.
        client[index].name[copyLen] = '\0';
    }
    pthread_mutex_unlock(&mutex);
    return index != -1;
}
// Copies the name stored for sock into a caller-supplied buffer.
//
// name    - destination buffer.
// namelen - capacity of name; at most namelen-1 characters are copied.
// Returns true if sock was found. When it is not found, name is set to the
// empty string so callers that ignore the result never read uninitialized
// memory. Returns false without touching name if name is NULL or
// namelen <= 0.
bool Clients::GetName(int sock, char *name, int namelen) {
    if (name == NULL || namelen <= 0) {
        return false;
    }
    name[0] = '\0';

    pthread_mutex_lock(&mutex);
    const int index = Search(sock);
    if (index != -1) {
        const int storedLen = static_cast<int>(strlen(client[index].name));
        const int copyLen = (storedLen < namelen) ? storedLen : (namelen - 1);
        memcpy(name, client[index].name, copyLen);
        // '\0' (NUL terminator); the source had the multi-character constant '/0'.
        name[copyLen] = '\0';
    }
    pthread_mutex_unlock(&mutex);
    return index != -1;
}
// Copies the stored sockets into sockArray, limited to arrayLen entries.
// Returns the number of sockets copied (the smaller of arrayLen and the
// current client count).
int Clients::GetAllSocket(int* sockArray, int arrayLen) {
    pthread_mutex_lock(&mutex);
    int copyCount = arrayLen;
    if (!(arrayLen > clientCount)) {
        copyCount = arrayLen;
    } else {
        copyCount = clientCount;
    }
    int idx = 0;
    while (idx < copyCount) {
        sockArray[idx] = client[idx].sock;
        ++idx;
    }
    pthread_mutex_unlock(&mutex);
    return copyCount;
}
聊天室服务器主程序Server.cpp
[cpp] view
plaincopy
/*server.c*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <pthread.h>
#include <unistd.h>
#include <netdb.h>
#include <arpa/inet.h>
#include "MessageBuffer.h"
#include "Clients.h"
using namespace std;
#define SERVER_PORT 8000
#define BUFFER_SIZE 4096
#ifndef MAX_CLIENT
#define MAX_CLIENT 30
#endif
#ifndef NAME_LEN
#define NAME_LEN 50
#endif
// Queue of outgoing chat messages: ProcessMessage() puts messages in,
// SendThread() takes them out.
MessageBuffer messageBuffer;
// Table of the clients registered by ListenThread().
Clients clients;
// Thread entry points, started from main().
void* ListenThread(void*);
void* RecvThread(void*);
void* SendThread(void*);
// Handles one message of `bytes` bytes received from `sock` into `buf`.
void ProcessMessage(int sock,char buf[],int bufsize,int bytes);
// Shutdown flag: set by main(), polled by the three threads.
// NOTE(review): plain bool shared between threads without synchronization - confirm this is acceptable.
bool toStop=false;
// Entry point of the chat server.
//
// Usage: <program> PortNumber
// Creates a TCP socket listening on the given port, starts the listen,
// receive and send threads, waits until 'q' is read from stdin, then asks
// the threads to stop, joins them and closes all sockets.
// Returns 0 after a normal shutdown and -1 on any start-up failure.
int main(int argc, char* argv[]) {
    if (argc != 2) {
        printf("Usage: %s PortNumber\n", argv[0]);
        return -1;
    }

    // Parse into a long and range-check before narrowing: assigning atoi()'s
    // result straight to an unsigned short silently wraps values above 65535.
    const long requestedPort = strtol(argv[1], NULL, 10);
    if (requestedPort <= 0 || requestedPort > 65535) {
        printf("incorrect port number.\n");
        return -1;
    }
    const unsigned short port = static_cast<unsigned short>(requestedPort);

    int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (s == -1) {
        fprintf(stderr, "create socket failed.\n");
        return -1;
    }

    struct sockaddr_in serverAddr;
    memset(&serverAddr, 0, sizeof(struct sockaddr_in));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(port);
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(s, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == -1) {
        fprintf(stderr, "bind socket to port %d failed.\n", port);
        close(s);
        return -1;
    }
    if (listen(s, SOMAXCONN) == -1) {
        fprintf(stderr, "listen failed.\n");
        close(s);
        return -1;
    }

    // Print the addresses the host name resolves to.
    printf("Server is listening on ");
    char hostname[255];
    if (gethostname(hostname, sizeof(hostname))) {
        printf("gethostname() failed.\n");
        close(s);
        return -1;
    }
    struct hostent* pHost = gethostbyname(hostname);
    if (pHost) {
        for (int i = 0; pHost->h_addr_list[i]; i++) {
            printf("%s ", inet_ntoa(*(struct in_addr*)pHost->h_addr_list[i]));
        }
    }
    printf("\nport: %d\n", port);

    pthread_t tListenId;
    if (pthread_create(&tListenId, NULL, ListenThread, &s)) {
        printf("failed to create listen thread.\n");
        return -1;
    }
    pthread_t tRecvId;
    if (pthread_create(&tRecvId, NULL, RecvThread, NULL)) {
        printf("failed to create recv thread.\n");
        return -1;
    }
    pthread_t tSendId;
    if (pthread_create(&tSendId, NULL, SendThread, NULL)) {
        printf("failed to create send thread.\n");
        return -1;
    }

    // Run until the operator types 'q'. If stdin is at EOF (closed or
    // redirected), getchar() returns EOF immediately every time; sleep between
    // attempts so the server keeps running without spinning a CPU core.
    for (;;) {
        const int ch = getchar();
        if (ch == 'q') {
            break;
        }
        if (ch == EOF) {
            clearerr(stdin);
            sleep(1);
        }
    }

    // Ask all threads to stop, then wait for them.
    toStop = true;
    messageBuffer.toStop = true;
    pthread_join(tListenId, NULL);
    pthread_join(tRecvId, NULL);
    pthread_join(tSendId, NULL);

    // Close the listening socket and every remaining client socket.
    close(s);
    int sock[MAX_CLIENT];
    const int count = clients.GetAllSocket(sock, MAX_CLIENT);
    for (int i = 0; i < count; i++) {
        close(sock[i]);
    }
    printf("server stopped.\n");
    return 0;
}
// Thread that accepts incoming connections.
//
// ps points to the listening socket (an int). The thread waits on it with
// select() using a 5-second timeout so the global toStop flag is re-checked
// regularly, accepts each new connection and registers it in `clients`.
// Always returns NULL.
void* ListenThread(void* ps) {
    const int s = *(int*)ps;
    fd_set listenSet;
    struct sockaddr_in clientAddr;
    struct timeval timeout;

    while (!toStop) {
        FD_ZERO(&listenSet);
        FD_SET(s, &listenSet);
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;
        const int ret = select(s + 1, &listenSet, NULL, NULL, &timeout);
        if (toStop) {
            printf("ListenThread: exit.\n");
            return NULL;
        }
        if (ret == -1) {
            printf("ListenThread: select() failed!\n");
            continue;
        }
        if (ret == 0) {
            printf("ListenThread: select() time out.\n");
            continue;
        }
        if (!FD_ISSET(s, &listenSet)) {
            continue;
        }

        socklen_t addrlen = sizeof(struct sockaddr_in);
        memset(&clientAddr, 0, sizeof(struct sockaddr_in));
        const int sock = accept(s, (struct sockaddr*)&clientAddr, &addrlen);
        if (sock == -1) {
            // Do not fall through: registering -1 as a client would put an
            // invalid descriptor into the table.
            fprintf(stderr, "accept failed.\n");
            continue;
        }
        if (!clients.PutClient(sock, clientAddr)) {
            printf("max client limited. MAX_CLIENT=%d\n", MAX_CLIENT);
            close(sock);
            // The connection was rejected, so do not report it as accepted.
            continue;
        }
        printf("accept a connection from %s:%u\n",
               inet_ntoa(clientAddr.sin_addr),
               (unsigned)ntohs(clientAddr.sin_port));
        // sock is an int, so it is printed with %d.
        printf("new socket is: %d\n", sock);
    }
    return NULL;
}
// Thread that receives data from all registered clients.
//
// Each round takes a snapshot of the client sockets, waits for readability
// with select() (2-second timeout so toStop is re-checked), and for every
// readable socket either hands the data to ProcessMessage() or, on error or
// when the peer closed the connection, removes the client and closes the socket.
// Always returns NULL.
void* RecvThread(void*) {
    fd_set readSet;
    int sock[MAX_CLIENT];
    char buf[BUFFER_SIZE];
    struct timeval timeout;

    while (!toStop) {
        const int count = clients.GetAllSocket(sock, MAX_CLIENT);
        if (count == 0) {
            // Nothing to wait on yet; check again in 2 seconds.
            sleep(2);
            if (toStop) {
                printf("RecvThread: exit.\n");
                return NULL;
            }
            continue;
        }

        FD_ZERO(&readSet);
        int maxfd = 0;
        for (int i = 0; i < count; i++) {
            printf("--%d", sock[i]);
            FD_SET(sock[i], &readSet);
            if (sock[i] > maxfd) {
                maxfd = sock[i];
            }
        }
        printf("\n");

        timeout.tv_sec = 2;
        timeout.tv_usec = 0;
        const int ret = select(maxfd + 1, &readSet, NULL, NULL, &timeout);
        if (toStop) {
            printf("RecvThread: exit.\n");
            return NULL;
        }
        if (ret == -1) {
            printf("RecvThread: select() failed!\n");
            continue;
        }
        if (ret == 0) {
            printf("RecvThread: select() time out.\n");
            continue;
        }

        for (int i = 0; i < count; i++) {
            if (!FD_ISSET(sock[i], &readSet)) {
                continue;
            }
            // Read at most sizeof(buf)-1 bytes so there is room for the
            // terminator that ProcessMessage() appends.
            const int bytes = recv(sock[i], buf, sizeof(buf) - 1, 0);
            if (bytes == -1) {
                printf("RecvThread: recv failed.\n");
                clients.RemoveClient(sock[i]);
                close(sock[i]);
            } else if (bytes == 0) {
                printf("RecvThread: socket closed by the other side.\n");
                clients.RemoveClient(sock[i]);
                close(sock[i]);
            } else {
                ProcessMessage(sock[i], buf, sizeof(buf), bytes);
            }
        }
    }
    return NULL;
}
// Thread that broadcasts queued messages to all registered clients.
//
// Each round takes one message from messageBuffer, takes a snapshot of the
// client sockets, waits up to 2 seconds for them to become writable and
// sends the message to every writable socket.
// Always returns NULL.
void* SendThread(void*) {
    fd_set writeSet;
    int sock[MAX_CLIENT];
    char buf[BUFFER_SIZE];
    struct timeval timeout;

    while (!toStop) {
        int ret = messageBuffer.GetMessage(buf, sizeof(buf));
        if (ret == -1) {
            printf("SendThread: exit.\n");
            return NULL;
        }
        // Logged only after the stop check, so it is not printed when no
        // message was actually taken.
        printf("get a message from buffer.\n");

        const int count = clients.GetAllSocket(sock, MAX_CLIENT);
        if (count <= 0) {
            // No recipients: skip select(), which would only sleep for the
            // full timeout on an empty set and delay the next message.
            continue;
        }

        FD_ZERO(&writeSet);
        int maxfd = 0;
        for (int i = 0; i < count; i++) {
            FD_SET(sock[i], &writeSet);
            if (sock[i] > maxfd) {
                maxfd = sock[i];
            }
        }
        timeout.tv_sec = 2;
        timeout.tv_usec = 0;
        ret = select(maxfd + 1, NULL, &writeSet, NULL, &timeout);
        if (toStop) {
            printf("SendThread: exit.\n");
            return NULL;
        }
        if (ret == -1) {
            printf("SendThread: select() failed!\n");
            continue;
        }
        if (ret == 0) {
            printf("SendThread: select() time out.\n");
            continue;
        }

        // The message does not change per recipient; measure it once.
        const int messageLen = static_cast<int>(strlen(buf));
        for (int i = 0; i < count; i++) {
            if (!FD_ISSET(sock[i], &writeSet)) {
                continue;
            }
            const int bytes = send(sock[i], buf, messageLen, 0);
            if (bytes == -1) {
                printf("SendThread: send() failed.\n");
            } else if (bytes != messageLen) {
                printf("SendThread: send message trunked.\n");
            }
        }
    }
    return NULL;
}
// Handles one message received from a client.
//
// sock    - socket the data was read from.
// buf     - buffer holding the data; a NUL terminator is written at buf[bytes].
// bufsize - capacity of buf; bytes must be smaller than bufsize.
// bytes   - number of bytes received.
//
// "bye" is echoed back, after which the client is removed and its socket
// closed. A message starting with "name" followed by at least one more
// character renames the client to the text starting at the sixth character
// and queues a "<old> change name to <new>" notice. Anything else is queued
// as "<name>: <text>".
void ProcessMessage(int sock, char buf[], int bufsize, int bytes) {
    // buf[bytes] is written below, so bytes must leave room for the terminator.
    if (buf == NULL || bytes < 0 || bytes >= bufsize) {
        printf("ProcessMessage: invalid message length.\n");
        return;
    }

    struct sockaddr_in clientAddr;
    if (!clients.GetAddrBySocket(sock, &clientAddr)) {
        printf("ProcessMessage: can not find socket address.\n");
        return;
    }

    char ipString[16];
    const unsigned char *ip = (const unsigned char*)&clientAddr.sin_addr.s_addr;
    snprintf(ipString, sizeof(ipString), "%u.%u.%u.%u", ip[0], ip[1], ip[2], ip[3]);
    const unsigned short port = ntohs(clientAddr.sin_port);

    // '\0' (NUL terminator); the source had the multi-character constant '/0'.
    buf[bytes] = '\0';
    printf("Message from %s:%d: %s\n", ipString, port, buf);

    const char* CMD_BYE = "bye";
    if (strcmp(buf, CMD_BYE) == 0) {
        send(sock, CMD_BYE, strlen(CMD_BYE), 0);
        clients.RemoveClient(sock);
        close(sock);
        printf("%s:%u disconnected.\n", ipString, (unsigned)port);
        return;
    }

    char bufWithName[BUFFER_SIZE + NAME_LEN];
    // Start empty so the text below is well defined even if GetName() fails.
    char name[NAME_LEN] = "";
    const char* CMD_NAME = "name";
    const int cmdLen = 4;

    // Require at least one character after "name" (the separator), so that
    // buf + cmdLen + 1 never points past the terminator written above.
    // NOTE(review): the separator character itself is not validated, so any
    // text starting with "name" is treated as a rename - confirm the protocol.
    if (bytes >= cmdLen + 1 && strncmp(buf, CMD_NAME, cmdLen) == 0) {
        const char *requested = buf + cmdLen + 1;
        char newname[NAME_LEN];
        int copyLen = static_cast<int>(strlen(requested));
        if (copyLen >= NAME_LEN) {
            copyLen = NAME_LEN - 1;
        }
        memcpy(newname, requested, copyLen);
        newname[copyLen] = '\0';

        clients.GetName(sock, name, sizeof(name));
        snprintf(bufWithName, sizeof(bufWithName), "%s change name to %s", name, newname);
        clients.PutName(sock, newname, strlen(newname));
        messageBuffer.PutMessage(bufWithName);
    } else {
        clients.GetName(sock, name, sizeof(name));
        // Bounded: name (up to NAME_LEN-1) + ": " + text (up to BUFFER_SIZE-1)
        // + NUL can exceed BUFFER_SIZE+NAME_LEN by one byte.
        snprintf(bufWithName, sizeof(bufWithName), "%s: %s", name, buf);
        messageBuffer.PutMessage(bufWithName);
    }
}
编译脚本文件compile
g++ -c MessageBuffer.cpp
g++ -c Clients.cpp
g++ -c Server.cpp
# The library goes after the object files that use it: the linker processes
# its inputs in command-line order.
g++ -o server MessageBuffer.o Clients.o Server.o -lpthread
chmod +x compile
./compile 就可以编译并链接
运行服务器
./server 8000
注意Linux下的防火墙iptables服务是否已经启动;如果已启动,需要在/etc/sysconfig/iptables中加入例外端口8000,并重新启动防火墙:
/etc/init.d/iptables restart
转载:点击打开链接
MessageBuffer.h
[cpp] view
plaincopy
//MessageBuffer.h
#ifndef _MESSAGE_BUF_INCLUDE_
#define _MESSAGE_BUF_INCLUDE_
#include <pthread.h>
#define MESSAGE_COUNT 16
#define MESSAGE_LENGTH 2048
// Mutex-protected, fixed-size message queue.
// Messages are stored as NUL-terminated strings in a circular array of
// MESSAGE_COUNT slots, each MESSAGE_LENGTH bytes long.
class MessageBuffer {
public:
    // When true, PutMessage()/GetMessage() stop waiting and return -1.
    bool toStop;

    // Constructor.
    MessageBuffer();
    // Destructor.
    virtual ~MessageBuffer();

    // Puts a message into the buffer; blocks while the buffer is full.
    // Returns -1 when toStop is true.
    int PutMessage(const char *message);

    // Takes a message out of the buffer into mbuf (capacity buflen bytes);
    // blocks while the buffer is empty. Returns -1 when toStop is true.
    int GetMessage(char *mbuf, int buflen);

private:
    // Mutex guarding access to the buffer.
    pthread_mutex_t mutex;
    // Condition variable used to wait on the buffer.
    pthread_cond_t condition;
    // Message storage, used as a circular queue.
    char buf[MESSAGE_COUNT][MESSAGE_LENGTH];
    // Tail index of the circular queue.
    int rear;
    // Head index of the circular queue.
    int front;
};
#endif
MessageBuffer.cpp
[cpp] view
plaincopy
//MessageBuffer.cpp
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include "MessageBuffer.h"
// Creates an empty buffer: clears the stop flag, initializes the mutex and
// the condition variable, and resets both queue indices to 0.
MessageBuffer::MessageBuffer() {
    toStop = false;
    pthread_mutex_init(&mutex, NULL);     // initialize the mutex
    pthread_cond_init(&condition, NULL);  // initialize the condition variable
    rear = 0;                             // tail index starts at 0
    front = 0;                            // head index starts at 0
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("A MessageBuffer intance created.\n");
}
// Releases the mutex and the condition variable.
MessageBuffer::~MessageBuffer() {
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&condition);
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("A MessageBuffer instance destroyed.\n");
}
// Puts a message into the message buffer.
//
// Blocks while the queue is full. The message is truncated to
// MESSAGE_LENGTH-1 characters and stored NUL-terminated.
// Returns 0 on success, or -1 if toStop is set.
int MessageBuffer::PutMessage(const char *message) {
    struct timespec deadline;

    pthread_mutex_lock(&mutex);
    // The queue is full when advancing rear would make it equal to front.
    while (!toStop && (rear + 1) % MESSAGE_COUNT == front) {
        // Timed wait: the loop condition (including toStop) is re-evaluated
        // about once per second even if nobody signals the condition.
        deadline.tv_sec = time(NULL) + 1;
        deadline.tv_nsec = 0;
        pthread_cond_timedwait(&condition, &mutex, &deadline);
    }
    if (toStop) {
        pthread_cond_broadcast(&condition);
        pthread_mutex_unlock(&mutex);
        return -1;
    }

    const int messageLen = static_cast<int>(strlen(message));
    const int copyLen = messageLen >= MESSAGE_LENGTH ? MESSAGE_LENGTH - 1 : messageLen;
    memcpy(buf[rear], message, copyLen);
    // '\0' (NUL terminator); the source had the multi-character constant '/0',
    // which does not terminate the string.
    buf[rear][copyLen] = '\0';
    rear = (rear + 1) % MESSAGE_COUNT;

    pthread_cond_signal(&condition);
    pthread_mutex_unlock(&mutex);
    return 0;
}
// Takes the oldest message out of the message buffer.
//
// Blocks while the queue is empty. At most buflen-1 characters are copied
// into mbuf, followed by a NUL terminator.
// Returns 0 on success, or -1 if toStop is set or the destination is
// unusable (mbuf is NULL or buflen <= 0).
int MessageBuffer::GetMessage(char *mbuf, int buflen) {
    // A non-positive buflen would make copyLen negative below and turn the
    // memcpy length into a huge unsigned value.
    if (mbuf == NULL || buflen <= 0) {
        return -1;
    }

    struct timespec deadline;

    pthread_mutex_lock(&mutex);
    // The queue is empty when both indices are equal.
    while (!toStop && rear == front) {
        // Timed wait: the loop condition (including toStop) is re-evaluated
        // about once per second even if nobody signals the condition.
        deadline.tv_sec = time(NULL) + 1;
        deadline.tv_nsec = 0;
        pthread_cond_timedwait(&condition, &mutex, &deadline);
    }
    if (toStop) {
        pthread_cond_broadcast(&condition);
        pthread_mutex_unlock(&mutex);
        return -1;
    }

    const int messageLen = static_cast<int>(strlen(buf[front]));
    const int copyLen = messageLen >= buflen ? buflen - 1 : messageLen;
    memcpy(mbuf, buf[front], copyLen);
    // '\0' (NUL terminator); the source had the multi-character constant '/0'.
    mbuf[copyLen] = '\0';
    front = (front + 1) % MESSAGE_COUNT;

    pthread_cond_signal(&condition);
    pthread_mutex_unlock(&mutex);
    return 0;
}
客户类Clients,用于维护套接字socket和套接字地址struct sockaddr_in之间的对应关系,并维护用户的姓名。
Clients.h
[cpp] view
plaincopy
//Clients.h
#ifndef _CLIENTS_INCLUDE_
#define _CLIENTS_INCLUDE_
#include <sys/types.h>
#include <netinet/in.h>
#include <pthread.h>
#define NAME_LEN 50
#define MAX_CLIENT 30
// Record kept for one client: its socket, its address and its name.
struct client_info {
    int sock;                       // socket descriptor of the client
    struct sockaddr_in clientAddr;  // socket address of the client
    char name[NAME_LEN];            // client name, stored as a C string
};
typedef struct client_info CLIENT_INFO;
// Table of up to MAX_CLIENT clients, mapping each socket to its address
// and name.
class Clients {
public:
    // Constructor.
    Clients();
    // Destructor.
    virtual ~Clients();

    // Number of clients currently stored.
    int GetClientCount();

    // Adds a client; returns false when the table already holds MAX_CLIENT entries.
    bool PutClient(int sock, const struct sockaddr_in &clientAddr);

    // Removes the entry for sock, if there is one.
    void RemoveClient(int sock);

    // Copies the address stored for sock into *addr; returns false if sock is unknown.
    bool GetAddrBySocket(int sock, struct sockaddr_in *addr);

    // Stores a new name (namelen characters) for sock; returns false if sock is unknown.
    bool PutName(int sock, const char *name, int namelen);

    // Copies the name stored for sock into name (capacity namelen);
    // returns false if sock is unknown.
    bool GetName(int sock, char *name, int namelen);

    // Copies up to arrayLen stored sockets into sockArray; returns how many were copied.
    int GetAllSocket(int* sockArray, int arrayLen);

private:
    pthread_mutex_t mutex;           // guards client[] and clientCount
    CLIENT_INFO client[MAX_CLIENT];  // entries 0..clientCount-1 are in use
    int clientCount;                 // number of entries in use

    // Formats an IPv4 address as dotted-decimal text into buf.
    int IPtoString(unsigned long ip, char *buf, int buflen);

    // Returns the index of the entry for sock, or -1 if not found.
    int Search(int sock);
};
#endif
Clients.cpp
[cpp] view
plaincopy
//Clients.cpp
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include "Clients.h"
// Creates an empty client table and initializes its mutex.
Clients::Clients() {
    pthread_mutex_init(&mutex, NULL);
    clientCount = 0;
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("Clients created.\n");
}
// Destroys the mutex that guards the client table.
Clients::~Clients() {
    pthread_mutex_destroy(&mutex);
    // "\n" is the newline escape; the source had the two literal characters "/n".
    printf("Clients destroyed.\n");
}
// Linear search for the entry whose socket equals sock.
// Returns its index, or -1 when no entry matches.
// Takes no lock itself; the callers in this file hold the mutex while calling it.
int Clients::Search(int sock) {
    int pos = 0;
    while (pos < clientCount) {
        if (client[pos].sock == sock) {
            return pos;
        }
        ++pos;
    }
    return -1;
}
// Formats an IPv4 address as dotted-decimal text ("a.b.c.d").
//
// ip     - the address in network byte order (callers pass sin_addr.s_addr).
// buf    - destination buffer.
// buflen - capacity of buf; must be at least 16 ("255.255.255.255" plus NUL).
// Returns the number of characters written (excluding the NUL), or -1 if
// the buffer is missing or too small.
int Clients::IPtoString(unsigned long ip, char *buf, int buflen) {
    if (buf == NULL || buflen < 16) {
        return -1;
    }
    // Convert to host order and extract the octets arithmetically. Reading the
    // first four bytes of the unsigned long in memory is only correct on
    // little-endian machines or where unsigned long is 32 bits wide.
    const unsigned long host = ntohl(static_cast<in_addr_t>(ip));
    return snprintf(buf, buflen, "%lu.%lu.%lu.%lu",
                    (host >> 24) & 0xFFUL,
                    (host >> 16) & 0xFFUL,
                    (host >> 8) & 0xFFUL,
                    host & 0xFFUL);
}
int Clients::GetClientCount(){
return clientCount;
}
bool Clients::PutClient(int sock,const struct sockaddr_in &clientAddr) {
if(clientCount==MAX_CLIENT){
return false;
}
pthread_mutex_lock(&mutex);
client[clientCount].sock = sock;
client[clientCount].clientAddr = clientAddr;
int buflen = sizeof(client[clientCount].name);
int pos = IPtoString(clientAddr.sin_addr.s_addr,client[clientCount].name,buflen);
sprintf(&client[clientCount].name[pos],":%d",ntohs(clientAddr.sin_port));
clientCount++;
pthread_mutex_unlock(&mutex);
return true;
}
// Removes the entry for sock, if present, by shifting the later entries
// down one position. Does nothing when sock is not in the table.
void Clients::RemoveClient(int sock) {
    pthread_mutex_lock(&mutex);
    const int found = Search(sock);
    if (found != -1) {
        const int last = clientCount - 1;
        int dst = found;
        while (dst < last) {
            client[dst] = client[dst + 1];
            ++dst;
        }
        clientCount = last;
    }
    pthread_mutex_unlock(&mutex);
}
// Looks up the address stored for sock and copies it into *addr.
// Returns true if sock was found; *addr is left untouched otherwise.
bool Clients::GetAddrBySocket(int sock, struct sockaddr_in *addr) {
    pthread_mutex_lock(&mutex);
    const int found = Search(sock);
    const bool known = (found != -1);
    if (known) {
        *addr = client[found].clientAddr;
    }
    pthread_mutex_unlock(&mutex);
    return known;
}
// Replaces the name stored for sock.
//
// name    - new name; need not be NUL-terminated.
// namelen - number of characters to take from name; truncated to NAME_LEN-1.
// Returns true if sock was found and renamed, false if sock is unknown or
// the arguments are invalid (NULL name or negative length).
bool Clients::PutName(int sock, const char *name, int namelen) {
    if (name == NULL || namelen < 0) {
        return false;
    }
    pthread_mutex_lock(&mutex);
    const int index = Search(sock);
    if (index != -1) {
        const int copyLen = namelen >= NAME_LEN ? NAME_LEN - 1 : namelen;
        memcpy(client[index].name, name, copyLen);
        // '\0' (NUL terminator); the source had the multi-character constant '/0'.
        client[index].name[copyLen] = '\0';
    }
    pthread_mutex_unlock(&mutex);
    return index != -1;
}
// Copies the name stored for sock into a caller-supplied buffer.
//
// name    - destination buffer.
// namelen - capacity of name; at most namelen-1 characters are copied.
// Returns true if sock was found. When it is not found, name is set to the
// empty string so callers that ignore the result never read uninitialized
// memory. Returns false without touching name if name is NULL or
// namelen <= 0.
bool Clients::GetName(int sock, char *name, int namelen) {
    if (name == NULL || namelen <= 0) {
        return false;
    }
    name[0] = '\0';

    pthread_mutex_lock(&mutex);
    const int index = Search(sock);
    if (index != -1) {
        const int storedLen = static_cast<int>(strlen(client[index].name));
        const int copyLen = (storedLen < namelen) ? storedLen : (namelen - 1);
        memcpy(name, client[index].name, copyLen);
        // '\0' (NUL terminator); the source had the multi-character constant '/0'.
        name[copyLen] = '\0';
    }
    pthread_mutex_unlock(&mutex);
    return index != -1;
}
// Copies the stored sockets into sockArray, limited to arrayLen entries.
// Returns the number of sockets copied (the smaller of arrayLen and the
// current client count).
int Clients::GetAllSocket(int* sockArray, int arrayLen) {
    pthread_mutex_lock(&mutex);
    int copyCount = arrayLen;
    if (!(arrayLen > clientCount)) {
        copyCount = arrayLen;
    } else {
        copyCount = clientCount;
    }
    int idx = 0;
    while (idx < copyCount) {
        sockArray[idx] = client[idx].sock;
        ++idx;
    }
    pthread_mutex_unlock(&mutex);
    return copyCount;
}
聊天室服务器主程序Server.cpp
[cpp] view
plaincopy
/*server.c*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <pthread.h>
#include <unistd.h>
#include <netdb.h>
#include <arpa/inet.h>
#include "MessageBuffer.h"
#include "Clients.h"
using namespace std;
#define SERVER_PORT 8000
#define BUFFER_SIZE 4096
#ifndef MAX_CLIENT
#define MAX_CLIENT 30
#endif
#ifndef NAME_LEN
#define NAME_LEN 50
#endif
// Queue of outgoing chat messages: ProcessMessage() puts messages in,
// SendThread() takes them out.
MessageBuffer messageBuffer;
// Table of the clients registered by ListenThread().
Clients clients;
// Thread entry points, started from main().
void* ListenThread(void*);
void* RecvThread(void*);
void* SendThread(void*);
// Handles one message of `bytes` bytes received from `sock` into `buf`.
void ProcessMessage(int sock,char buf[],int bufsize,int bytes);
// Shutdown flag: set by main(), polled by the three threads.
// NOTE(review): plain bool shared between threads without synchronization - confirm this is acceptable.
bool toStop=false;
// Entry point of the chat server.
//
// Usage: <program> PortNumber
// Creates a TCP socket listening on the given port, starts the listen,
// receive and send threads, waits until 'q' is read from stdin, then asks
// the threads to stop, joins them and closes all sockets.
// Returns 0 after a normal shutdown and -1 on any start-up failure.
int main(int argc, char* argv[]) {
    if (argc != 2) {
        printf("Usage: %s PortNumber\n", argv[0]);
        return -1;
    }

    // Parse into a long and range-check before narrowing: assigning atoi()'s
    // result straight to an unsigned short silently wraps values above 65535.
    const long requestedPort = strtol(argv[1], NULL, 10);
    if (requestedPort <= 0 || requestedPort > 65535) {
        printf("incorrect port number.\n");
        return -1;
    }
    const unsigned short port = static_cast<unsigned short>(requestedPort);

    int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (s == -1) {
        fprintf(stderr, "create socket failed.\n");
        return -1;
    }

    struct sockaddr_in serverAddr;
    memset(&serverAddr, 0, sizeof(struct sockaddr_in));
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(port);
    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(s, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) == -1) {
        fprintf(stderr, "bind socket to port %d failed.\n", port);
        close(s);
        return -1;
    }
    if (listen(s, SOMAXCONN) == -1) {
        fprintf(stderr, "listen failed.\n");
        close(s);
        return -1;
    }

    // Print the addresses the host name resolves to.
    printf("Server is listening on ");
    char hostname[255];
    if (gethostname(hostname, sizeof(hostname))) {
        printf("gethostname() failed.\n");
        close(s);
        return -1;
    }
    struct hostent* pHost = gethostbyname(hostname);
    if (pHost) {
        for (int i = 0; pHost->h_addr_list[i]; i++) {
            printf("%s ", inet_ntoa(*(struct in_addr*)pHost->h_addr_list[i]));
        }
    }
    printf("\nport: %d\n", port);

    pthread_t tListenId;
    if (pthread_create(&tListenId, NULL, ListenThread, &s)) {
        printf("failed to create listen thread.\n");
        return -1;
    }
    pthread_t tRecvId;
    if (pthread_create(&tRecvId, NULL, RecvThread, NULL)) {
        printf("failed to create recv thread.\n");
        return -1;
    }
    pthread_t tSendId;
    if (pthread_create(&tSendId, NULL, SendThread, NULL)) {
        printf("failed to create send thread.\n");
        return -1;
    }

    // Run until the operator types 'q'. If stdin is at EOF (closed or
    // redirected), getchar() returns EOF immediately every time; sleep between
    // attempts so the server keeps running without spinning a CPU core.
    for (;;) {
        const int ch = getchar();
        if (ch == 'q') {
            break;
        }
        if (ch == EOF) {
            clearerr(stdin);
            sleep(1);
        }
    }

    // Ask all threads to stop, then wait for them.
    toStop = true;
    messageBuffer.toStop = true;
    pthread_join(tListenId, NULL);
    pthread_join(tRecvId, NULL);
    pthread_join(tSendId, NULL);

    // Close the listening socket and every remaining client socket.
    close(s);
    int sock[MAX_CLIENT];
    const int count = clients.GetAllSocket(sock, MAX_CLIENT);
    for (int i = 0; i < count; i++) {
        close(sock[i]);
    }
    printf("server stopped.\n");
    return 0;
}
// Thread that accepts incoming connections.
//
// ps points to the listening socket (an int). The thread waits on it with
// select() using a 5-second timeout so the global toStop flag is re-checked
// regularly, accepts each new connection and registers it in `clients`.
// Always returns NULL.
void* ListenThread(void* ps) {
    const int s = *(int*)ps;
    fd_set listenSet;
    struct sockaddr_in clientAddr;
    struct timeval timeout;

    while (!toStop) {
        FD_ZERO(&listenSet);
        FD_SET(s, &listenSet);
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;
        const int ret = select(s + 1, &listenSet, NULL, NULL, &timeout);
        if (toStop) {
            printf("ListenThread: exit.\n");
            return NULL;
        }
        if (ret == -1) {
            printf("ListenThread: select() failed!\n");
            continue;
        }
        if (ret == 0) {
            printf("ListenThread: select() time out.\n");
            continue;
        }
        if (!FD_ISSET(s, &listenSet)) {
            continue;
        }

        socklen_t addrlen = sizeof(struct sockaddr_in);
        memset(&clientAddr, 0, sizeof(struct sockaddr_in));
        const int sock = accept(s, (struct sockaddr*)&clientAddr, &addrlen);
        if (sock == -1) {
            // Do not fall through: registering -1 as a client would put an
            // invalid descriptor into the table.
            fprintf(stderr, "accept failed.\n");
            continue;
        }
        if (!clients.PutClient(sock, clientAddr)) {
            printf("max client limited. MAX_CLIENT=%d\n", MAX_CLIENT);
            close(sock);
            // The connection was rejected, so do not report it as accepted.
            continue;
        }
        printf("accept a connection from %s:%u\n",
               inet_ntoa(clientAddr.sin_addr),
               (unsigned)ntohs(clientAddr.sin_port));
        // sock is an int, so it is printed with %d.
        printf("new socket is: %d\n", sock);
    }
    return NULL;
}
// Thread that receives data from all registered clients.
//
// Each round takes a snapshot of the client sockets, waits for readability
// with select() (2-second timeout so toStop is re-checked), and for every
// readable socket either hands the data to ProcessMessage() or, on error or
// when the peer closed the connection, removes the client and closes the socket.
// Always returns NULL.
void* RecvThread(void*) {
    fd_set readSet;
    int sock[MAX_CLIENT];
    char buf[BUFFER_SIZE];
    struct timeval timeout;

    while (!toStop) {
        const int count = clients.GetAllSocket(sock, MAX_CLIENT);
        if (count == 0) {
            // Nothing to wait on yet; check again in 2 seconds.
            sleep(2);
            if (toStop) {
                printf("RecvThread: exit.\n");
                return NULL;
            }
            continue;
        }

        FD_ZERO(&readSet);
        int maxfd = 0;
        for (int i = 0; i < count; i++) {
            printf("--%d", sock[i]);
            FD_SET(sock[i], &readSet);
            if (sock[i] > maxfd) {
                maxfd = sock[i];
            }
        }
        printf("\n");

        timeout.tv_sec = 2;
        timeout.tv_usec = 0;
        const int ret = select(maxfd + 1, &readSet, NULL, NULL, &timeout);
        if (toStop) {
            printf("RecvThread: exit.\n");
            return NULL;
        }
        if (ret == -1) {
            printf("RecvThread: select() failed!\n");
            continue;
        }
        if (ret == 0) {
            printf("RecvThread: select() time out.\n");
            continue;
        }

        for (int i = 0; i < count; i++) {
            if (!FD_ISSET(sock[i], &readSet)) {
                continue;
            }
            // Read at most sizeof(buf)-1 bytes so there is room for the
            // terminator that ProcessMessage() appends.
            const int bytes = recv(sock[i], buf, sizeof(buf) - 1, 0);
            if (bytes == -1) {
                printf("RecvThread: recv failed.\n");
                clients.RemoveClient(sock[i]);
                close(sock[i]);
            } else if (bytes == 0) {
                printf("RecvThread: socket closed by the other side.\n");
                clients.RemoveClient(sock[i]);
                close(sock[i]);
            } else {
                ProcessMessage(sock[i], buf, sizeof(buf), bytes);
            }
        }
    }
    return NULL;
}
// Thread that broadcasts queued messages to all registered clients.
//
// Each round takes one message from messageBuffer, takes a snapshot of the
// client sockets, waits up to 2 seconds for them to become writable and
// sends the message to every writable socket.
// Always returns NULL.
void* SendThread(void*) {
    fd_set writeSet;
    int sock[MAX_CLIENT];
    char buf[BUFFER_SIZE];
    struct timeval timeout;

    while (!toStop) {
        int ret = messageBuffer.GetMessage(buf, sizeof(buf));
        if (ret == -1) {
            printf("SendThread: exit.\n");
            return NULL;
        }
        // Logged only after the stop check, so it is not printed when no
        // message was actually taken.
        printf("get a message from buffer.\n");

        const int count = clients.GetAllSocket(sock, MAX_CLIENT);
        if (count <= 0) {
            // No recipients: skip select(), which would only sleep for the
            // full timeout on an empty set and delay the next message.
            continue;
        }

        FD_ZERO(&writeSet);
        int maxfd = 0;
        for (int i = 0; i < count; i++) {
            FD_SET(sock[i], &writeSet);
            if (sock[i] > maxfd) {
                maxfd = sock[i];
            }
        }
        timeout.tv_sec = 2;
        timeout.tv_usec = 0;
        ret = select(maxfd + 1, NULL, &writeSet, NULL, &timeout);
        if (toStop) {
            printf("SendThread: exit.\n");
            return NULL;
        }
        if (ret == -1) {
            printf("SendThread: select() failed!\n");
            continue;
        }
        if (ret == 0) {
            printf("SendThread: select() time out.\n");
            continue;
        }

        // The message does not change per recipient; measure it once.
        const int messageLen = static_cast<int>(strlen(buf));
        for (int i = 0; i < count; i++) {
            if (!FD_ISSET(sock[i], &writeSet)) {
                continue;
            }
            const int bytes = send(sock[i], buf, messageLen, 0);
            if (bytes == -1) {
                printf("SendThread: send() failed.\n");
            } else if (bytes != messageLen) {
                printf("SendThread: send message trunked.\n");
            }
        }
    }
    return NULL;
}
// Handles one message received from a client.
//
// sock    - socket the data was read from.
// buf     - buffer holding the data; a NUL terminator is written at buf[bytes].
// bufsize - capacity of buf; bytes must be smaller than bufsize.
// bytes   - number of bytes received.
//
// "bye" is echoed back, after which the client is removed and its socket
// closed. A message starting with "name" followed by at least one more
// character renames the client to the text starting at the sixth character
// and queues a "<old> change name to <new>" notice. Anything else is queued
// as "<name>: <text>".
void ProcessMessage(int sock, char buf[], int bufsize, int bytes) {
    // buf[bytes] is written below, so bytes must leave room for the terminator.
    if (buf == NULL || bytes < 0 || bytes >= bufsize) {
        printf("ProcessMessage: invalid message length.\n");
        return;
    }

    struct sockaddr_in clientAddr;
    if (!clients.GetAddrBySocket(sock, &clientAddr)) {
        printf("ProcessMessage: can not find socket address.\n");
        return;
    }

    char ipString[16];
    const unsigned char *ip = (const unsigned char*)&clientAddr.sin_addr.s_addr;
    snprintf(ipString, sizeof(ipString), "%u.%u.%u.%u", ip[0], ip[1], ip[2], ip[3]);
    const unsigned short port = ntohs(clientAddr.sin_port);

    // '\0' (NUL terminator); the source had the multi-character constant '/0'.
    buf[bytes] = '\0';
    printf("Message from %s:%d: %s\n", ipString, port, buf);

    const char* CMD_BYE = "bye";
    if (strcmp(buf, CMD_BYE) == 0) {
        send(sock, CMD_BYE, strlen(CMD_BYE), 0);
        clients.RemoveClient(sock);
        close(sock);
        printf("%s:%u disconnected.\n", ipString, (unsigned)port);
        return;
    }

    char bufWithName[BUFFER_SIZE + NAME_LEN];
    // Start empty so the text below is well defined even if GetName() fails.
    char name[NAME_LEN] = "";
    const char* CMD_NAME = "name";
    const int cmdLen = 4;

    // Require at least one character after "name" (the separator), so that
    // buf + cmdLen + 1 never points past the terminator written above.
    // NOTE(review): the separator character itself is not validated, so any
    // text starting with "name" is treated as a rename - confirm the protocol.
    if (bytes >= cmdLen + 1 && strncmp(buf, CMD_NAME, cmdLen) == 0) {
        const char *requested = buf + cmdLen + 1;
        char newname[NAME_LEN];
        int copyLen = static_cast<int>(strlen(requested));
        if (copyLen >= NAME_LEN) {
            copyLen = NAME_LEN - 1;
        }
        memcpy(newname, requested, copyLen);
        newname[copyLen] = '\0';

        clients.GetName(sock, name, sizeof(name));
        snprintf(bufWithName, sizeof(bufWithName), "%s change name to %s", name, newname);
        clients.PutName(sock, newname, strlen(newname));
        messageBuffer.PutMessage(bufWithName);
    } else {
        clients.GetName(sock, name, sizeof(name));
        // Bounded: name (up to NAME_LEN-1) + ": " + text (up to BUFFER_SIZE-1)
        // + NUL can exceed BUFFER_SIZE+NAME_LEN by one byte.
        snprintf(bufWithName, sizeof(bufWithName), "%s: %s", name, buf);
        messageBuffer.PutMessage(bufWithName);
    }
}
编译脚本文件compile
g++ -c MessageBuffer.cpp
g++ -c Clients.cpp
g++ -c Server.cpp
# The library goes after the object files that use it: the linker processes
# its inputs in command-line order.
g++ -o server MessageBuffer.o Clients.o Server.o -lpthread
chmod +x compile
./compile 就可以编译并链接
运行服务器
./server 8000
注意Linux下的防火墙iptables服务是否已经启动;如果已启动,需要在/etc/sysconfig/iptables中加入例外端口8000,并重新启动防火墙:
/etc/init.d/iptables restart
转载:点击打开链接
相关文章推荐
- Linux下select函数实现的聊天服务器
- Linux下select函数实现的聊天服务器
- Linux下select函数实现的聊天服务器
- Linux下select函数实现的聊天服务器
- 在linux下实现简单聊天系统(三)服务器
- Linux客户端与服务器相互实现聊天功能
- 用 Python 脚本实现对proc读写 Linux 服务器的监控
- [转]用 Python 脚本实现对 Linux 服务器的监控
- 用 Python 脚本实现对 Linux 服务器的监控
- maven实现一键部署到Linux上的tomcat(无需重启服务器)
- LINUX环境并发服务器的三种实现模型
- windows db2备份至Linux备份服务器之脚本实现
- Linux C TCP Socket实现客户与服务器简单通信
- linux网络编程:用C语言实现的聊天程序(异步通信)
- linux 用expect脚本实现scp服务器之间的文件备份
- 使用 ipmitool 实现 Linux 系统下对服务器的 ipmi 管理
- 基于socket的Linux上的网络聊天程序--多线程的服务器
- 用 Python 脚本实现对 Linux 服务器的监控
- Linux 下VLC服务器RTSP功能实现
- linux网络编程:用C语言实现的聊天程序(同步通信)