您的位置:首页 > 运维架构 > Linux

linux c++线程池实现 - tbsys实现详解

2015-10-11 23:18 771 查看

linux线程池 - tbsys实现详解

一年前刚刚毕业入职还会写几篇博客,后来项目取消开源要求员工删除博客,写博客这件事情就荒废了。一年后,回忆一下,如果不养成写东西的习惯可能很难积累下知识,成长会慢很多。现在还是继续写吧,也可以读读其它开源代码写一写。

tbsys是阿里的一份开源代码,在阿里集团广泛应用,由多隆大神创作,内部代码实现有很多精致巧妙的地方。

今天学习下tbsys的线程池实现。

实现框架



Runnable类

该类是一个抽象类,拥有run的纯虚方法。该类主要提供给CThread类使用。

CThread类

该类对Linux线程方法做了一个简单的封装,实现了方法:

start 起一个线程,开始运行

join 等待线程退出

getRunnable 得到Runnable对象

getArgs 得到回调参数

getpid 获得线程id,linux系统中的id号

hook 线程回调函数,静态方法

CDefaultRunnable类

该类继承自Runnable,是真正的thread_pool。该类实现了方法:

setThreadCount 设置线程数

start 启动thread_count个线程,开始运行

stop 设置类内_stop变量,在该类中没有实际意义

wait 等待子线程退出,防止主线程先结束

使用方法

使用tbsys线程池时,具体业务类继承CDefaultRunnable类,实现Runnable类R中的run方法,通过setThreadCount设置线程数目,调用start方法启动线程池,然后调用wait方法等待退出。

实现思路

1、在CDefaultRunnable类中,start方法会申请thread_count个CThread类,然后用指向自身的this指针和thread index作为参数调用这thread_count个CThread变量的start方法。

2、CThread类的start方法,将传入的Runnable(CDefaultRunnable)指针赋值给自身runnable成员。然后通过pthread_create启动线程,同时用自身的静态方法hook作为线程回调函数。又把指向自己的this指针作为参数。

3、CThread的hook函数从入参中获取CThread变量,又通过CThread变量获取runnable成员,调用runnable成员的run函数。

代码

/*
* (C) 2007-2010 Taobao Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
*
* Version: $Id$
*
* Authors:
*   duolong <duolong@taobao.com>
*
*/

#ifndef TBSYS_RUNNABLE_H_
#define TBSYS_RUNNABLE_H_

namespace tbsys {

/**
* @brief Runnable是一个抽象类,它拥有一个run纯虚方法
* 主要用于Thread类
*/
class Runnable {

public:
/*
* 析构
*/
virtual ~Runnable() {
}
/**
* 运行入口函数
*/
virtual void run(CThread *thread, void *arg) = 0;
};

}

#endif /*RUNNABLE_H_*/


/*
* (C) 2007-2010 Taobao Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
*
* Version: $Id$
*
* Authors:
*   duolong <duolong@taobao.com>
*
*/

#ifndef TBSYS_THREAD_H_
#define TBSYS_THREAD_H_

#include <linux/unistd.h>

namespace tbsys {

/**
* @brief 对linux线程简单封装
*/
class CThread {

public:
/**
* 构造函数
*/
CThread() {
tid = 0;
pid = 0;
}

/**
* 起一个线程,开始运行
*/
bool start(Runnable *r, void *a) {
runnable = r;
args = a;
return 0 == pthread_create(&tid, NULL, CThread::hook, this);
}

/**
* 等待线程退出
*/
void join() {
if (tid) {
pthread_join(tid, NULL);
tid = 0;
pid = 0;
}
}

/**
* 得到Runnable对象
*
* @return Runnable
*/
Runnable *getRunnable() {
return runnable;
}

/**
* 得到回调参数
*
* @return args
*/
void *getArgs() {
return args;
}

/***
* 得到线程的进程ID
*/
int getpid() {
return pid;
}

/**
* 线程的回调函数
*
*/

static void *hook(void *arg) {
CThread *thread = (CThread*) arg;
thread->pid = gettid();

if (thread->getRunnable()) {
thread->getRunnable()->run(thread, thread->getArgs());
}

return (void*) NULL;
}

private:
/**
* 得到tid号
*/
#ifdef _syscall0
static _syscall0(pid_t,gettid)
#else
static pid_t gettid() { return static_cast<pid_t>(syscall(__NR_gettid));}
#endif

private:
pthread_t tid;      // pthread_self() id
int pid;            // 线程的进程ID
Runnable *runnable;
void *args;
};

}

#endif /*THREAD_H_*/


/*
* (C) 2007-2010 Taobao Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
*
* Version: $Id$
*
* Authors:
*   duolong <duolong@taobao.com>
*
*/

#ifndef TBSYS_DEFAULT_RUNNABLE_H_
#define TBSYS_DEFAULT_RUNNABLE_H_

namespace tbsys {

/**
* @brief 线程执行具体的业务的封装类,同时它维护了一个线程数据,也可以将此类
* 看成一个线程池类
*/
class CDefaultRunnable : public Runnable {

public:
/**
* 构造
*/
CDefaultRunnable(int threadCount = 1);

/**
* 析构
*/
virtual ~CDefaultRunnable();

/**
* 设置线程数
*/
void setThreadCount(int threadCount);

/**
* create %_threadCount threads
* @return started thread count;
*/
int start();

/**
* stop
*/
void stop();

/**
* wait
*/
void wait();

protected:
CThread *_thread;
int _threadCount;
bool _stop;
};

}

#endif /*RUNNABLE_H_*/


/*
* (C) 2007-2010 Taobao Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
*
* Version: $Id$
*
* Authors:
*   duolong <duolong@taobao.com>
*
*/

#include "tbsys.h"

namespace tbsys {

/**
* 构造
*/
CDefaultRunnable::CDefaultRunnable(int threadCount) {
_stop = false;
_threadCount = th
b754
readCount;
_thread = NULL;
}
/*
* 析构
*/
CDefaultRunnable::~CDefaultRunnable() {
if (_thread) {
delete[] _thread;
_thread = NULL;
}
}

/**
* 设置线程数
*/
void CDefaultRunnable::setThreadCount(int threadCount)
{
if (_thread != NULL) {
TBSYS_LOG(ERROR, "已经在运行了不能设置线程数");
return;
}
_threadCount = threadCount;
}

// start
int CDefaultRunnable::start() {
if (_thread != NULL || _threadCount < 1) {
TBSYS_LOG(ERROR, "start failure, _thread: %p, threadCount: %d", _thread, _threadCount);
return 0;
}
_thread = new CThread[_threadCount];
if (NULL == _thread)
{
TBSYS_LOG(ERROR, "create _thread object failed, threadCount: %d", _threadCount);
return 0;
}
int i = 0;
for (; i<_threadCount; i++)
{
if (!_thread[i].start(this, (void*)((long)i)))
{
return i;
}
}
return i;
}

// stop
void CDefaultRunnable::stop() {
_stop = true;
}

// wait
void CDefaultRunnable::wait() {
if (_thread != NULL)
{
for (int i=0; i<_threadCount; i++)
{
_thread[i].join();
}
}
}

}

////////END
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息