您的位置:首页 > 理论基础 > 计算机网络

使用select系统调用实现简单的TCP服务器

2017-03-01 17:34 447 查看
/*************************************************************************
> File Name: command_define.h
> Author: cjj
> Created Time: 2013年10月27日 星期日 00时57分04秒
************************************************************************/
#ifndef COMMAND_DEFINE_H
#define COMMAND_DEFINE_H
#include <stddef.h>

const int MAX_LISTEN = 3000;
const int SERVER_LISTEN_PORT = 8768;
const int MAX_DATA_LEN = 4096;

typedef struct
{
unsigned char buf[MAX_DATA_LEN];
size_t nLen;
int socket;
}ServerData, *pServerData;

typedef void (*pCallBack)(const char * szBuf, size_t nLen, int socket);

#endif


/*************************************************************************
> File Name: TcpServer.h
> Author: cjj
> Created Time: 2013年10月27日 星期日 01时01分23秒
************************************************************************/
#ifndef TCP_SERVER_H
#define TCP_SERVER_H

#include "command_define.h"
#include <set>
#include <list>
#include <sys/select.h>
#include <pthread.h>

class TcpServer
{
public:
TcpServer();
virtual ~TcpServer();
bool Initialize(unsigned int nPort, unsigned long recvFunc);
bool SendData(const unsigned char * szBuf, size_t nLen, int socket);
bool UnInitialize();

private:
static void * AcceptThread(void * pParam);
static void * OperatorThread(void * pParam);
static void * ManageThread(void * pParam);
private:
int m_server_socket;
fd_set m_fdReads;
pthread_mutex_t m_mutex;
pCallBack m_operaFunc;
//int m_client_socket[MAX_LISTEN];
std::set<int> m_client_socket;
std::list<ServerData> m_data;
pthread_t m_pidAccept;
pthread_t m_pidManage;
pthread_cond_t  m_cond;
};

#endif


/*************************************************************************
> File Name: TcpServer.cpp
> Author: cjj
> Created Time: 2013年10月27日 星期日 01时29分54秒
************************************************************************/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "TcpServer.h"
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include<iostream>
using namespace std;
TcpServer::TcpServer()
{
pthread_mutex_init(&m_mutex, NULL);
pthread_cond_init(&m_cond,NULL);
FD_ZERO(&m_fdReads);
m_client_socket.clear();
m_data.clear();
m_operaFunc = 0;
m_pidAccept = 0;
m_pidManage = 0;
}

TcpServer::~TcpServer()
{
FD_ZERO(&m_fdReads);
m_client_socket.clear();
m_data.clear();
m_operaFunc = NULL;
pthread_mutex_destroy(&m_mutex);
}

bool TcpServer::Initialize(unsigned int nPort, unsigned long recvFunc)
{
if(0 != recvFunc)
{
//设置回调函数
m_operaFunc = (pCallBack)recvFunc;
}
//先反初始化
UnInitialize();
//创建socket
m_server_socket = socket(AF_INET, SOCK_STREAM, 0);
if(-1 == m_server_socket)
{
printf("socket error:%m\n");
return false;
}
//绑定IP和端口
sockaddr_in serverAddr = {0};
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(nPort);
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
int res = bind(m_server_socket, (sockaddr*)&serverAddr, sizeof(serverAddr));
if(-1 == res)
{
printf("bind error:%m\n");
return false;
}
//监听
res = listen(m_server_socket, MAX_LISTEN);
if(-1 == res)
{
printf("listen error:%m\n");
return false;
}
//创建线程接收socket连接
if(0 != pthread_create(&m_pidAccept, NULL, AcceptThread, this))
{
printf("create accept thread failed\n");
return false;
}
//创建管理线程
if(0 != pthread_create(&m_pidManage, NULL, ManageThread, this))
{
printf("create manage thread failed\n");
return false;
}
return true;
}

//接收socket连接线程
void * TcpServer::AcceptThread(void * pParam)
{
if(!pParam)
{
printf("param is null\n");
return 0;
}
TcpServer * pThis = (TcpServer*)pParam;
int nMax_fd = 0;
int i = 0;
while(1)
{
FD_ZERO(&pThis->m_fdReads);
//把服务器监听的socket添加到监听的文件描述符集合
FD_SET(pThis->m_server_socket, &pThis->m_fdReads);
//设置监听的最大文件描述符
nMax_fd = nMax_fd > pThis->m_server_socket ? nMax_fd : pThis->m_server_socket;
std::set<int>::iterator iter = pThis->m_client_socket.begin();
//把客户端对应的socket添加到监听的文件描述符集合
for(; iter != pThis->m_client_socket.end(); ++iter)
{
FD_SET(*iter, &pThis->m_fdReads);
}
//判断最大的文件描述符
if(!pThis->m_client_socket.empty())
{
--iter;
nMax_fd = nMax_fd > (*iter) ? nMax_fd : (*iter);
}
//调用select监听所有文件描述符
int res = select(nMax_fd + 1, &pThis->m_fdReads, 0, 0, NULL);
if(-1 == res)
{
printf("select error:%m\n");
continue;
}
printf("select success\n");
//判断服务器socket是否可读
if(FD_ISSET(pThis->m_server_socket, &pThis->m_fdReads))
{
//接收新的连接
int fd = accept(pThis->m_server_socket, 0,0);
if(-1 == fd)
{
printf("accept error:%m\n");
continue;
}
//添加新连接的客户
pThis->m_client_socket.insert(fd);
printf("连接成功\n");
}
for(iter = pThis->m_client_socket.begin(); iter != pThis->m_client_socket.end(); ++iter)
{

//判断客户是否可读
if(-1 != *iter && FD_ISSET(*iter, &pThis->m_fdReads))
{
static int nCount = 0;
nCount ++;
unsigned char buf[MAX_DATA_LEN] = {0};
res = recv(*iter, buf, sizeof(buf), 0);
if(res > 0)
{
ServerData serverData = {0};
memcpy(serverData.buf, buf, res);
serverData.nLen = res;
serverData.socket = *iter;
pthread_mutex_lock(&pThis->m_mutex);
pThis->m_data.push_back(serverData);
pthread_mutex_unlock(&pThis->m_mutex);
printf("data size:%d\n",pThis->m_data.size());
pthread_cond_signal(&pThis->m_cond);
printf("send signal\n");

}
else if(0 == res)
{
printf("客户端退出\n");
pThis->m_client_socket.erase(iter);
}
else
{
printf("recv error\n");
}
}
}
}
}

//发送数据到指定的socket
bool TcpServer::SendData(const unsigned char * buf, size_t len, int sock)
{
if(NULL == buf)
{
return false;
}
int res = send(sock, buf, len, 0);
if(-1 == res)
{
printf("send error:%m\n");
return false;
}
return true;
}

//管理线程,用于创建处理线程
void * TcpServer::ManageThread(void * pParam)
{
if(!pParam)
{
return 0;
}
pthread_t pid;
TcpServer * pThis = (TcpServer *)pParam;
while(1)
{
//使用互斥量
pthread_mutex_lock(&pThis->m_mutex);
pthread_cond_wait(&pThis->m_cond,&pThis->m_mutex);
printf("wait cond\n");
int nCount = pThis->m_data.size();

if(nCount > 0)
{
printf("create thread :%ld\n",  pthread_self());
pid = 0;
//创建处理线程
pthread_attr_t a;
pthread_attr_init(&a);
pthread_attr_setdetachstate(&a,PTHREAD_CREATE_DETACHED);
if( 0 != pthread_create(&pid, &a, OperatorThread, pParam))
{
printf("creae operator thread failed\n");
}
}
pthread_mutex_unlock(&pThis->m_mutex);
//防止抢占CPU资源
usleep(100);
}
}

//数据处理线程
void * TcpServer::OperatorThread(void * pParam)
{
if(!pParam)
{
return 0;
}
TcpServer * pThis = (TcpServer*)pParam;
pthread_mutex_lock(&pThis->m_mutex);
if(!pThis->m_data.empty())
{
ServerData data = pThis->m_data.front();
pThis->m_data.pop_front();
if(pThis->m_operaFunc)
{
//把数据交给回调函数处理
pThis->m_operaFunc((char*)data.buf, data.nLen, data.socket);
}
}
pthread_mutex_unlock(&pThis->m_mutex);
}

bool TcpServer::UnInitialize()
{
close(m_server_socket);
for(std::set<int>::iterator iter = m_client_socket.begin(); iter != m_client_socket.end(); ++iter)
{
close(*iter);
}
m_client_socket.clear();
if(0 != m_pidAccept)
{
pthread_cancel(m_pidAccept);
}
if(0 != m_pidManage)
{
pthread_cancel(m_pidManage);
}
}

/*************************************************************************
> File Name: test.cpp
> Author: cjj
> Created Time: 2013年10月27日 星期日 02时23分51秒
************************************************************************/
#include "TcpServer.h"
#include <stdio.h>
#include <unistd.h>
#include<iostream>
using namespace std;
TcpServer tcpServer;
void printMessage(const char * buf, size_t nlen, int sock)
{
if(NULL == buf)
{
return;
}
static int nCount = 0;
ServerData * serverData = (ServerData*)buf;
cout <<"Message:"<< serverData->buf << ",size:" << nlen << "=======count:" << nCount++ << endl;
tcpServer.SendData(serverData->buf, nlen, serverData->socket);
}

int main()
{
unsigned int port = 55571;
if(false == tcpServer.Initialize(port, (unsigned long)printMessage))
{
printf("Initialize failed\n");
return -1;
}
printf("tcpserver port:%d\n", port);
while(1)
{
sleep(2);
}
return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: