您的位置:首页 > 其它

[国嵌攻略][181][线程池技术优化]

2016-04-06 16:05 176 查看
服务器单发模式

初始化->等待连接->处理请求->关闭连接->再次等待连接

服务器并发模式

初始化->等待连接->交给子进程处理请求->再次等待连接

单发服务器不能同时处理多个客户端请求,并发服务器则可以同时处理多个客户端请求。并发服务器一般通过创建线程来处理多个客户端请求。当处理的客户端到达上万个时,不断的创建和销毁线程对服务器是一笔很大的开销。通过线程池技术,预先创建大量线程。在使用时直接从线程池中取出,用完后放回线程池。这样就可以大大减少对线程的创建和销毁开销。

线程池工作原理

线程池就是有一堆已经创建好了的线程,当有新的任务需要处理的时候,就从这个池子里面取出一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放回池中,以供后面的任务再次使用,当池子里面的线程全都处理忙碌状态时,这时新到来的任务需要稍作等待。

线程的创建和销毁比之进程的创建和销毁是轻量级的,但是当我们的任务需要大量进行大量线程的创建和销毁操作时,这个消耗就会变的相当大。线程池的好处就在于线程复用,当一个任务处理完成后,当前线程可以继续处理下一个任务,而不是销毁后再创建,非常适用于连续产生大量并发任务的场合。

优化服务器端程序

编译程序

gcc -lssl -lpthread server.c -o server

server.c

/********************************************************************
*名称:server.c
*作者:D
*时间:2016.04.06
*功能:网络安全传输系统服务端
*********************************************************************/

/********************************************************************
*头文件
*********************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <openssl/ssl.h>
#include <openssl/err.h>

#include <pthread.h>

/********************************************************************
*宏定义
*********************************************************************/
#define SERVER_PORT 3333          //网络端口

/********************************************************************
*类型定义
*********************************************************************/
//线程任务
typedef struct cthread_task{
void *(*process)(int arg);    //任务函数
int arg;                      //任务参数
struct cthread_task *next;    //任务指针
}Cthread_task;

//线程池子
typedef struct cthread_poll{
pthread_mutex_t queue_lock;   //互斥变量
pthread_cond_t queue_ready;   //条件变量
Cthread_task *queue_head;     //等待队列

pthread_t *threadId;          //线程编号
int max_thread_num;           //线程数量
int cur_task_size;            //等待数量
int shutdown;                 //销毁标志
}Cthread_pool;

/********************************************************************
*全局变量
*********************************************************************/
SSL_CTX *ctx;               //会话环境
Cthread_pool *threadPool;   //线程池子

/********************************************************************
*函数原型
*********************************************************************/
int main(int argc, char **argv);
void *process(int arg);
void upload(SSL *ssl);
void download(SSL *ssl);

void pool_init(int max_thread_num);
void *thread_routine(void *arg);
int pool_destroy();
int pool_add_task(void *(process)(int arg), int arg);

/********************************************************************
*名称:main
*参数:
*    argc   参数数量
*    argv   参数列表
*返回:
*    stat   0 成功
*          -1 失败
*功能:主函数
*********************************************************************/
int main(int argc, char **argv){
//初始会话
SSL_library_init();                          //初始化加密库
OpenSSL_add_all_algorithms();                //载入加密算法
SSL_load_error_strings();                    //载入错误输出

ctx = SSL_CTX_new(SSLv23_server_method());   //服务器为模式

//设置会话
int isCert, isPriv;

isCert = SSL_CTX_use_certificate_file(ctx, "./certkey.pem", SSL_FILETYPE_PEM);   //载入数字证书
if(isCert == -1){
ERR_print_errors_fp(stdout);
return -1;
}

isPriv = SSL_CTX_use_PrivateKey_file(ctx, "./privkey.pem", SSL_FILETYPE_PEM);    //载入用户私钥
if(isPriv == -1){
ERR_print_errors_fp(stdout);
return -1;
}

//检测会话
int isCheck;

isCheck = SSL_CTX_check_private_key(ctx);
if(isCheck == 0){
ERR_print_errors_fp(stdout);
return -1;
}

//初始标志
int serverfd;

serverfd = socket(AF_INET, SOCK_STREAM, 0);
if(serverfd == -1){
printf("Can not create socket!\n");
return -1;
}

//绑定地址
struct sockaddr_in serverAddr;
int isBand;

serverAddr.sin_family = AF_INET;                   //设置协议
serverAddr.sin_port = htons(SERVER_PORT);          //设置端口
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);    //设置地址
bzero(serverAddr.sin_zero, 8);                     //设置为零

isBand = bind(serverfd, (struct sockaddr *)&serverAddr, sizeof(struct sockaddr));
if(isBand == -1){
printf("Can not bind!\n");
return -1;
}

//监听端口
int isListen;

isListen = listen(serverfd, 5);
if(isListen == -1){
printf("Can not listen!\n");
return -1;
}

//初始线程
pool_init(5);

//处理连接
while(1){
//等待连接
socklen_t clientAddrLen;
struct sockaddr_in clientAddr;
int clientfd;

clientAddrLen = sizeof(struct sockaddr);
clientfd = accept(serverfd, (struct sockaddr *)&clientAddr, &clientAddrLen);
if(clientfd == -1){
printf("Can not accept!\n");
return -1;
}

//添加任务
pool_add_task(process, clientfd);
}

//销毁线程
pool_destroy();

//关闭连接
close(serverfd);

//销毁会话
SSL_CTX_free(ctx);
}

/********************************************************************
*名称:process
*参数:
*    arg   任务参数
*返回:
*    none
*功能:处理线程任务
*********************************************************************/
void *process(int arg){
//连接 SSL
SSL *ssl;
int isAccept;

ssl = SSL_new(ctx);          //构造 SSL
SSL_set_fd(ssl, arg);        //绑定 SSL

isAccept = SSL_accept(ssl);  //连接 SSL
if(isAccept == -1){
ERR_print_errors_fp(stdout);
exit(-1);
}

//处理菜单
while(1){
//读取命令
char cmd;

SSL_read(ssl, (void *)&cmd, sizeof(cmd));

//处理命令
switch(cmd){
//上传文件
case 'U':
upload(ssl);
break;

//下载文件
case 'D':
download(ssl);
break;

//退出程序
case 'Q':
break;

//其他命令
default:
break;
}

//是否退出
if(cmd == 'Q'){
break;
}
}

//关闭 SSL
SSL_shutdown(ssl);  //关闭 SSL
SSL_free(ssl);      //释放 SSL

//关闭连接
close(arg);
}

/********************************************************************
*名称:upload
*参数:
*    ssl   客户端标志
*返回:
*    none
*功能:上传文件
*********************************************************************/
void upload(SSL *ssl){
//接收文件名称
int namesize;
char filename[20];

SSL_read(ssl, (void *)&namesize, sizeof(namesize));
SSL_read(ssl, (void *)&filename, namesize);

//创建上传文件
int fd;

fd = open(filename, O_RDWR | O_CREAT, 0777);
if(fd == -1){
printf("Can not create file!\n");
return ;
}

//接收文件长度
int fileszie;

SSL_read(ssl, &fileszie, sizeof(fileszie));

//接收文件内容
char buf[1024];
int  num;

num = SSL_read(ssl, (void *)buf, sizeof(buf));
while(num > 0){
//写入接收内容
write(fd, (void *)&buf, num);

//是否接收结束
fileszie = fileszie - num;
if(fileszie == 0){
break;
}

//接收文件内容
num = SSL_read(ssl, (void *)buf, sizeof(buf));
}

//关闭上传文件
close(fd);
}

/********************************************************************
*名称:download
*参数:
*    ssl   客户端标志
*返回:
*    none
*功能:下载文件
*********************************************************************/
void download(SSL *ssl){
//接收文件名称
int namesize;
char filename[20];

SSL_read(ssl, (void *)&namesize, sizeof(namesize));
SSL_read(ssl, (void *)&filename, namesize);

//打开下载文件
int fd;

fd = open(filename, O_RDONLY);
if(fd == -1){
printf("Can not open file!\n");
return ;
}

//发送文件长度
struct stat fstat;
int isState;

isState = stat(filename, &fstat);
if(isState == -1){
printf("Can not get file state!\n");
return ;
}

SSL_write(ssl, (void *)&(fstat.st_size), sizeof(fstat.st_size));

//发送文件内容
char buf[1024];
int  num;

num = read(fd, (void *)buf, sizeof(buf));
while(num > 0){
//发送文件内容
SSL_write(ssl, (void *)&buf, num);

//读取文件内容
num = read(fd, (void *)buf, sizeof(buf));
}

//关闭下载文件
close(fd);
}

/*----------------------------------------分割线----------------------------------------*/

/********************************************************************
*名称:pool_init
*参数:
*    max_thread_num   线程数量
*返回:
*    none
*功能:初始线程池子
*********************************************************************/
void pool_init(int max_thread_num){
//初始线程池子
threadPool = (Cthread_pool *)malloc(sizeof(Cthread_pool));                        //构造线程池子

pthread_mutex_init(&(threadPool->queue_lock), NULL);                              //设置互斥变量
pthread_cond_init(&(threadPool->queue_ready), NULL);                              //设置条件变量
threadPool->queue_head = NULL;                                                    //设置等待队列

threadPool->threadId = (pthread_t *)malloc(max_thread_num * sizeof(pthread_t));   //分配线程编号
threadPool->max_thread_num = max_thread_num;                                      //设置线程数量
threadPool->cur_task_size = 0;                                                    //设置等待数量
threadPool->shutdown = 0;                                                         //设置销毁标志

//添加线程池子
int i;

for(i = 0; i < max_thread_num; i++){
pthread_create(&(threadPool->threadId[i]), NULL, thread_routine, NULL);
}
}

/********************************************************************
*名称:thread_routine
*参数:
*    arg   参数
*返回:
*    none
*功能:处理线程池子
*********************************************************************/
void *thread_routine(void *arg){
//处理线程池子
while(1){
//上锁线程池子
pthread_mutex_lock(&(threadPool->queue_lock));

//是否等待池子
while( (threadPool->cur_task_size == 0) && (!threadPool->shutdown) ){
printf("Thread 0x%x is waiting!\n", pthread_self());
pthread_cond_wait(&(threadPool->queue_ready), &(threadPool->queue_lock));
}

//是否销毁池子
if(threadPool->shutdown){
//解锁线程池子
pthread_mutex_unlock(&(threadPool->queue_lock));

//结束线程池子
printf("Thread 0x%x is eixting!\n", pthread_self());
pthread_exit(NULL);
}

//设置线程池子
Cthread_task *task;

task = threadPool->queue_head;
threadPool->queue_head = threadPool->queue_head->next;
threadPool->cur_task_size--;

//解锁线程池子
pthread_mutex_unlock(&(threadPool->queue_lock));

//处理线程任务
printf("Thread 0x%x is working!\n", pthread_self());
(*(task->process))(task->arg);

//释放线程任务
free(task);
task = NULL;
}

//结束线程池子
printf("Thread 0x%x is eixting!\n", pthread_self());
pthread_exit(NULL);
}

/********************************************************************
*名称:pool_destroy
*参数:
*    none
*返回:
*    stat   0 成功
*          -1 失败
*功能:销毁线程池子
*********************************************************************/
int pool_destroy(){
//是否已经销毁
if(threadPool->shutdown){
return -1;                  //防止多次销毁
}else{
threadPool->shutdown = 1;   //设置销毁标志
}

//唤醒所有线程
pthread_cond_broadcast(&(threadPool->queue_ready));

//阻塞线程任务
int i;

for(i = 0; i < threadPool->max_thread_num; i++){
pthread_join(threadPool->threadId[i], NULL);
}

//释放线程编号
free(threadPool->threadId);

//销毁等待队列
Cthread_task *task = NULL;

while(threadPool->queue_head != NULL){
//取出任务
task = threadPool->queue_head;
threadPool->queue_head = threadPool->queue_head->next;

//释放任务
free(task);
}

//销毁相关变量
pthread_mutex_destroy(&(threadPool->queue_lock));
pthread_cond_destroy(&(threadPool->queue_ready));

//释放线程池子
free(threadPool);
threadPool = NULL;

return 0;
}

/********************************************************************
*名称:pool_add_task
*参数:
*    process   任务函数
*    arg       任务参数
*返回:
*    stat   0 成功
*          -1 失败
*功能:添加线程任务
*********************************************************************/
int pool_add_task(void *(process)(int arg), int arg){
//初始线程任务
Cthread_task *task;

task = (Cthread_task *)malloc(sizeof(Cthread_task));   //构造线程任务

task->process = process;                               //设置任务函数
task->arg = arg;                                       //设置任务参数
task->next = NULL;                                     //设置任务指针

//上锁等待队列
pthread_mutex_lock(&(threadPool->queue_lock));

//添加线程任务
Cthread_task *node;

node = threadPool->queue_head;
if(node != NULL){
//定位队列尾部
while(node->next != NULL){
node = node->next;
}

//添加线程任务
node->next = task;
}else{
//添加线程任务
threadPool->queue_head = task;
}

//设置线程池子
threadPool->cur_task_size++;

//解锁等待队列
pthread_mutex_unlock(&(threadPool->queue_lock));

//唤醒等待队列
pthread_cond_signal(&(threadPool->queue_ready));

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: