您的位置:首页 > 运维架构 > Linux

【深入剖析Linux内核】Linux内核之旅--(三)同步

2013-08-10 18:47 323 查看
1、概述
同步问题是操作系统中的经典问题,它伴随着并发处理而自诞生。现代体系结构中常见的并发处理情况可以分为如下三种情况:
(1)多个线程在单处理器上执行——多线程编程
(2)多个线程在多处理器上执行——并行计算
(3)多个线程在分布的多个处理上执行——分布式计算
相应的编程也分成三种情况:
共享变量编程、分布式(基于消息)编程和并行编程。
1.1、并发程序设计的本质
并发程序通常包括两个或多个进程一起工作,共同完成一项任务,于是进程(线程)间的通信产生,也就产生了同步问题。进程(或者线程)间需要通信是产生同步的根本原因,正是因为需要通信,才需要同步。进程之间有两种通信方式:共享变量(shared
variables)和消息传递(message passing)。使用共享变量时,一个进程对变量进行写操作,另一个进程进行读操作。使用消息传递时,一个进程发送消息,一个进程接收消息。不管使用哪种通信方式,进程之间需要进行同步。有两种基本的同步方式:互斥(mutual
exclusion)和条件同步(condition
synchronization)。互斥保证关键代码段不会同一时刻执行。条件同步会阻塞进程,直到相应的条件发生。例如,对于通过共享内存方式来实现通信生产者和消费者进程,互斥变量保证生产者访问内存时,消费者不会访问内存。条件同步保证生产者写数据之前,消费者不会读数据。
同步的根本目的是创建临界区(critical region)或者等待特定的条件,常用的方式有:锁(lock)、信号量(semaphore)和管程(monitor)。前两者在Linux都有实现,最后一种方式一般在用户态层面实现(比如Java就是采用管程来实现同步原语的)。
    并发编程的硬件来源:中断和多处理器。
1.2、硬件架构
常见而流行的三种计算机架构:(1)单处理器和内存(2)共享内存多处理器(3)分布式内存,包括多计算机和计算机网络。
1.2.1、单处理器体系结构



1.2.2、共享内存多处理器
    处理器与内存之间通过互连网络连接起来。小规模的多处理器计算机可以包括30个处理器不等。互连网络通过内存总路(memory bus)或交换开关(crossbar switch)来实现。这种架构通常叫做UMA计算机,因为每个处理器和内存之间有相同的访问机会。UMA机器也叫做SMP。



1.2.3、分布式内存多处理器
    在分布式内存多处理器结构中,也存在用于通信的互连网络,但是每个处理器都有它自己的私有内存(private memory)。



这种结构使用消息传递,而不是读写内存;不存在缓存和内存一致性问题。

为了利用多个处理器,有三种应用程序:多线程系统、分布式系统和并行计算。

2、Linux内核同步
Linux内核中产生并发处理的硬件来源有两个:中断和多CPU。其基本同步方式有四种:关闭中断(只对本地CPU)、原子操作(atomic operation)、自旋锁(spin lock)和信号量。原子操作实际上只是对CPU原子指令的简单包装,它的原子性由硬件保证。而硬件提供的原子指令是实现锁和信号量的基础。
对于内核态,CPU可能处于两处不同的内核控制路径:中断处理和异常处理(包括系统调用)。对中断处理程序,在单CPU下,可以通过禁止中断实现临界区;在多CPU下,可以通过自旋锁实现临界区。对异常处理程序,在单CPU下,可以通过禁止内核抢占实现临界区;在多CPU下,可以通过信号量实现临界区。

2.1、自旋锁(spin lock)
自旋锁主要是针对多CPU的,它是一种忙等待形式的同步(所以浪费CPU机器周期),主要用于中断处理。对于由自旋锁保护的临界区,会禁止内核抢占。对于单CPU,自旋锁除了禁止(或者开启)内核抢占外,什么也不做。

2.1.1、硬件支持
在X86平台下,可以对如下一些指令加上LOCK前缀来保证指令的原子性执行:
 
(1)    位测试修改指令,如BTS,BTR和BTC;
(2)    交换指令XCHG,实际上,对于该指令,即使不加LOCK前缀,也是自动原子执行;
(3)    一些单操作数算术和逻辑运算指令,如INC, DEC ,NOT和NEG;
(4)    一些双操作数指令,如ADD, ADC,SUB,SBB, AND ,OR和XOR。
这些原子操作是实现自旋锁和信号量的基础。

2.1.2、自旋锁的实现
提供的接口如下:



数据结构:

//include/asm-i386/spinlock.h 

/*自旋锁数据结构,2.6.10*/

typedef struct {

    volatile unsigned int lock;

#ifdef CONFIG_DEBUG_SPINLOCK

    unsigned magic;
#endif

} spinlock_t; 

/*从2.6.11开始,与2.6.10有些变化*/

typedef struct {

    volatile unsigned int slock;

#ifdef CONFIG_DEBUG_SPINLOCK

    unsigned magic;
#endif

#ifdef CONFIG_PREEMPT

    unsigned int break_lock;
#endif

} spinlock_t; 

接口的实现:

//include/linux/spinlock.h
#define spin_lock(lock)        _spin_lock(lock) 
#define spin_unlock(lock)    _spin_unlock(lock) 

////////////////////////////抢占内核的spin_lock//////////////////////
//kernel/spinlock.c
void __lockfunc _spin_lock(spinlock_t *lock)

{

    preempt_disable(); //禁止内核抢占
    if (unlikely(!_raw_spin_trylock(lock)))

        __preempt_spin_lock(lock);

}

//include/asm-i386/spinlock.h
//返回1表示获得了自旋锁,返回0表示获取自旋锁失败
static inline int _raw_spin_trylock(spinlock_t *lock)

{

    char oldval;

    /*xchgb是原子字令.

    **这些指令相当于:oldval=0;tmp=oldval;oldval=lock->lock;lock->lock=tmp;

    **即读取lock字段的旧值,将将其设为0(即锁住状态)

 旧

    */

    __asm__ __volatile__(

        "xchgb %b0,%1"

        :"=q" (oldval), "=m" (lock->lock)

        :"0" (0) : "memory");

        //如果自旋锁的旧值为正数(即原自旋锁处于unlock状态,当前内核控制中径可以获取锁),则函数返回1,否则返回0
    return oldval > 0;

}
//include/linux/preempt.h
#define preempt_disable() \
do { \

    inc_preempt_count(); \

    barrier(); \

} while (0)

//kernel/spinlock.c
//当前CPU内核控制路径获取自旋锁失败时调用该函数
static inline void __preempt_spin_lock(spinlock_t *lock)

{

    if (preempt_count() > 1) {

        _raw_spin_lock(lock);

        return;

    }

    do {

        //抢占计数器值减1,在等待自旋锁时,允许内核抢占
        preempt_enable();

        while (spin_is_locked(lock))

            cpu_relax();

        preempt_disable();

    } while (!_raw_spin_trylock(lock));//循环请求自旋锁
}

///////////////////////////对于非抢占内核的spin_lock//////////////////////////////
//对于非抢占的内核spin_lock
void __lockfunc _spin_lock(spinlock_t *lock)

{

    //对于非抢占内核,什么也不做
    preempt_disable(); 

    _raw_spin_lock(lock);

}
//include/asm-i386/spinlock.h
static inline void _raw_spin_lock(spinlock_t *lock)

{

#ifdef CONFIG_DEBUG_SPINLOCK

    if (unlikely(lock->magic != SPINLOCK_MAGIC)) {

        printk("eip: %p\n", __builtin_return_address(0));

        BUG();

    }
#endif

    __asm__ __volatile__(

        spin_lock_string

        :"=m" (lock->lock) : : "memory");

}

#define spin_lock_string \

    "\n1:\t" \

    "lock ; decb %0\n\t" \ #锁计数器值减1

    "jns 3f\n" \       #如果小于0,则跳到标号3

    "2:\t" \

    "rep;nop\n\t" \    #执行空指令

    "cmpb $0,%0\n\t" \ #与0比较

    "jle 2b\n\t" \     #小于或等于0,则跳到2

    "jmp 1b\n" \       #大于0,则跳到1

    "3:\n\t"

    
/////////////////////////spin_unlock的实现/////////////////////////////////    
    
void __lockfunc _spin_unlock(spinlock_t *lock)

{

    _raw_spin_unlock(lock);

    preempt_enable();

}
//include/asm-i386/spinlock.h
static inline void _raw_spin_unlock(spinlock_t *lock)

{

#ifdef CONFIG_DEBUG_SPINLOCK

    BUG_ON(lock->magic != SPINLOCK_MAGIC);

    BUG_ON(!spin_is_locked(lock));
#endif

    __asm__ __volatile__(

        spin_unlock_string

    );

}
#define spin_unlock_string \

    "movb $1,%0" \

        :"=m" (lock->lock) : : "memory"

2.2、睡眠与唤醒

在操作系统中,睡眠和唤醒原语实际上是操作系统的基本原语,也是实现同步的一种方式,而且它还是实现信号量的基础。当进程请求的资源(如内存、文件等)不能得到满足时,就会主动放弃CPU,进入等待状态(可中断等待或者不可中断等待)。当资源满足时,就会由别的进程唤醒,从而投入运行。

2.2.1、等待队列

等待队列表示一组睡眠的进程,这些进程正在等待特定的事件发生(或者说条件为真),比如,等待足够的内存。等待队列是一个双链表,每个队列都有一个队列头,其定义如下:

//include/linux/wait.h
//等待队列头
struct __wait_queue_head {

      // 自旋锁
    spinlock_t lock;

    struct list_head task_list;

};

typedef struct __wait_queue_head wait_queue_head_t;

等待队列链表中的元素类型为:

typedef struct __wait_queue wait_queue_t;
//唤醒函数指针
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int sync, void *key);
//默认的唤醒函数
int default_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key);

struct __wait_queue {

    /*取值为WQ_FLAG_EXCLUSIVE(=1)表示互斥进程,由内核有选择的唤醒.为0时表示非互斥进程,由内核在

    **事件发生时唤醒所有等待进程.

    **/

    unsigned int flags;
#define WQ_FLAG_EXCLUSIVE    0x01

    //等待的任务描述符
    struct task_struct * task;

    //唤醒函数,默认为default_wake_function
    wait_queue_func_t func;

    struct list_head task_list;

};

其典型的结构如下:



等待队列头的初始化:

DECLARE_WAIT_QUEUE_HEAD(name);

其定义如下:

//incude/linux/wait.h
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) {                \

    .lock        = SPIN_LOCK_UNLOCKED,                \

    .task_list    = { &(name).task_list, &(name).task_list } }

//初始化等待队列头
#define DECLARE_WAIT_QUEUE_HEAD(name) \

    wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)

或者如下:

wait_queue_head_t my_queue; 

init_waitqueue_head(&my_queue);

等待队列元素初始化:

//linux/wait.h
//wait_queue_t初始化
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)

{

    q->flags = 0;

    q->task = p;

    q->func = default_wake_function;

}

2.2.2、等待事件(Waiting on the Event)

内核提供的等待接口包括wait_event(), wait_event_ interruptible(), 和wait_event_interruptible_timeout()。此外sleep_on(), sleep_on_timeout(), 和interruptible_sleep_on()在2.6中仍然支持,但已经过时。这些接口的基本实现如下:



具体代码如下:

//linux/wait.h
#define wait_event(wq, condition)                     \
do {                                    \

    if (condition)    //条件发生                         \
        break;                            \

    __wait_event(wq, condition);                    \

} while (0)

#define __wait_event(wq, condition)                     \
do {                                    \

    DEFINE_WAIT(__wait);                        \

                                    \

    for (;;) {                            \

        prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE);    \

        if (condition)                        \

            break;                        \

        schedule();//调度                        \
    }                                \

    finish_wait(&wq, &__wait);                    \

} while (0)

//kernel/wait.c
void fastcall

prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)

{

    unsigned long flags;

    //非互斥进程
    wait->flags &= ~WQ_FLAG_EXCLUSIVE;

    //关中断,并请求自旋锁
    spin_lock_irqsave(&q->lock, flags);

    if (list_empty(&wait->task_list))

        __add_wait_queue(q, wait);  //将等待任务加入等待队列
    /*

     * don't alter the task state if this is just going to

     * queue an async wait queue callback

     */

    if (is_sync_wait(wait))

        set_current_state(state);  //设置任务当前的状态

    //释放自旋锁,并恢复处理器状态
    spin_unlock_irqrestore(&q->lock, flags);

}

//等待完成之后,应该设置任务的状态为运行状态,并从等待队列中删除
void fastcall finish_wait(wait_queue_head_t *q, wait_queue_t *wait)

{

    unsigned long flags;

    __set_current_state(TASK_RUNNING); //设置为运行状态
    

    if (!list_empty_careful(&wait->task_list)) {

        spin_lock_irqsave(&q->lock, flags);

        list_del_init(&wait->task_list);    //从等待队列中删除
        spin_unlock_irqrestore(&q->lock, flags);

    }

}

2.2.3、唤醒(Waking Up)

接口如下:

//include/inux/wait.h
#define wake_up(x)            __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1, NULL)
#define wake_up_nr(x, nr)        __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, nr, NULL)
#define wake_up_all(x)            __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 0, NULL)
#define wake_up_interruptible(x)    __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
#define wake_up_interruptible_nr(x, nr)    __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)
#define wake_up_interruptible_all(x)    __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)
#define    wake_up_locked(x)        __wake_up_locked((x), TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE)
#define wake_up_interruptible_sync(x)   __wake_up_sync((x),TASK_INTERRUPTIBLE, 1)

具体实现:

//kernel/sched.c
void fastcall __wake_up(wait_queue_head_t *q, unsigned int mode,

                int nr_exclusive, void *key)

{

    unsigned long flags;

    //请求自旋锁,并关中断
    spin_lock_irqsave(&q->lock, flags);

    __wake_up_common(q, mode, nr_exclusive, 0, key);

    spin_unlock_irqrestore(&q->lock, flags);

}
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,

                 int nr_exclusive, int sync, void *key)

{

    struct list_head *tmp, *next;

    list_for_each_safe(tmp, next, &q->task_list) {

        wait_queue_t *curr;

        unsigned flags;

        curr = list_entry(tmp, wait_queue_t, task_list);

        flags = curr->flags;

        //调用相应的唤醒函数, 唤醒第1个有WQ_FLAG_EXCLUSIVE标志的进程后停止
        if (curr->func(curr, mode, sync, key) &&

            (flags & WQ_FLAG_EXCLUSIVE) &&

            !--nr_exclusive)

            break;

    }

}
//默认的唤醒函数
int default_wake_function(wait_queue_t *curr, unsigned mode, int sync, void *key)

{

    task_t *p = curr->task;

    return try_to_wake_up(p, mode, sync);

}

try_to_wake_up是唤醒原语中核心部分,其具体代码如下:

/*p被唤醒的进程描述符.

**state被唤醒的进程的状态掩码

**sync禁止被唤醒的进程抢占本地CPU正在运行的进程
*/
static int try_to_wake_up(task_t * p, unsigned int state, int sync)

{

    int cpu, this_cpu, success = 0;

    unsigned long flags;

    long old_state;

    runqueue_t *rq;

#ifdef CONFIG_SMP

    unsigned long load, this_load;

    struct sched_domain *sd;

    int new_cpu;
#endif

    //关闭中断,并获取最后执行该进程的CPU(可能不同于本地CPU)的运行队列的锁
    rq = task_rq_lock(p, &flags);

    schedstat_inc(rq, ttwu_cnt);

    old_state = p->state;

    if (!(old_state & state))

        goto out;

    if (p->array)

        goto out_running;

    

    //最后执行该任务的CPU
    cpu = task_cpu(p);

    //本地CPU
    this_cpu = smp_processor_id();

    
/*对于多CPU系统,检查要被唤醒的进程是否应该从最近执行该进程的CPU的运行队列,

**转移到另外一个CPU的运行队列.
*/

#ifdef CONFIG_SMP

    if (unlikely(task_running(rq, p)))

        goto out_activate;

    new_cpu = cpu;

    if (cpu == this_cpu || unlikely(!cpu_isset(this_cpu, p->cpus_allowed)))

        goto out_set_cpu;

    load = source_load(cpu);

    this_load = target_load(this_cpu);

    /*

     * If sync wakeup then subtract the (maximum possible) effect of

     * the currently running task from the load of the current CPU:

     */

    if (sync)

        this_load -= SCHED_LOAD_SCALE;

    /* Don't pull the task off an idle CPU to a busy one */

    if (load < SCHED_LOAD_SCALE/2 && this_load > SCHED_LOAD_SCALE/2)

        goto out_set_cpu;

    new_cpu = this_cpu; /* Wake to this CPU if we can */

    /*

     * Scan domains for affine wakeup and passive balancing

     * possibilities.

     */

    for_each_domain(this_cpu, sd) {

        unsigned int imbalance;

        /*

         * Start passive balancing when half the imbalance_pct

         * limit is reached.

         */

        imbalance = sd->imbalance_pct + (sd->imbalance_pct - 100) / 2;

        if ((sd->flags & SD_WAKE_AFFINE) &&

                !task_hot(p, rq->timestamp_last_tick, sd)) {

            /*

             * This domain has SD_WAKE_AFFINE and p is cache cold

             * in this domain.

             */

            if (cpu_isset(cpu, sd->span)) {

                schedstat_inc(sd, ttwu_wake_affine);

                goto out_set_cpu;

            }

        } else if ((sd->flags & SD_WAKE_BALANCE) &&

                imbalance*this_load <= 100*load) {

            /*

             * This domain has SD_WAKE_BALANCE and there is

             * an imbalance.

             */

            if (cpu_isset(cpu, sd->span)) {

                schedstat_inc(sd, ttwu_wake_balance);

                goto out_set_cpu;

            }

        }

    }

    new_cpu = cpu; /* Could not wake to this_cpu. Wake to cpu instead */

out_set_cpu:

    schedstat_inc(rq, ttwu_attempts);

    new_cpu = wake_idle(new_cpu, p);

    if (new_cpu != cpu && cpu_isset(new_cpu, p->cpus_allowed)) {

        schedstat_inc(rq, ttwu_moved);

        set_task_cpu(p, new_cpu);

        task_rq_unlock(rq, &flags);

        /* might preempt at this point */

        rq = task_rq_lock(p, &flags);

        old_state = p->state;

        if (!(old_state & state))

            goto out;

        if (p->array)

            goto out_running;

        this_cpu = smp_processor_id();

        cpu = task_cpu(p);

    }

out_activate:
#endif /* CONFIG_SMP */

    if (old_state == TASK_UNINTERRUPTIBLE) {

        rq->nr_uninterruptible--;

        /*

         * Tasks on involuntary sleep don't earn

         * sleep_avg beyond just interactive state.

         */

        p->activated = -1;

    }

    /*

     * Sync wakeups (i.e. those types of wakeups where the waker

     * has indicated that it will leave the CPU in short order)

     * don't trigger a preemption, if the woken up task will run on

     * this cpu. (in this case the 'I will reschedule' promise of

     * the waker guarantees that the freshly woken up task is going

     * to be considered on this CPU.)

     */

     //将进程p加入目标CPU的可运行队列
    activate_task(p, rq, cpu == this_cpu);

    /*如果没有设置sync标志(表示允许抢占),且目标CPU不是本地CPU,则检查p是否比rq运行队列中当前进程的动态优先级高.

    **即(p)->prio < (rq)->curr->prio,如果是,则调用resched_task()抢占rq->curr。

    */

    if (!sync || cpu != this_cpu) {

        if (TASK_PREEMPTS_CURR(p, rq))

            /*在单CPU中,仅仅设置TIF_NEED_RESCHED标志.多CPU系统中,则检查相应标志,并使目标CPU重新调度

            */

            resched_task(rq->curr);

    }

    success = 1;

out_running:

    //设置进程的状态
    p->state = TASK_RUNNING;
out:

    //释放rq的锁,并打开本地中断
    task_rq_unlock(rq, &flags);

    return success;

}

#ifdef CONFIG_SMP
//多CPU系统
static void resched_task(task_t *p)

{

    int need_resched, nrpolling;

    BUG_ON(!spin_is_locked(&task_rq(p)->lock));

    /* minimise the chance of sending an interrupt to poll_idle() */

    nrpolling = test_tsk_thread_flag(p,TIF_POLLING_NRFLAG);

    need_resched = test_and_set_tsk_thread_flag(p,TIF_NEED_RESCHED);

    nrpolling |= test_tsk_thread_flag(p,TIF_POLLING_NRFLAG);

    

    if (!need_resched && !nrpolling && (task_cpu(p) != smp_processor_id()))

        //产生IPI,强制目标CPU重新调度
        smp_send_reschedule(task_cpu(p));

}
#else
//单CPU系统
static inline void resched_task(task_t *p)

{

    set_tsk_need_resched(p);

}
#endif

2.2.4、互斥等待

当调用wake_up唤醒等待队列时,等待队列上的所有进程都转置为可运行。在一些情况下,这种做法是正确的,比如等待某个特定的事件。但是在另外一些情况,可以提前知道只有一个被唤醒的进程能够成功的获取资源,比如等待临界区资源,其它的进程将再次睡眠。如果等待队列中的进程数量太大,将会严重影响系统性能,这就是所谓的thundering herd行为。为此,内核引入互斥等待,它与非互斥等待的区别如下:

(1) 当一个等待队列入口有 WQ_FLAG_EXCLUSEVE 标志置位, 它被添加到等待队列的尾部. 没有这个标志的入口项, 相反, 添加到开始。

(2) 当 wake_up 被在一个等待队列上调用, 它在唤醒第一个有 WQ_FLAG_EXCLUSIVE 标志的进程后停止。

这样,进行互斥等待的进程一次只唤醒一个。使一个进程进入互斥等待是调用prepare_to_wait_exclusive完成的。

//kernel/wait.c
void fastcall

prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)

{

    unsigned long flags;

    //互斥标志
    wait->flags |= WQ_FLAG_EXCLUSIVE;

    spin_lock_irqsave(&q->lock, flags);

    if (list_empty(&wait->task_list))

        __add_wait_queue_tail(q, wait);

    /*

     * don't alter the task state if this is just going to

      * queue an async wait queue callback

     */

    if (is_sync_wait(wait))

        set_current_state(state);

    spin_unlock_irqrestore(&q->lock, flags);

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Linux Kernel 内核 同步