您的位置:首页 > 其它

libevent源码分析之多线程准备工作

2015-06-25 15:32 429 查看
libevent中是默认不开启多线程的,也就没有什么锁, 条件变量等的说法了

我们可以使用evthread_use_pthreads()开启linux下的pthread

#ifdef WIN32
int evthread_use_windows_threads(void);
#define EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
#endif
#ifdef _EVENT_HAVE_PTHREADS
int evthread_use_pthreads(void);
#define EVTHREAD_USE_PTHREADS_IMPLEMENTED
#endif


或者使用evthread_set_lock_callbacks定制属于自己的多线程, 锁, 条件变量等

libevent是如何封装多线程的呢, 还需要从下面这个结构体说起:

这是对某种线程的锁的一些操作封装:

struct evthread_lock_callbacks {
/** The current version of the locking API.  Set this to
* EVTHREAD_LOCK_API_VERSION */
/**/
int lock_api_version;
/** Which kinds of locks does this version of the locking API
* support?  A bitfield of EVTHREAD_LOCKTYPE_RECURSIVE and
* EVTHREAD_LOCKTYPE_READWRITE.
*
* (Note that RECURSIVE locks are currently mandatory, and
* READWRITE locks are not currently used.)
**/
unsigned supported_locktypes;

//初始化锁
void *(*alloc)(unsigned locktype);

//释放锁
void (*free)(void *lock, unsigned locktype);

int (*lock)(unsigned mode, void *lock);

int (*unlock)(unsigned mode, void *lock);
};


根据官方给出的文档, 支持的锁类型有:

普通的非递归锁                                               0
递归锁, 即本线程可以多次对某锁进行lock而不会进入死锁          EVTHREAD_LOCKTYPE_RECURSIVE
读写锁                                   EVTHREAD_LOCKTYPE_READWRITE


支持的锁mode有:

EVTHREAD_READ , EVTHREAD_WRITE ,  EVTHREAD_TRY


以上是封装的线程锁, 下面是封装的线程条件变量:

struct evthread_condition_callbacks {
int condition_api_version;

void *(*alloc_condition)(unsigned condtype);

void (*free_condition)(void *cond);

int (*signal_condition)(void *cond, int broadcast);

int (*wait_condition)(void *cond, void *lock,
const struct timeval *timeout);
};


因为对于条件变量的包装类似, 所以这里略过

如何与libeven相结合的呢?

文章顶部提到, 我们可以使用evthread_use_pthreads来开启linux下的pthread:

int
evthread_use_pthreads(void)
{
struct evthread_lock_callbacks cbs = {
EVTHREAD_LOCK_API_VERSION,
EVTHREAD_LOCKTYPE_RECURSIVE,
evthread_posix_lock_alloc,
evthread_posix_lock_free,
evthread_posix_lock,
evthread_posix_unlock
};
struct evthread_condition_callbacks cond_cbs = {
EVTHREAD_CONDITION_API_VERSION,
evthread_posix_cond_alloc,
evthread_posix_cond_free,
evthread_posix_cond_signal,
evthread_posix_cond_wait
};
/* Set ourselves up to get recursive locks. */
//以下为初始化递归锁的属性,attr_recursive是全局变量, 该属性对象是在初始化一个锁的时候使用的
//虽然我们的默认锁的名字叫EVTHREAD_LOCKTYPE_RECURSIVE,但真正让它称为递归锁还是要靠这个属性而不是单纯的名字就行...
if (pthread_mutexattr_init(&attr_recursive))
return -1;
//为该属性添加递归锁的递归属性
if (pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE))
return -1;

//设置整个event_base将会使用的线程锁、条件变量、 以及如何获取线程自身的线程ID
evthread_set_lock_callbacks(&cbs);
evthread_set_condition_callbacks(&cond_cbs);
evthread_set_id_callback(evthread_posix_get_id);
return 0;
}


关于这里面的函数, 要提一下的是下面的函数:

static void *
evthread_posix_lock_alloc(unsigned locktype)
{
pthread_mutexattr_t *attr = NULL;
pthread_mutex_t *lock = mm_malloc(sizeof(pthread_mutex_t));
if (!lock)
return NULL;
//从这里可以看出, 如果我们需要递归锁, 那么就会用到那个全局变量, 否则就是普通的非递归锁
//还可以看出来的是, 并没有支持读写锁
if (locktype & EVTHREAD_LOCKTYPE_RECURSIVE)
attr = &attr_recursive;
if (pthread_mutex_init(lock, attr)) {
mm_free(lock);
return NULL;
}
return lock;
}


static int
evthread_posix_cond_signal(void *_cond, int broadcast)
{
pthread_cond_t *cond = _cond;
int r;
if (broadcast)                //可见, 如果broadcast不为0, 那么就会唤醒不止一个线程
r = pthread_cond_broadcast(cond);
else
r = pthread_cond_signal(cond);
return r ? -1 : 0;
}


好了,看到这里, 想必对libevent对线程的支持有所了解了,只是简单的在我们平时使用的基础上进行一些封装, 理解起来并不困难

还有要注意的是, 设置libevent支持线程必须在最开始就声明好!与自己设置内存分配函数、日志记录函数类似, 不能半途设置的!

debug锁

除了普通的系统支持的锁之外, libevent还定制了一种特有的锁

该锁支持对锁操作的几个检测,如下, 看过后还会做总结:

1、某线程可能解锁自己并不持有的锁

2、在没有解锁之前,想要锁住一个非递归的锁

如何开启呢, 可以在使用evthread_use_pthreads开启线程后, 再调用evthread_enable_lock_debugging就可以开启debug锁了

void
evthread_enable_lock_debuging(void)
{
struct evthread_lock_callbacks cbs = {
EVTHREAD_LOCK_API_VERSION,
EVTHREAD_LOCKTYPE_RECURSIVE,
debug_lock_alloc,
debug_lock_free,
debug_lock_lock,
debug_lock_unlock
};

//是否已经声明过该debug锁了
if (_evthread_lock_debugging_enabled)
return;

//全局变量_evthread_lock_fns是在evthread_use_pthreads中调用evthread_set_lock_callbacks为其赋值的
//现在用_original_lock_fns变量来保存它
memcpy(&_original_lock_fns, &_evthread_lock_fns,
sizeof(struct evthread_lock_callbacks));

//此刻的_evthread_lock_fns成为了debug锁
memcpy(&_evthread_lock_fns, &cbs,
sizeof(struct evthread_lock_callbacks));

memcpy(&_original_cond_fns, &_evthread_cond_fns,
sizeof(struct evthread_condition_callbacks));

//这里只是修改了条件变量里的一个函数, 这是因为只有wait函数才用到锁...
_evthread_cond_fns.wait_condition = debug_cond_wait;
_evthread_lock_debugging_enabled = 1;

/* XXX return value should get checked. */
event_global_setup_locks_(0);
}


也就是说, 现在所有的锁相关到操作都被替换为debug锁的操作了, 之前在evthread_set_lock_callbacks设置的虽然被取代但也都被保留了下来

于是我们现在看看debug锁的具体操作:

struct debug_lock {
unsigned locktype;
unsigned long held_by;
/* XXXX if we ever use read-write locks, we will need a separate
* lock to protect count. */
int count;
void *lock;
};
static void *
debug_lock_alloc(unsigned locktype)
{
struct debug_lock *result = mm_malloc(sizeof(struct debug_lock));
if (!result)
return NULL;
//如果我们是在开启了线程之后开启debug锁的, 那么我们会利用该锁的alloc函数为debug锁生成一个对象, 要注意的是, 是递归锁
if (_original_lock_fns.alloc) {
if (!(result->lock = _original_lock_fns.alloc(
locktype|EVTHREAD_LOCKTYPE_RECURSIVE))) {
mm_free(result);
return NULL;
}
} else {
result->lock = NULL;
}
result->locktype = locktype;
result->count = 0;
result->held_by = 0;
return result;
}


从上面我们可以知道, 分配、释放debug锁和分配、释放系统锁并没有太大差别.

差别主要在于debug的主要职责上, 即在加锁和解锁时进行特别的判定

如下:

//此时,我们需要上锁
static int
debug_lock_lock(unsigned mode, void *lock_)
{
struct debug_lock *lock = lock_;
int res = 0;
//此锁的locktype就是创建时候自己定义的,外加一个递归属性 , 如果此锁是读写锁
//这里的判断是, 如果是读写锁, 至少有个读锁或写锁的属性
if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
else
//既然不是读写锁, 那就不应该有读锁或写锁的属性
EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);

//如果被debug封装的锁具有锁的属性, 如果没有,那就表示这之前没有调用evthread_use_pthreads等开启多线程, 也表示lock变量中的lock成员为空
if (_original_lock_fns.lock)
res = _original_lock_fns.lock(mode, lock->lock);   //要知道的是, lock成功返回0

//进入这个if有两个条件, 一个是这之前没有调用evthread_use_pthreads等, 二是成功锁住此锁了
if (!res) {
evthread_debug_lock_mark_locked(mode, lock);
}
return res;
}

static void
evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock *lock)
{
++lock->count;            //此变量初始值为0
//如果没有递归这个属性, 即非递归锁, count只能为1. 只是若一个非递归的锁是正常的(是没bug的?),那么肯定会在调用_original_lock_fns.lock时形成死锁而不会到这一步...
if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE))
EVUTIL_ASSERT(lock->count == 1);
if (_evthread_id_fn) {
//得到当前线程的线程ID
unsigned long me;
me = _evthread_id_fn();
//如果count大于1了,说明到了递归锁的特性了, 必须是本线程先前就拥有的这把锁的所有权
if (lock->count > 1)
EVUTIL_ASSERT(lock->held_by == me);
lock->held_by = me;
}
}


看了加锁的实现, 肯定是迫不及待的去看解锁的实现了:

static int
debug_lock_unlock(unsigned mode, void *lock_)
{
struct debug_lock *lock = lock_;
int res = 0;
evthread_debug_lock_mark_unlocked(mode, lock);
if (_original_lock_fns.unlock)
res = _original_lock_fns.unlock(mode, lock->lock);
return res;
}
static void
evthread_debug_lock_mark_unlocked(unsigned mode, struct debug_lock *lock)
{
if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
else
EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
if (_evthread_id_fn) {
//如果持有锁的线程不是我,那么就出错了
EVUTIL_ASSERT(lock->held_by == _evthread_id_fn());
//如果此锁count为1,那么解锁后就不归任何线程所有了
if (lock->count == 1)
lock->held_by = 0;
}
--lock->count;
//如果之前没有锁过这个锁, 那么无需解...
EVUTIL_ASSERT(lock->count >= 0);
}




综上,再来总结一下debug锁解决了哪些不该出现的问题呢?

1、首先我们知道, 对于一个读写锁(在调用alloc申请的时候表明的意图),如果在给锁加锁的时候没有指定任何EVTHREAD_READ或写的加锁模式,那么肯定是错误的(解锁亦然)

2、(可能,这一条不确定...)对于一个非递归锁却在一个线程中调用lock不止一次,可能会报错(可能直接死锁了)

3、递归四只能在一个线程中锁几次

4、当前锁是锁住的,但加锁的不是我, 但我要去解锁,就出错

5、没有加过锁, 却还要解锁



设置libevent要用的线程相关工具

认识了debug锁, 接下来我们更加深入的探讨一下evthread_set_lock_callbacks函数:

int
evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs)
{
//_evthread_lock_debugging_enabled标志为全局变量,初始为0, 为是否开启debug锁, 不过开启debug锁一般在开启线程之后的
//然而调用此函数evthread_set_lock_callbacks
struct evthread_lock_callbacks *target =
_evthread_lock_debugging_enabled
? &_original_lock_fns : &_evthread_lock_fns;

if (!cbs) {
//加入参数为NULL, 那么可以取消锁的实现, 只是警告中说这并不一定起作用
if (target->alloc)
event_warnx("Trying to disable lock functions after "
"they have been set up will probaby not work.");
memset(target, 0, sizeof(_evthread_lock_fns));
return 0;
}
//这步操作可能并不是在我们之前提到的整个libevent的最前面发生的,情理之中也是不允许中途改锁的
if (target->alloc) {
/* Uh oh; we already had locking callbacks set up.*/
if (target->lock_api_version == cbs->lock_api_version &&
target->supported_locktypes == cbs->supported_locktypes &&
target->alloc == cbs->alloc &&
target->free == cbs->free &&
target->lock == cbs->lock &&
target->unlock == cbs->unlock) {
/* no change -- allow this. */
return 0;
}
event_warnx("Can't change lock callbacks once they have been "
"initialized.");
return -1;
}

//必须支持这里的所有操作,否则无法接受
if (cbs->alloc && cbs->free && cbs->lock && cbs->unlock) {
memcpy(target, cbs, sizeof(_evthread_lock_fns));
return event_global_setup_locks_(1);
} else {
return -1;
}
}


看到这里,是不是觉得就不能修改锁的相关函数了呢, 其实是可以的, 首先我们第一次调用传入NULL参数, 第二次调用传入新的操作函数结构体, 可能就可以了

但是, 这样做是存在问题的.

了解了锁的大部分实现后, 我们来看看是libevent是如何应用他们的

经常见到的就是这两个了:

EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);


EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
第一个参数为base , 第二个为锁对象

某个锁定然是与某个base相关的,于是我们第一事件找到event_base:

struct event_base {

... ...

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
/* threading support */
/** The thread currently running the event_loop for this base */
//主线程ID
unsigned long th_owner_id;
/** A lock to prevent conflicting accesses to this event_base */
//访问base中元素用的lock, 比如有时候需要添加一些事件等
void *th_base_lock;

... ...

/** A condition that gets signalled when we're done processing an
* event with waiters on it. */
void *current_event_cond;
/** Number of threads blocking on current_event_cond. */
int current_event_waiters;
#endif

... ...

};


接下来看看宏的相关实现:

#define EVBASE_ACQUIRE_LOCK(base, lockvar) do {                         \
EVLOCK_LOCK((base)->lockvar, 0);
} while (0)

#define EVLOCK_LOCK(lockvar,mode)                                       \
do {                                                            \
//如果这个锁是存在的, 就调用lock函数来锁
if (lockvar)                                            \
_evthread_lock_fns.lock(mode, lockvar);         \
} while (0)


#define EVBASE_RELEASE_LOCK(base, lockvar) do {                         \
EVLOCK_UNLOCK((base)->lockvar, 0);                      \
} while (0)

#define EVLOCK_UNLOCK(lockvar,mode)                                     \
do {                                                            \
if (lockvar)                                            \
_evthread_lock_fns.unlock(mode, lockvar);       \
} while (0)


这个lockvar我们看到在event_base中有定义, 凭什么说lockvar存在, 对应的锁函数就存在呢?

我们就来看看是如何初始化这个锁的:

struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{

... ...

/* prepare for threading */

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
if (EVTHREAD_LOCKING_ENABLED() &&
//如果没有事先声明不需要锁
(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
int r;

//此锁被声明为递归锁
EVTHREAD_ALLOC_LOCK(base->th_base_lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
base->defer_queue.lock = base->th_base_lock;
EVTHREAD_ALLOC_COND(base->current_event_cond);
r = evthread_make_base_notifiable(base);
if (r<0) {
event_warnx("%s: Unable to make base notifiable.", __func__);
event_base_free(base);
return NULL;
}
}
#endif

... ...

}


此锁也是普通的利用锁的alloc函数构造的,所以并不是说 lockvar存在, 对应的锁函数就存在, 而是说如果lockvar不存在, 那么对应锁函数也不会存在, 也就不会去调用到它

也就避免了这个可能的错误

不过,之前我们提到, 在声明使用线程后, base的初始化就会成功初始化base->th_base_lock, 此时我们可以通过给evthread_set_lock_callbacks传入NULL参数, 现在就算lcokvar存在,我们也无法证明其对应的锁函数是存在的了!像现在的情况, 传入NULL后, 锁函数为空, 再次运行到锁的宏, 就会出错了。

所以不提倡在运行后修改锁的操作函数。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: