您的位置:首页 > 其它

面向连接的Socket Server的简单实现

2014-04-24 16:38 417 查看
转自:http://www.cnblogs.com/forfuture1978/archive/2010/09/12/1824443.html 仅供个人学习使用

一、基本原理

有时候我们需要实现一个公共的模块,需要对多个其他的模块提供服务,最常用的方式就是实现一个Socket Server,接受客户的请求,并返回给客户结果。
这经常涉及到如果管理多个连接及如何多线程的提供服务的问题,常用的方式就是连接池和线程池,基本流程如下:



首先服务器端有一个监听线程,不断监听来自客户端的连接。
当一个客户端连接到监听线程后,便建立了一个新的连接。
监听线程将新建立的连接放入连接池进行管理,然后继续监听新来的连接。
线程池中有多个服务线程,每个线程都监听一个任务队列,一个建立的连接对应一个服务任务,当服务线程发现有新的任务的时候,便用此连接向客户端提供服务。
一个Socket Server所能够提供的连接数可配置,如果超过配置的个数则拒绝新的连接。
当服务线程完成服务的时候,客户端关闭连接,服务线程关闭连接,空闲并等待处理新的任务。
连接池的监控线程清除其中关闭的连接对象,从而可以建立新的连接。

二、对Socket的封装

Socket的调用主要包含以下的步骤:



调用比较复杂,我们首先区分两类Socket,一类是Listening Socket,一类是Connected Socket.
Listening Socket由MySocketServer负责,一旦accept,则生成一个Connected Socket,又MySocket负责。
MySocket主要实现的方法如下:
int MySocket::write(const char * buf, int length)

{

        int ret = 0;

        int left = length;

        int index = 0;

        while(left > 0)

        {

                ret = send(m_socket, buf + index, left, 0);

                if(ret == 0)

                        break;

                else if(ret == -1)

                {

                        break;

                }

                left -= ret;

                index += ret;

        }

        if(left > 0)

                return -1;

        return 0;

}
int MySocket::read(char * buf, int length)

{

        int ret = 0;

        int left = length;

        int index = 0;

        while(left > 0)

        {

                ret = recv(m_socket, buf + index, left, 0);

                if(ret == 0)

                        break;

                else if(ret == -1)

                        return -1;

                left -= ret;

                index += ret;

        }
        return index;

}
int MySocket::status()

{

        int status;

        int ret;

        fd_set checkset;

        struct timeval timeout;
        FD_ZERO(&checkset);

        FD_SET(m_socket, &checkset);
        timeout.tv_sec = 10;

        timeout.tv_usec = 0;
        status = select((int)m_socket + 1, &checkset, 0, 0, &timeout);

        if(status < 0)

                ret = -1;

        else if(status == 0)

                ret = 0;

        else

                ret = 0;

        return ret;

}
int MySocket::close()

{

        struct linger lin;

        lin.l_onoff = 1;

        lin.l_linger = 0;

        setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));

        ::close(m_socket);

        return 0;

}
MySocketServer的主要方法实现如下:
int MySocketServer::init(int port)

{

        if((m_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1)

        {

                return -1;

        }
        struct sockaddr_in serverAddr;

        memset(&serverAddr, 0, sizeof(struct sockaddr_in));

        serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);

        serverAddr.sin_family = AF_INET;

        serverAddr.sin_port = htons(port);
        if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1)

        {

                ::close(m_socket);

                return -1;

        }
        if(listen(m_socket, SOMAXCONN) == -1)

        {

                ::close(m_socket);

                return -1;

        }
        struct linger lin;

        lin.l_onoff = 1;

        lin.l_linger = 0;
        setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));

        m_port = port;

        m_inited = true;

        return 0;

}
MySocket * MySocketServer::accept()

{

        int sock;

        struct sockaddr_in clientAddr;

        socklen_t clientAddrSize = sizeof(clientAddr);

        if((sock = ::accept(m_socket, (struct sockaddr *)&clientAddr, &clientAddrSize)) == -1)

        {

                return NULL;

        }

        MySocket* socket = new MySocket(sock);

        return socket;

}
MySocket * MySocketServer::accept(int timeout)

{

        struct timeval timeout;

        timeout.tv_sec = timeout;

        timeout.tv_usec = 0;
        fd_set checkset;

        FD_ZERO(&checkset);

        FD_SET(m_socket, &checkset);
        int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout);

        if(status < 0)

                return NULL;

        else if(status == 0)

                return NULL;
        if(FD_ISSET(m_socket, &checkset))

        {

                return accept();

        }

}

三、线程池的实现

一个线程池一般有一个任务队列,启动的各个线程从任务队列中竞争任务,得到的线程则进行处理:list<MyTask *>  m_taskQueue;
任务队列由锁保护,使得线程安全:pthread_mutex_t m_queueMutex
任务队列需要条件变量来支持生产者消费者模式:pthread_cond_t m_cond
如果任务列表为空,则线程等待,等待中的线程个数为:m_numWaitThreads
需要一个列表来维护线程池中的线程:vector<MyThread *> m_threads
每个线程需要一个线程运行函数:
void * __thread_new_proc(void *p)

{

    ((MyThread *)p)->run();

    return 0;

}
每个线程由MyThread类负责,主要函数如下:
int MyThread::start()

{
    pthread_attr_t  attr;

    pthread_attr_init(&attr);

    pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
    int ret = pthread_create(&m_thread, &attr, thread_func, args);

    pthread_attr_destroy(&attr);
    if(ret != 0)

        return –1;
}
int MyThread::stop()

{
    int ret = pthread_kill(m_thread, SIGINT);
    if(ret != 0)

        return –1;

}
int MyThread::join()
{
    int ret = pthread_join(m_thread, NULL);
    if(ret != 0)
        return –1;
}
void MyThread::run()
{
    while (false == m_bStop)
    {
        MyTask *pTask = m_threadPool->getNextTask();
        if (NULL != pTask)
        {
            pTask->process();
        }
    }
}
线程池由MyThreadPool负责,主要函数如下:
int MyThreadPool::init()

{
    pthread_condattr_t cond_attr;

    pthread_condattr_init(&cond_attr);

    pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);

    int ret =  pthread_cond_init(&m_cond, &cond_attr);

    pthread_condattr_destroy(&cond_attr);
    if (ret_val != 0)

        return –1;
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);

    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);

    ret = pthread_mutex_init(&m_queueMutex, &attr);

    pthread_mutexattr_destroy(&attr);
    if (ret_val != 0)

        return –1;
    for (int i = 0; i< m_poolSize; ++i)

    {

        MyThread *thread = new MyThread(i+1, this);        

        m_threads.push_back(thread);

    }
    return 0;

}
int MyThreadPool::start()

{

    int ret;

    for (int i = 0; i< m_poolSize; ++i)

    {        

       ret = m_threads[i]->start();

       if (ret != 0)

           break;       

    }
    ret = pthread_cond_broadcast(&m_cond);
    if(ret != 0)

        return –1;

    return 0;

}
void MyThreadPool::addTask(MyTask *ptask)

{

    if (NULL == ptask)

        return;
    pthread_mutex_lock(&m_queueMutex);
    m_taskQueue.push_back(ptask);       
    if (m_waitingThreadCount > 0)

        pthread_cond_signal(&m_cond);
    pthread_mutex_unlock(&m_queueMutex);

}
MyTask * MyThreadPool::getNextTask()

{

    MyTask *pTask = NULL; 
    pthread_mutex_lock(&m_queueMutex);
    while (m_taskQueue.begin() == m_taskQueue.end())

    {  

        ++m_waitingThreadCount;
        pthread_cond_wait(&n_cond, &m_queueMutex);
        --m_waitingThreadCount;       

    }   
    pTask = m_taskQueue.front();
    m_taskQueue.pop_front();
    pthread_mutex_unlock(&m_queueMutex);

    return pTask;   

}
其中每一个任务的执行由MyTask负责,其主要方法如下:
void MyTask::process()
{
    //用read从客户端读取指令
    //对指令进行处理
    //用write向客户端写入结果
}
 

四、连接池的实现

每个连接池保存一个链表保存已经建立的连接:list<MyConnection *> * m_connections
当然这个链表也需要锁来进行多线程保护:pthread_mutex_t m_connectionMutex;
此处一个MyConnection也是一个MyTask,由一个线程来负责。
线程池也作为连接池的成员变量:MyThreadPool * m_threadPool   
连接池由类MyConnectionPool负责,其主要函数如下:
void MyConnectionPool::addConnection(MyConnection * pConn)

{
    pthread_mutex_lock(&m_connectionMutex);
    m_connections->push_back(pConn);
    pthread_mutex_unlock(&m_connectionMutex);
    m_threadPool->addTask(pConn);

}
MyConnectionPool也要启动一个背后的线程,来管理这些连接,移除结束的连接和错误的连接。
void MyConnectionPool::managePool()

{
    pthread_mutex_lock(&m_connectionMutex);
    for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); )

    {

        MyConnection *conn = *itr;        

        if (conn->isFinish())

        {

            delete conn;

            conn = NULL;

            list<MyConnection *>::iterator pos = itr++;

            m_connections->erase(pos);                         

        }

        else if (conn->isError())

        {
            //处理错误的连接

            ++itr;

        }

        else

        {

            ++itr;

        }

    }
    pthread_mutex_unlock(&m_connectionMutex);
}
 

五、监听线程的实现

监听线程需要有一个MySocketServer来监听客户端的连接,每当形成一个新的连接,查看是否超过设置的最大连接数,如果超过则关闭连接,如果未超过设置的最大连接数,则形成一个新的MyConnection,将其加入连接池和线程池。
MySocketServer *pServer = new MySocketServer(port);
MyConnectionPool *pPool = new MyConnectionPool();
while (!stopFlag)
{
    MySocket * sock = pServer->acceptConnection(5);
    if(sock != null)
    {
        if(m_connections.size > maxConnectionSize)
        {
            sock.close();
        }
        MyTask *pTask = new MyConnection();
        pPool->addConnection(pTask);      
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  socket server