您的位置:首页 > 理论基础 > 计算机网络

linux c++ 高并发tcp服务器架构

2017-01-06 14:53 288 查看
from:
http://blog.csdn.net/opencpu/article/details/47175813
epoll 接受数据到队列,线程池处理队列里的数据
具体实现方式:(只使用使用std的的数据结构,未使用boost)

//============================================================================

// Name        : hmserver.cpp

// Author      : song

// Version     :

// Copyright   : Your copyright notice

// Description : Hello World in C++, Ansi-style

//============================================================================

#include <stdio.h>

#include <iostream>

using namespace std;

#include "thread_pool.h"

#include "command.h"

int main()

{

    ThreadPool thread_pool;

    thread_pool.InitializeThreads();

    Command command;

    char arg[8] = {0};

    for(int i=1; i<=200; ++i)

    {

        command.set_cmd(i%3);

        sprintf(arg,"%d",i);

        command.set_arg(arg);

        thread_pool.AddWork(command);

    }

    sleep(30); // 用于测试线程池缩容

    thread_pool.ThreadDestroy();

    return 0;

}

//////////////////////////////////////////////////////

thread_pool.cpp

#include <pthread.h>

#include <stdlib.h>

#include "thread_pool.h"

#include "thread_process.h"

#include "command.h"

#include <unistd.h>

#include <stdio.h>

bool ThreadPool::bshutdown_ = false;

int ThreadPool::icurr_thread_num_ = THREAD_NUM;

std::vector<Command> ThreadPool::command_;

std::map<pthread_t,int> ThreadPool::thread_id_map_;

pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;

void ThreadPool::InitializeThreads()

{

    for (int i = 0; i < THREAD_NUM ; ++i)

    {

        pthread_t tempThread;

        pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);

        thread_id_map_[tempThread] = 0;

    }

}

void* ThreadPool::Process(void* arg)

{

    ThreadProcess threadprocess;

    Command command;

    while (true)

    {

        pthread_mutex_lock(&command_mutex_);

        // 如果线程需要退出,则此时退出

        if (1 == thread_id_map_[pthread_self()])

        {

            pthread_mutex_unlock(&command_mutex_);

            printf("thread %u will exit\n", pthread_self());

            pthread_exit(NULL);

        }

        // 当线程不需要退出且没有需要处理的任务时,需要缩容的则缩容,不需要的则等待信号

        if (0 == command_.size() && !bshutdown_)

        {

            if(icurr_thread_num_ >  THREAD_NUM)

            {

                DeleteThread();

                if (1 == thread_id_map_[pthread_self()])

                {

                    pthread_mutex_unlock(&command_mutex_);

                    printf("thread %u will exit\n", pthread_self());

                    pthread_exit(NULL);

                }

            }

            pthread_cond_wait(&command_cond_,&command_mutex_);

        }

        // 线程池需要关闭,关闭已有的锁,线程退出

        if(bshutdown_)

        {

            pthread_mutex_unlock (&command_mutex_);

            printf ("thread %u will exit\n", pthread_self ());

            pthread_exit (NULL);

        }

        // 如果线程池的最大线程数不等于初始线程数,则表明需要扩容

        if(icurr_thread_num_ < command_.size())

        {

            AddThread();

        }

        // 从容器中取出待办任务

        std::vector<Command>::iterator iter = command_.begin();

        command.set_arg(iter->get_arg());

        command.set_cmd(iter->get_cmd());

        command_.erase(iter);

        pthread_mutex_unlock(&command_mutex_);

        // 开始业务处理

        switch(command.get_cmd())

        {

        case 0:

            threadprocess.Process0(command.get_arg());

            break;

        case 1:

            threadprocess.Process1(command.get_arg());

            break;

        case 2:

            threadprocess.Process2(command.get_arg());

            break;

        default:

            break;

        }

    }

    return NULL; // 完全为了消除警告(eclipse编写的代码,警告很烦人)

}

void ThreadPool::AddWork(Command command)

{

    bool bsignal = false;

    pthread_mutex_lock(&command_mutex_);

    if (0 == command_.size())

    {

        bsignal = true;

    }

    command_.push_back(command);

    pthread_mutex_unlock(&command_mutex_);

    if (bsignal)

    {

        pthread_cond_signal(&command_cond_);

    }

}

void ThreadPool::ThreadDestroy(int iwait)

{

    while(0 != command_.size())

    {

        sleep(abs(iwait));

    }

    bshutdown_ = true;

    pthread_cond_broadcast(&command_cond_);

    std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();

    for (; iter!=thread_id_map_.end(); ++iter)

    {

        pthread_join(iter->first,NULL);

    }

    pthread_mutex_destroy(&command_mutex_);

    pthread_cond_destroy(&command_cond_);

}

void ThreadPool::AddThread()

{

    if(((icurr_thread_num_*ADD_FACTOR) < command_.size())

            && (MAX_THREAD_NUM != icurr_thread_num_))

    {

        InitializeThreads();

        icurr_thread_num_ += THREAD_NUM;

    }

}

void ThreadPool::DeleteThread()

{

    int size = icurr_thread_num_ - THREAD_NUM;

    std::map<pthread_t,int>::iterator iter = thread_id_map_.begin();

    for(int i=0; i<size; ++i,++iter)

    {

        iter->second = 1;

    }

}

//////////////

thread_poll.h

#ifndef THREAD_POOL_H_

#define THREAD_POOL_H_

#include <map>

#include <vector>

#include "command.h"

#define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍

#define ADD_FACTOR 4 // 该值表示一个线程可以处理的最大任务数

#define THREAD_NUM 10 // 初始线程数

class ThreadPool

{

public:

    ThreadPool() {};

    static void InitializeThreads();

    void AddWork(Command command);

    void ThreadDestroy(int iwait = 2);

private:

    static void* Process(void* arg);

    static void AddThread();

    static void DeleteThread();

    static bool bshutdown_;

    static int icurr_thread_num_;

    static std::map<pthread_t,int> thread_id_map_;

    static std::vector<Command> command_;

    static pthread_mutex_t command_mutex_;

    static pthread_cond_t command_cond_;

};

#endif /* THREAD_POOL_H_ */

thread_process.cpp:

#include
<stdio.h>

#include <unistd.h>

#include "thread_process.h"

#include <pthread.h>

void ThreadProcess::Process0(void* arg)

{

    printf("thread %u is starting process %s\n",pthread_self(),arg);

    usleep(100*1000);

}

void ThreadProcess::Process1(void* arg)

{

    printf("thread %u is starting process %s\n",pthread_self(),arg);

    usleep(100*1000);

}

void ThreadProcess::Process2(void* arg)

{

    printf("thread %u is starting process %s\n",pthread_self(),arg);

    usleep(100*1000);

}

thread_process.h:

#ifndef
THREAD_PROCESS_H_

#define THREAD_PROCESS_H_

class ThreadProcess

{

public:

    void Process0(void* arg);

    void Process1(void* arg);

    void Process2(void* arg);

};

#endif /* THREAD_PROCESS_H_ */

/////////////////////////////////////////////////////////////////////////

command.cpp:

#include
<string.h>

#include "command.h"

int Command::get_cmd()

{

    return cmd_;

}

char* Command::get_arg()

{

    return arg_;

}

void Command::set_cmd(int cmd)

{

    cmd_ = cmd;

}

void Command::set_arg(char* arg)

{

    if(NULL == arg)

    {

        return;

    }

    strncpy(arg_,arg,64);

    arg_[64] = '\0';

}

/////////////////////////////////////////////////

command.h:

#ifndef COMMAND_H_

#define COMMAND_H_

class Command

{

public:

    int get_cmd();

    char* get_arg();

    void set_cmd(int cmd);

    void set_arg(char* arg);

private:

    int cmd_;

    char arg_[65];

};

#endif /* COMMAND_H_ */
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: