您的位置:首页 > 其它

变态的libDispatch结构分析-semaphore

2014-01-16 18:32 357 查看
文件位置:

libdispatch/dispatch/semaphore.h

libdispatch/src/semaphore.c

Libdispatch中的semaphore 是基于bionic的semaphore;

bionic中semaphore的接口分析,请参看bionic semaphore学习

Linux下载sem_t 则可以参看

Linux 信号量概述(翻译 man 7 sem_overview) 

Linux 信号量 sem_init(3) (翻译 man 3)

Linux 信号量sem_post(3)(翻译 man 3)

Linux 信号量sem_wait(3) (翻译 man 3)

针对Dispatch的semaphore定义了三个方法:

/*!
* @function dispatch_semaphore_create
*
* @abstract
* Creates new counting semaphore with an initial value.
*
* @discussion
* Passing zero for the value is useful for when two threads need to reconcile
* the completion of a particular event. Passing a value greather than zero is
* useful for managing a finite pool of resources, where the pool size is equal
* to the value.
*
* @param value
* The starting value for the semaphore. Passing a value less than zero will
* cause NULL to be returned.
*
* @result
* The newly created semaphore, or NULL on failure.
*/
__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_4_0)
DISPATCH_EXPORT DISPATCH_MALLOC DISPATCH_NOTHROW
dispatch_semaphore_t
dispatch_semaphore_create(long value);

/*!
* @function dispatch_semaphore_wait
*
* @abstract
* Wait (decrement) for a semaphore.
*
* @discussion
* Decrement the counting semaphore. If the resulting value is less than zero,
* this function waits in FIFO order for a signal to occur before returning.
*
* @param dsema
* The semaphore. The result of passing NULL in this parameter is undefined.
*
* @param timeout
* When to timeout (see dispatch_time). As a convenience, there are the
* DISPATCH_TIME_NOW and DISPATCH_TIME_FOREVER constants.
*
* @result
* Returns zero on success, or non-zero if the timeout occurred.
*/
__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_4_0)
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout);

/*!
* @function dispatch_semaphore_signal
*
* @abstract
* Signal (increment) a semaphore.
*
* @discussion
* Increment the counting semaphore. If the previous value was less than zero,
* this function wakes a waiting thread before returning.
*
* @param dsema The counting semaphore.
* The result of passing NULL in this parameter is undefined.
*
* @result
* This function returns non-zero if a thread is woken. Otherwise, zero is
* returned.
*/
__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_4_0)
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW
long
dispatch_semaphore_signal(dispatch_semaphore_t dsema);


1. dispatch_semaphore_t结构

struct dispatch_semaphore_s {
DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s);
long dsema_value;
long dsema_orig;
size_t dsema_sent_ksignals;
#if USE_POSIX_SEM
sem_t dsema_sem;
#endif
size_t dsema_group_waiters;
struct dispatch_sema_notify_s *dsema_notify_head;
struct dispatch_sema_notify_s *dsema_notify_tail;
};


dsema_value : 信号值;
dsema_sem: Linux下的sem_t,信号量的数据类型,它本质上是一个长整型的数;
dsema_orig: 记录其初始值,或者成为原始值,这是创建信号时设置的;
dsema_sent_ksignals:从目前分析来看,只是一个防止信号量被意外唤醒的变量,这个变量,在signal过程中+1,然后wait的时候如果发现这个值不是1,那么直接就返回了,而不会进入接下来的sim_wait;

2. dispatch_semaphore_create(long value)

直接翻译成:创建一个计数方式的信号量。

Value值:

如果两个线程,协作完成一个事件: 那么传0;

若是管理有限的资源池,那么value等于这写资源数量,这里指的应该是线程数量;

若传入的value小于0 返回的将是NULL,创建失败;

dispatch_semaphore_t
dispatch_semaphore_create(long value)
{
dispatch_semaphore_t dsema;
#if USE_POSIX_SEM
int ret;
#endif

// If the internal value is negative, then the absolute of the value is
// equal to the number of waiting threads. Therefore it is bogus to
// initialize the semaphore with a negative value.
if (value < 0) {
return NULL;
}

dsema = (dispatch_semaphore_t)calloc(1, sizeof(struct dispatch_semaphore_s));

if (fastpath(dsema)) {
dsema->do_vtable = &_dispatch_semaphore_vtable;
dsema->do_next = (dispatch_semaphore_t)DISPATCH_OBJECT_LISTLESS;
dsema->do_ref_cnt = 1;
dsema->do_xref_cnt = 1;
dsema->do_targetq = dispatch_get_global_queue(0, 0);
dsema->dsema_value = value;
dsema->dsema_orig = value;
#if USE_POSIX_SEM
ret = sem_init(&dsema->dsema_sem, 0, 0);
(void)dispatch_assume_zero(ret);
#endif
}

return dsema;
}
sem_init(&dsema->dsema_sem, 0, 0); 将dsema->dsema_sem进行初始化。

3. dispatch_semaphore_wait

dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout);

等待一个信号的到来;

原理:信号值dsema_value减去1,若结果小于0,那么就等到FIFO队列中信号量的到来,直到timeout为止;

若返回0,表示wait成功;
非0表示time out.

long
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
if (dispatch_atomic_dec(&dsema->dsema_value) >= 0) {
return 0;
}
return _dispatch_semaphore_wait_slow(dsema, timeout);
}


这段代码,中我把apple相关的东西去掉了;

也就是,如果信号量的值-1之后大于等于0,表示有资源可用,那么返回0;

否则进入如下的函数:

static long
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{

#if USE_POSIX_SEM
struct timespec _timeout;
#endif
#if USE_POSIX_SEM || USE_WIN32_SEM
int ret;
#endif
long orig;

again:
// Mach semaphores appear to sometimes spuriously wake up.  Therefore,
// we keep a parallel count of the number of times a Mach semaphore is
// signaled (6880961).
while ((orig = dsema->dsema_sent_ksignals)) {//这段代码应该是为了修复某个bug添加的。
if (dispatch_atomic_cmpxchg(&dsema->dsema_sent_ksignals, orig, orig - 1)) {
return 0;
}
}
// From xnu/osfmk/kern/sync_sema.c:
// wait_semaphore->count = -1;  /* we don't keep an actual count */
//
// The code above does not match the documentation, and that fact is
// not surprising. The documented semantics are clumsy to use in any
// practical way. The above hack effectively tricks the rest of the
// Mach semaphore logic to behave like the libdispatch algorithm.

switch (timeout) {
default:
#if USE_POSIX_SEM
do {
_timeout = _dispatch_timeout_ts(timeout);
//sem_timedwait返回0表示成功
  ret = slowpath(sem_timedwait(&dsema->dsema_sem,
&_timeout));
} while (ret == -1 && errno == EINTR); //EINTR表示调用被信号处理中断
if (!(ret == -1 && errno == ETIMEDOUT)) {
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
break;
}
#endif
// Fall through and try to undo what the fast path did to dsema->dsema_value
case DISPATCH_TIME_NOW:
while ((orig = dsema->dsema_value) < 0) {
if (dispatch_atomic_cmpxchg(&dsema->dsema_value, orig, orig + 1)) {
#if USE_POSIX_SEM || USE_WIN32_SEM
errno = ETIMEDOUT;
return -1;
#endif
}
}
// Another thread called semaphore_signal().
// Fall through and drain the wakeup.
case DISPATCH_TIME_FOREVER:
#if USE_POSIX_SEM
do {
ret = sem_wait(&dsema->dsema_sem);
} while (ret != 0);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
break;
}

goto again;
}


进入_dispatch_semaphore_wait_slow之后根据timeout存在三种情况:

(1). 检测后立即返回,不等待:DISPATCH_TIME_NOW

若dsema_value 小于0,那么将其+1, 然后返回-1;

若大于等于0,表示有资源可用,那么进入到again;

(2). 无限等待: DISPATCH_TIME_FOREVER

sem_wait将无限等待到信号值>0;

等信号来了,跳转到again;

(3). 计时等待:default

sem_timedwait(&dsema->dsema_sem,&_timeout)


直到timeout或者有信号来了为止;

进入:Again之后,为了防止是虚假信号,需要检测dsema_sent_ksignals信号值,从代码上来看,这个值与信号量的值是保持一致的;

若这个值不大于0,表示传来的信号是非真实的,需要继续wait;
若这个值大于0,那么将dsema_sent_ksignals减去1,然后返回0;

从上面的红色字来看,如果在第三中情况,也就是timeout了,这时候走到again,发现dsema_sent_ksignals还是0;这时候会继续走到case分支里面来,又仅需到timedwait

因此从代码上来看,这个代码会一直等待,直到信号到来为止。这样看来timeOut似乎就是无效的一个东西了?

关于这里面的error值的获取问题,请参看:Linux错误处理

4. dispatch_semaphore_signal

翻译其说明:

 * Increment the counting semaphore. If the previous value was less than zero,

 * this function wakes a waiting thread before returning.

如果信号量之前的值小于0,那么+1;返回之前将唤醒一个正在等待的线程;

 * @result

 * This function returns non-zero if a thread is woken. Otherwise, zero is

 * returned.

若有一个线程被唤醒,那么返回非0值,否则返回0

long
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
if (dispatch_atomic_inc(&dsema->dsema_value) > 0) {
return 0;
}
return _dispatch_semaphore_signal_slow(dsema);
}


若信号值+1之后大于0,直接返回0;

否则进入到_dispatch_semaphore_signal_slow

static long
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
int ret;
// Before dsema_sent_ksignals is incremented we can rely on the reference
// held by the waiter. However, once this value is incremented the waiter
// may return between the atomic increment and the semaphore_signal(),
// therefore an explicit reference must be held in order to safely access
// dsema after the atomic increment.
_dispatch_retain(dsema);

dispatch_atomic_inc(&dsema->dsema_sent_ksignals);
#if USE_POSIX_SEM
ret = sem_post(&dsema->dsema_sem);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
_dispatch_release(dsema);

return 1;
}


这段代码里存在一个函数调用,_dispatch_retain

上面的注释中没怎么看懂,这里防止waiter在原子+1到发送信号之前就返回了,这要看wait和signal的源码:

wait中:

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout)
{
int  ret;
unsigned int shared;

if (sem == NULL) {
errno = EINVAL;
return -1;
}

/* POSIX says we need to try to decrement the semaphore
* before checking the timeout value. Note that if the
* value is currently 0, __sem_trydec() does nothing.
*/
if (__sem_trydec(&sem->count) > 0) {
ANDROID_MEMBAR_FULL();
return 0;
}
这段代码要结合第三点dispatch_semaphore_wait一起看,_sem_trydec若发现sem->count小于等于0,立即会返回sem->count;若sem->count大于0,

根本就不会进入到__futex_wait_ex中;

但是我们在_dispatch_semaphore_signal_slow中发现了其过程是如下这样的:

old = __sem_inc(&sem->count);
if (old < 0) {
/* contention on the semaphore, wake up all waiters */
__futex_wake_ex(&sem->count, shared, INT_MAX);
}


也就是先进行加1,然后再Wake;

问题:进入wake之后,waiter通过_sem_trydec发现sem->count大于0,直接返回了;而不会再次进入到__futex_wait_ex中,目前不知道会带来什么危害。

_dispatch_retain(dsema);

dispatch_atomic_inc(&dsema->dsema_sent_ksignals);
#if USE_POSIX_SEM
ret = sem_post(&dsema->dsema_sem);
DISPATCH_SEMAPHORE_VERIFY_RET(ret);
#endif
_dispatch_release(dsema);


这段代码在前后加入了retain和release,通过外部引用计数来控制,安全访问dsema。
究竟有什么好处,不得而至。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: