您的位置:首页 > 其它

转载_工作队列实现机制

2013-04-03 13:30 441 查看
walle project android 2.2 and linux 2.6.32

一、工作项、工作队列和工作者线程

把推后执行的任务叫做工作(work),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct ,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events。

工作队列(work queue)是另外一种将工作推后执行的形式。工作队列可以把工作推后,交由一个内核线程去执行—这个下半部分总是会在进程上下文执行,但由于是内核线程,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠。

通常,在工作队列和软中断/tasklet中作出选择非常容易。可使用以下规则:

如果推后执行的任务需要睡眠,那么只能选择工作队列;

如果推后执行的任务需要延时指定的时间再触发,那么使用工作队列,因为其可以利用timer延时;

如果推后执行的任务需要在一个tick之内处理,则使用软中断或tasklet,因为其可以抢占普通进程和内核线程;

如果推后执行的任务对延迟的时间没有任何要求,则使用工作队列,此时通常为无关紧要的任务。

实际上,工作队列的本质就是将工作交给内核线程处理,因此其可以用内核线程替换。但是内核线程的创建和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以我们也推荐使用工作队列。

二、工作队列使用

相关文件:

kernel/include/linux/workqueue.h

Kernel/kernel/workqueue.c

要使用工作队列,需要先创建工作项,有两种方式:

静态创建:

DECLARE_WORK(name, function); 定义正常执行的工作项

DECLARE_DELAYED_WORK(name, function); 定义延后执行的工作项

eg:

@ kernel/driver/input/keyboard/mt6516_kpd.c

@ mtk/src/custom/common/kernel/touchpanel/st1332/driver.c

static void kpd_switch_backlight(struct work_struct *work);

static DECLARE_WORK(kpd_backlight_work, kpd_switch_backlight);

static void st1332_kpd_switch_backlight(struct delayed_work *work);

static DECLARE_DELAYED_WORK(kpd_backlight_work, st1332_kpd_switch_backlight);

动态创建,运行时创建:

eg:

@ kernel/driver/input/touchscreen/tspad.c

static struct work_struct work;

struct delayed_work led_work;

static void new_ts_work(struct work_struct *work);

static void s0340_ledtime_scanf(unsigned long data);

通常在probe()函数中执行下面的操作来初始化工作项:

INIT_WORK(&work, new_ts_work);

INIT_DELAYED_WORK(&led_work, s0340_ledtime_scanf);

工作队列待执行的函数原型是:

typedef void (*work_func_t)(struct work_struct *work);

这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。

创建了工作项之后,在适当的时候可以通过下面的两种方式来提交工作项给工作者线程,通常我们使用的工作队列和工作者线程都是系统初始化时候默认创建的。

schedule_work(&work) ;

&work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。

schedule_delayed_work(&delay_work, delay);

&delay_work指向的 delay_work 直到 delay 指定的时钟节拍用完以后才会执行。

eg :

schedule_delayed_work(&kpd_backlight_work, msecs_to_jiffies(300));

三、默认工作队列和工作者线程创建过程

系统默认的工作队列名称是:keventd_wq, 默认的工作者线程叫:events/n, 这里的n是处理器的编号, 每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。

默认的工作者线程会从多个地方得到被推后的工作。许多内核驱动程序都把它们的下半部交给默认的工作者线程去做。除非一个驱动程序或者子系统必须建立一个属于它自己的内核线程,否则最好使用默认线程。不过并不存在什么东西能够阻止代码创建属于自己的工作者线程。如果你需要在工作者线程中执行大量的处理操作,这样做或许会带来好处。处理器密集型和性能要求严格的任务会因为拥有自己的工作者线程而获得好处。

默认的工作队列keventd_wq只有一个,但是其工作者线程在每一个cpu上都有。而标记为singlethread的工作者线程最存在于一个cpu上。

关于默认工作队列keventd_wq和工作者线程events/n的建立在文件Kernel/kernel/workqueue.c中实现。

Start_kernel() --> rest_init(), 该函数中创建了两个内核线程kernel_init和kthreadd,这两个线程都和本文描述的部分有关系,先说说kernel_init。

kernel_init() --> do_basic_setup() --> init_workqueues(), 该函数中创建了上面提到的默认工作队列和工作者线程。

init_workqueues() -->

--> hotcpu_notifier(workqueue_cpu_callback, 0);

--> keventd_wq = create_workqueue("events");

注册的cpu通知链cpu_chain上的回调函数是workqueue_cpu_callback(), raw_notifier_call_chain()函数用来调用cpu_chain上的所有回调函数。

这里主要关注的是函数:create_workqueue("events");

@ kernel/include/linux/workqueue.h

#define __create_workqueue(name, singlethread, freezeable, rt) /

__create_workqueue_key((name), (singlethread), (freezeable), (rt), /

NULL, NULL)

#define create_workqueue(name) __create_workqueue((name), 0, 0, 0)

#define create_rt_workqueue(name) __create_workqueue((name), 0, 0, 1)

#define create_freezeable_workqueue(name) __create_workqueue((name), 1, 1, 0)

#define create_singlethread_workqueue(name) __create_workqueue((name), 1, 0, 0)

从宏__create_workqueue的参数可以看出,可以通过传递不同的参数: 是否单cpu线程,是否可冻结,是否实时来创建不同类型的工作队列和工作者线程。

work_struct工作项结构体定义:@ kernel/include/linux/workqueue.h

struct work_struct;

typedef void (*work_func_t)(struct work_struct *work);

struct work_struct {

atomic_long_t data;

#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */

#define WORK_STRUCT_FLAG_MASK (3UL)

#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)

struct list_head entry;

work_func_t func;

#ifdef CONFIG_LOCKDEP

struct lockdep_map lockdep_map;

#endif

};

工作队列workqueue_struct结构体: @ kernel/kernel/workqueue.c

/*

* The per-CPU workqueue (if single thread, we always use the first

* possible cpu).

*/

struct cpu_workqueue_struct {

spinlock_t lock; // __queue_work(), run_workqueue()

struct list_head worklist; // __queue_work(), run_workqueue()

wait_queue_head_t more_work; // 定义一个等待队列头 run_workqueue()

struct work_struct *current_work; // run_workqueue()

struct workqueue_struct *wq; // init_cpu_workqueue()

struct task_struct *thread; // create_workqueue_thread()

} ____cacheline_aligned;

/*

* The externally visible workqueue abstraction is an array of

* per-CPU workqueues:

*/

struct workqueue_struct {

struct cpu_workqueue_struct *cpu_wq;

// __create_workqueue_key(), init_cpu_workqueue()

struct list_head list; // __create_workqueue_key()

const char *name; // __create_workqueue_key()

int singlethread; // __create_workqueue_key()

int freezeable; /* Freeze threads during suspend */ // __create_workqueue_key()

int rt; // __create_workqueue_key()

#ifdef CONFIG_LOCKDEP

struct lockdep_map lockdep_map; // __create_workqueue_key()

#endif

};

关键函数__create_workqueue_key()分析:

struct workqueue_struct *__create_workqueue_key(const char *name,

int singlethread,

int freezeable,

int rt,

struct lock_class_key *key,

const char *lock_name)

{

struct workqueue_struct *wq;

struct cpu_workqueue_struct *cwq;

int err = 0, cpu;

wq = kzalloc(sizeof(*wq), GFP_KERNEL);

if (!wq)

return NULL;

wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);

if (!wq->cpu_wq) {

kfree(wq);

return NULL;

}

wq->name = name;

lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);

wq->singlethread = singlethread;

wq->freezeable = freezeable;

wq->rt = rt;

INIT_LIST_HEAD(&wq->list);

if (singlethread) { // 创建单模块线程

cwq = init_cpu_workqueue(wq, singlethread_cpu); note -1

//初始化cpu_workqueue_struct结构体 cwq

// singlethread_cpu -- the first cpu in a cpumask

err = create_workqueue_thread(cwq, singlethread_cpu); note 0

// p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); note 1

// trace_workqueue_creation(cwq->thread, cpu); note 2

start_workqueue_thread(cwq, -1); // run this thread

} else {

cpu_maps_update_begin();

/*

* We must place this wq on list even if the code below fails.

* cpu_down(cpu) can remove cpu from cpu_populated_map before

* destroy_workqueue() takes the lock, in that case we leak

* cwq[cpu]->thread.

*/

spin_lock(&workqueue_lock);

list_add(&wq->list, &workqueues);

spin_unlock(&workqueue_lock);

/*

* We must initialize cwqs for each possible cpu even if we

* are going to call destroy_workqueue() finally. Otherwise

* cpu_up() can hit the uninitialized cwq once we drop the

* lock.

*/

for_each_possible_cpu(cpu) {// 为每个cpu都建立一个对应的线程

cwq = init_cpu_workqueue(wq, cpu);

if (err || !cpu_online(cpu))

continue;

err = create_workqueue_thread(cwq, cpu);

start_workqueue_thread(cwq, cpu);

}

cpu_maps_update_done();

}

if (err) {

destroy_workqueue(wq);

wq = NULL;

}

return wq;

}

Note -1: @ kernel/kernel/workqueue.c

static struct cpu_workqueue_struct *

init_cpu_workqueue(struct workqueue_struct *wq, int cpu)

{

struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

cwq->wq = wq;

spin_lock_init(&cwq->lock);

INIT_LIST_HEAD(&cwq->worklist); // 初始化工作项列表, 使用时提交的工作项都是挂接在这个链表上的

init_waitqueue_head(&cwq->more_work);

// 初始化等待队列头

return cwq;

}

Note 0: @ kernel/kernel/workqueue.c

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)

{

struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };

struct workqueue_struct *wq = cwq->wq;

const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";

struct task_struct *p;

p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);

// fmt - 线程命名格式; cpu -- cpu number; cwq -- 传递的参数

// 线程函数: worker_thread()

/*

* Nobody can add the work_struct to this cwq,

* if (caller is __create_workqueue)

* nobody should see this wq

* else // caller is CPU_UP_PREPARE

* cpu is not on cpu_online_map

* so we can abort safely.

*/

if (IS_ERR(p))

return PTR_ERR(p);

if (cwq->wq->rt)

sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m);

// 是否需要设置实时属性

cwq->thread = p;

// cpu_workqueue_struct.thread 中记录返回线程的task_struct结构体

trace_workqueue_creation(cwq->thread, cpu);

return 0;

}

Note 1 : @ kernel/kernel/kthread.c 这里会牵扯到内核线程的创建机制,可以扩展一下

static DEFINE_SPINLOCK(kthread_create_lock);

static LIST_HEAD(kthread_create_list);

struct kthread_create_info

{

/* Information passed to kthread() from kthreadd. */

int (*threadfn)(void *data);

void *data;

/* Result passed back to kthread_create() from kthreadd. */

struct task_struct *result;

struct completion done;

struct list_head list;

};

/**

* kthread_create - create a kthread.

* @threadfn: the function to run until signal_pending(current).

* @data: data ptr for @threadfn.

* @namefmt: printf-style name for the thread.

*

* Description: This helper function creates and names a kernel

* thread. The thread will be stopped: use wake_up_process() to start

* it. See also kthread_run(), kthread_create_on_cpu().

*

* When woken, the thread will run @threadfn() with @data as its

* argument. @threadfn() can either call do_exit() directly if it is a

* standalone thread for which noone will call kthread_stop(), or

* return when 'kthread_should_stop()' is true (which means

* kthread_stop() has been called). The return value should be zero

* or a negative error number; it will be passed to kthread_stop().

*

* Returns a task_struct or ERR_PTR(-ENOMEM).

*/

struct task_struct *kthread_create(int (*threadfn)(void *data),

void *data,

const char namefmt[],

...)

{

struct kthread_create_info create;

create.threadfn = threadfn;

create.data = data;

init_completion(&create.done); // 初始化完成量

spin_lock(&kthread_create_lock);

list_add_tail(&create.list, &kthread_create_list);

// 将新建的kthread挂接到全局的线程链表kthread_create_list中

spin_unlock(&kthread_create_lock);

wake_up_process(kthreadd_task);

// kthreadd_task = find_task_by_pid_ns(pid, &init_pid_ns); @ kernel/init/main.c // rest_init()中初始化,该指针保存的是线程kthreadd的task_struct结构体指针。

// 唤醒线程kthreadd

wait_for_completion(&create.done);

// 等待完成量,我们这里转到kthreadd线程的执行函数中去看一下,这个完成量的唤醒应该是在kthreadd线程中做的,kthreadd线程应该是根据kthread_create_list上挂接的kthread_create_info结构体来创建特定线程。

// 这部分关于内核线程创建的机制请阅读分析文档:内核线程创建 目录中的相关文件和内核源码。这里不再详细分析。

// 新线程创建ok后,进入了睡眠,然后唤醒了对应的完成量create.done,这边继续执行

if (!IS_ERR(create.result)) {

// create.result中保存的是新创建内核线程的task_struct结构体指针

struct sched_param param = { .sched_priority = 0 };

va_list args;

va_start(args, namefmt);

vsnprintf(create.result->comm, sizeof(create.result->comm),

namefmt, args); // 设置当前线程的名字

// 名字的格式来源于函数上层上层调用函数,这里是来源于工作队列创建函数create_workqueue_thread()中:@ kernel/kernel/workqueue.c

// const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";

va_end(args);

/*

* root may have changed our (kthreadd's) priority or CPU mask.

* The kernel thread should not inherit these properties.

*/

sched_setscheduler_nocheck(create.result, SCHED_NORMAL, ¶m);

// 调度策略设置

set_cpus_allowed_ptr(create.result, cpu_all_mask);

}

return create.result; // 返回的是新建内核线程的task_struct结构体指针

}

kthread_create()函数通过专门创建线程的内核线程kthreadd创建了公用线程kthread,而在该kthread线程函数中调用其参数传递进来的回调函数threadfn(),这个threadfn()函数就是我们调用kthread_create()函数时传递进来的第一个参数,第二个参数则是执行回调函数时的参数。该函数原形如下:

struct task_struct *kthread_create(int (*threadfn)(void *data),

void *data,

const char namefmt[],

...);

调用示例:

kthread_create(worker_thread, cwq, fmt, wq->name, cpu);

这个线程创建ok之后,会在线程kthread中调用函数worker_thread(cwq);

worker_thread()函数如下,是每一个工作者线程的共用的线程函数。其实工作队列对应的数据结构是workqueue_struct,而该结构体中包含一个对应cpu的数据结构cpu_workqueue_struct,这个数据结构中包含了工作项链表worklist。而所有的工作者线程,只是名字不一样而已,所跑的线程函数都是一样:worker_thread。

static int worker_thread(void *__cwq)

{

struct cpu_workqueue_struct *cwq = __cwq;

DEFINE_WAIT(wait); // 定义一个等待队列项wait

// @ kernel/include/linux/wait.h

if (cwq->wq->freezeable)

set_freezable(); // current->flags &= ~PF_NOFREEZE;

for (;;) {

prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);

// 可中断睡眠

// 准备进入睡眠等待,wait加入cwq->more_work等待队列头中,

// 设置非独占进程标志

// 和可中断睡眠标志 @ kernel/kernel/wait.c

if (!freezing(current) &&

!kthread_should_stop() &&

list_empty(&cwq->worklist))//

// 当前进程是非冻结状态,当前线程没停止,同时工作项列表为空

// 的时候进入睡眠让出cpu

schedule();

finish_wait(&cwq->more_work, &wait);// 当前线程被唤醒后马上要做的事情

try_to_freeze();

if (kthread_should_stop())// 检查当前线程是否被要求stop

break;

run_workqueue(cwq); // 运行工作项中对应的函数

}

return 0;

}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

在分析函数run_workqueue()之前,我们先来看一下,提交工作项的时候发生了什么事情。还是从函数int schedule_work(struct work_struct *work);开始说起吧!

@ kernel/kernel/workqueue.c

int schedule_work(struct work_struct *work)

{

return queue_work(keventd_wq, work);

}

keventd_wq工作队列是在函数init_workqueues()中创建的(参看前文),所有这里在提交工作项的时候就用上了。

int queue_work(struct workqueue_struct *wq, struct work_struct *work)

{

int ret;

ret = queue_work_on(get_cpu(), wq, work);

put_cpu();

return ret;

}

该函数将work工作项提交到当前做该项提交的cpu上的工作队列wq上,如果这个cpu被标记为die,那么可以提交到别的cpu上去执行。返回0,表示该项工作已经提交过,还没执行。非0表示提交成功。

int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)

{

int ret = 0;

if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {

BUG_ON(!list_empty(&work->entry));

__queue_work(wq_per_cpu(wq, cpu), work);

ret = 1;

}

return ret;

}

@ kernel/include/linux/workqueue.h

#define work_data_bits(work) ((unsigned long *)(&(work)->data))

在work_struct结构体的第一个word中保留该标识,宏也在该结构体中定义。

test_and_set_bit(int nr, volatile void *addr) 将*addr的第nr位设置为1,并返回它的原值。

http://www.shangshuwu.cn/index.php/Linux%E5%86%85%E6%A0%B8%E7%9A%84%E5%8E%9F%E5%AD%90%E6%93%8D%E4%BD%9C Linux内核的原子操作

工作项在初始化的时候会调用WORK_DATA_INIT()宏来将work_struct的data域初始化成0,所有这里!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))结果为1。

static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)

{

unsigned long flags;

spin_lock_irqsave(&cwq->lock, flags);

insert_work(cwq, work, &cwq->worklist);

spin_unlock_irqrestore(&cwq->lock, flags);

}

static void insert_work(struct cpu_workqueue_struct *cwq,

struct work_struct *work, struct list_head *head)

{

trace_workqueue_insertion(cwq->thread, work);

set_wq_data(work, cwq); // 设置work_struct的pending未决标志

/*

* Ensure that we get the right work->data if we see the

* result of list_add() below, see try_to_grab_pending().

*/

smp_wmb();// 多处理器的相关动作

list_add_tail(&work->entry, head); // 工作项加入链表

wake_up(&cwq->more_work);// 唤醒等待在该等待队列头上的所有等待队列项

}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

static void run_workqueue(struct cpu_workqueue_struct *cwq)

{

spin_lock_irq(&cwq->lock);

while (!list_empty(&cwq->worklist)) {

struct work_struct *work = list_entry(cwq->worklist.next,

struct work_struct, entry);

work_func_t f = work->func; // 取出工作项函数

#ifdef CONFIG_LOCKDEP

/*

* It is permissible to free the struct work_struct

* from inside the function that is called from it,

* this we need to take into account for lockdep too.

* To avoid bogus "held lock freed" warnings as well

* as problems when looking into work->lockdep_map,

* make a copy and use that here.

*/

struct lockdep_map lockdep_map = work->lockdep_map;

#endif

trace_workqueue_execution(cwq->thread, work);

cwq->current_work = work;

list_del_init(cwq->worklist.next);// 从链表中删除工作项节点

spin_unlock_irq(&cwq->lock);

BUG_ON(get_wq_data(work) != cwq);

work_clear_pending(work);

lock_map_acquire(&cwq->wq->lockdep_map);

lock_map_acquire(&lockdep_map);

f(work);

// 执行对应的工作项函数,将work_struct结构体指针作为参数传递进去

lock_map_release(&lockdep_map);

lock_map_release(&cwq->wq->lockdep_map);

if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {

printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "

"%s/0x%08x/%d/n",

current->comm, preempt_count(),

task_pid_nr(current));

printk(KERN_ERR " last function: ");

print_symbol("%s/n", (unsigned long)f);

debug_show_held_locks(current);

dump_stack();

}

spin_lock_irq(&cwq->lock);

cwq->current_work = NULL;

}

spin_unlock_irq(&cwq->lock);

}

我们在新建工作项的时候,需要将工作函数的参数设置成work_struct 结构体指针,例如:

static void sitronix_ts_work(struct work_struct *work);

INIT_WORK(&priv->work, sitronix_ts_work);

XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

虽然以上内容是通过创建系统默认的工作队列keventd_wq和工作者线程events/n来分析了其创建过程,提交工作项过程和提交工作后唤醒工作者线程之后的所做的动作。

其实我们自己也可以使用这些接口来创建独立的工作队列和工作者线程来专门为特定的任务服务,例如在android和linux的睡眠和唤醒架构中就使用这种方式,@ kernel/kernel/power/wakelock.c

core_initcall(wakelocks_init)在wakelocks_init()函数中有创建两个工作队列和其对于的工作者线程:

sys_sync_work_queue = create_singlethread_workqueue("fs_sync");

suspend_work_queue = create_singlethread_workqueue("suspend");

early suspend的时候调用: @ kernel/kernel/power/earlysuspend.c

static DECLARE_WORK(early_sys_sync_work, early_sys_sync);

queue_work(sys_sync_work_queue, &early_sys_sync_work);

static DECLARE_WORK(early_suspend_work, early_suspend);

queue_work(suspend_work_queue, &early_suspend_work);

suspend的时候调用: @ kernel/kernel/power/wakelock.c

static DECLARE_WORK(suspend_work, suspend);

queue_work(suspend_work_queue, &suspend_work);

下面来看一看延时执行的工作项是如何提交的,这里和上面共同的部分不讨论,只讨论如何实现的延时执行,其余部分是相同的。

delayed_work结构体的定义:@ kernel/include/linux/workqueue.h

struct delayed_work {

struct work_struct work;

struct timer_list timer;

// 对work_struct结构体进行了封装,添加了一个timer_list结构体

};

#define DECLARE_DELAYED_WORK(n, f) /

struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f)

#define __DELAYED_WORK_INITIALIZER(n, f) { /

.work = __WORK_INITIALIZER((n).work, (f)), /

.timer = TIMER_INITIALIZER(NULL, 0, 0), /

} // 初始化work_struct结构体和前文方式一样,这里需要多初始化timer域。

@ kernel/include/linux/timer.h

#define TIMER_INITIALIZER(_function, _expires, _data) { /

.entry = { .prev = TIMER_ENTRY_STATIC }, /

.function = (_function), /

.expires = (_expires), /

.data = (_data), /

.base = &boot_tvec_bases, /

__TIMER_LOCKDEP_MAP_INITIALIZER( /

__FILE__ ":" __stringify(__LINE__)) /

}

通常情况下使用的定义一个定时器也是调用该宏来初始化:

#define DEFINE_TIMER(_name, _function, _expires, _data) /

struct timer_list _name = /

TIMER_INITIALIZER(_function, _expires, _data)

提交一个延时执行的工作项使用函数:

int schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)

{

return queue_delayed_work(keventd_wq, dwork, delay);

} // delay - 单位是jiffies,或者传递0的话,就是立即执行和schedule_work()一样了

// @ kernel/kernel/timer.c文件中有实现一些time to jiffies的函数:

// msecs_to_jiffied() 、 usecs_to_jiffies()等

int queue_delayed_work(struct workqueue_struct *wq,

struct delayed_work *dwork, unsigned long delay)

{

if (delay == 0) // 如果传递进来的delay是0,那么走立即执行的通路

return queue_work(wq, &dwork->work);

return queue_delayed_work_on(-1, wq, dwork, delay);

}

int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,

struct delayed_work *dwork, unsigned long delay)

{

int ret = 0;

struct timer_list *timer = &dwork->timer;

struct work_struct *work = &dwork->work;

// test_and_set_bit()设置特定位并传回该位原来的值

// 如果未决位为0,设置pending未决位后返回0

if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {

BUG_ON(timer_pending(timer));

BUG_ON(!list_empty(&work->entry));

timer_stats_timer_set_start_info(&dwork->timer);

/* This stores cwq for the moment, for the timer_fn */

set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));

timer->expires = jiffies + delay; // 到时时间阀值

timer->data = (unsigned long)dwork; // 向定时执行函数传递的参数

timer->function = delayed_work_timer_fn; // 定时执行函数

if (unlikely(cpu >= 0))

add_timer_on(timer, cpu);

else

add_timer(timer); // 向系统添加一个timer

ret = 1;

}

return ret;

}

static void delayed_work_timer_fn(unsigned long __data)

{

struct delayed_work *dwork = (struct delayed_work *)__data;

struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);

struct workqueue_struct *wq = cwq->wq;

__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);

}

看到函数__queue_work()是不是觉得很眼熟呢?没错,延时执行的工作项走的提交路线和正常提交工作项在该函数之前不一样,后面后市一样了。换句话说,提交延时工作项,只是延时提交了而已,并不是立即提交给工作者线程,让其工作者线程延时来执行。

其余函数介绍:

void flush_workqueue(struct workqueue_struct *wq);

此函数刷新指定工作队列,他会一直等待,知道该工作队列中所有工作项都已完成。

void flush_scheduled_work(void);

和上面函数类似,只是刷新默认工作队列:keventd_wq。

void flush_delayed_work(struct delayed_work *dwork);

等待一个delayed_work执行完。

int flush_work(struct work_struct *work);

等待一个work执行完。

如何取消提交的延时工作项?

cancel_work_sync(struct work_struct *work);

该函数取消已排在工作队列中的未决work,返回true。如果work的callback已经在运行了,那么该函数将会阻塞到其执行完毕。

static inline int __cancel_delayed_work(struct delayed_work *work)

{

int ret;

ret = del_timer(&work->timer);

if (ret)

work_clear_pending(&work->work);

return ret;

}

// if it returns 0 the timer function may be running and the queueing is in progress.

static inline int cancel_delayed_work(struct delayed_work *work)

{

int ret;

ret = del_timer_sync(&work->timer); // 阻塞直到定时函数执行完

if (ret)

work_clear_pending(&work->work);

return ret;

}

// 同上

三、工作队列新老版本比较

http://liaowb1234.blog.163.com/blog/static/77155547200911296838120/

这篇网文已有详细的说明,请参考。

参考网址:

http://blog.csdn.net/yuanfeng5721/archive/2011/04/29/6371502.aspx

http://hi.baidu.com/zengzhaonong/blog/item/7437cfa2f169a8a3caefd049.html

http://blog.csdn.net/kenxausten/archive/2010/12/14/6074862.aspx

Linux2.6.32源码

linux work queue及并发可管理工作队列

并发可管理工作队列的出现

慢工作机制

为什么说是“提供过内核中还曾短暂出现过慢工作机制 (slow work mechanism)”,原因是在 mainline内核中,曾经出现过慢工作机制 (slow work mechanism),但随着并发管理工作队列 (cmwq) 的出现,它已经全部被 cmwq所替换,淡出了 mainline

在内核代码中,经常希望延缓部分工作到将来某个时间执行,这样做的原因很多,比如:在持有锁时做大量(或者说费时的)工作不合适;或希望将工作聚集以获取批处理的性能;或调用了一个可能导致睡眠的函数使得在此时执行新调度非常不合适等。

内核中提供了许多机制来提供延迟执行,如中断的下半部处理可延迟中断上下文中的部分工作;定时器可指定延迟一定时间后执行某工作;工作队列则允许在进程上下文环境下延迟执行等。除此之外,内核中还曾短暂出现过慢工作机制 (slow work mechanism),还有异步函数调用(asynchronous function calls)以及各种私有实现的线程池等。在上面列出的如此多的内核基础组件中,使用最多则是工作队列。

工作队列 (workqueues)

在讨论之前,先定义几个内核中使用工作队列时用到的术语方便后面描述。

workqueues:所有工作项被 ( 需要被执行的工作 ) 排列于该队列,因此称作工作队列 (workqueues) 。
worker thread:工作者线程 (worker thread) 是一个用于执行工作队列中各个工作项的内核线程,当工作队列中没有工作项时,该线程将变为 idle 状态。
single threaded(ST)::工作者线程的表现形式之一,在系统范围内,只有一个工作者线程为工作队列服务
multi threaded(MT):工作者线程的表现形式之一,在多 CPU 系统上每个 CPU 上都有一个工作者线程为工作队列服务

工作队列之所以成为使用最多的延迟执行机制,得益于它的实现中的一些有意思的地方:

使用的接口简单明了

对于使用者,基本上只需要做 3 件事情,依次为:

创建工作队列 ( 如果使用内核默认的工作队列,连这一步都可以省略掉 )
创建工作项
向工作队列中提交工作项

执行在进程上下文中,这样使得它可以睡眠,被调度及被抢占

执行在进程上下文中是一个非常大的优势,其他的下半部工作机制,基本上都运行于中断上下文中,我们知道在中断上下文里,不能睡眠,不能阻塞;原因是中断上下文并不与任何进程关联,如在中断上下文睡眠,调度器将不能将其唤醒,所以在中断上下文中不能有导致内核进入睡眠的行为,如持有信号量,执行非原子的内存分配等。工作队列运行于进程上下文中 ( 他们通过内核线程执行 ),因此它完全可以睡眠,可以被调度,也可以被其他进程所抢占。

在多核环境下的使用也非常友好

与 tasklet 机制相较而言,工作队列可以在不同 CPU 上同时运行是个优势。这使得该接口在多核情况下也非常适合,内核邮件列表中就曾经有过用软中断和工作队列来替换不支持多 CPU 执行的 tasklet 的讨论。

总体说来,工作队列和定时器函数的处理有点类似,都是延迟执行相关的回调函数,但和定时器处理函数不同的是定时器回调函数只执行一次 (当然可以在执行时再次注册以反复调用,但这需要显示的再次注册 ), 且执行定时器回调函数时在时钟中断环境 ,限制较多,因此回调函数不能太复杂;而工作队列是通过内核线程实现,一直有效,可重复执行,执行时可以休眠,因此工作队列非常适合处理那些不是很紧急的任务,如垃圾回收处理等。

工作队列的使用和一些缺陷

注意

在2.6.20 之前,创建工作项的接口并不是这个样子,而是在 2.6.20之时对工作队列的接口进行过一次“瘦身”,其原因则非常简单,工作队列的使用越来越多,能节省一个字节对于内核也是一件好事情,这次瘦身也将工作项在创建时就明确区分为一般的工作项和需要延迟某段时间再执行的工作项

之前简单讨论了工作队列使用上的便利性,依据工作队列的使用步骤,在下面列出了在 2.6.36之前提供的接口,并描述了使用时的一些选择。由于工作队列的实现中,已有默认的共享工作队列,因此在选择接口时,就出现了 2种选择:要么使用内核已经提供的共享工作队列,要么自己创建工作队列。

如选择使用共享的工作队列,基本的步骤为:

1. 创建工作项

创建工作项的接口分为静态和动态方式,接口分别是:

清单 1. 静态创建工作项

typedef void (*work_func_t)(struct work_struct *work);

DECLARE_WORK(name, func);
DECLARE_DELAYED_WORK(name, func);
该系列宏静态创建一个以 name 命名的工作项,并设置了回调函数 func

清单 2. 动态创建工作项

INIT_WORK(struct work_struct work, work_func_t func);
PREPARE_WORK(struct work_struct work, work_func_t func);
INIT_DELAYED_WORK(struct delayed_work work, work_func_t func);
PREPARE_DELAYED_WORK(struct delayed_work work, work_func_t func);
该系列宏在运行时初始化工作项 work,并设置了回调函数 func

2. 调度工作项

清单 3. 调度工作项

int schedule_work(struct work_struct *work);
int schedule_delayed_work(struct delayed_work *work, unsigned long delay);
上面两个函数将工作项添加到共享的工作队列,工作项随后在某个合适时机将被执行。

如果因为某些原因,如需要执行的是个阻塞性质的任务而不愿或不能使用内核提供的共享工作队列,这时需要自己创建工作队列,则上述步骤和使用的接口则略有改变:

3. 创建工作队列

在 2.6.36 之前,内核中的每个工作队列都有一个专用的内核线程来为它服务,创建工作队列时,有 2 个选择,可选择系统范围内的 ST,也可选择每 CPU 一个内核线程的 MT,其接口如下:

清单 4. 创建工作队列

create_singlethread_workqueue(name)
create_workqueue(name)
相对于create_singlethread_workqueue,create_workqueue 同样会分配一个 wq的工作队列。不同之处在于,对于多 CPU 系统而言,对每一个 active 的 CPU,都会为之创建一个 per-CPU 的 cwq结构,对应每一个 cwq,都会生成一个新的 worker_thread。

4. 创建工作项

创建工作项的接口和使用内核默认的共享工作队列时是一样的。

向工作队列提交工作项

清单 5. 向工作队列中提交工作项

int queue_work(workqueue_t *queue, work_t *work);
int queue_delayed_work(workqueue_t *queue, work_t *work, unsigned long delay);
它们都会将工作项 work 提交到工作队列queue,但第二个函数确保最少延迟 delay jiffies 之后该工作才会被执行。对于 MT 的情况,当用 queue_work 向cwq 上提交工作项节点时, 是哪个 active CPU 正在调用该函数,那么便向该 CPU 对应的 cwq 上的 worklist上增加工作项节点。

假如你需要取消一个挂起的工作队列中的工作项 , 你可以调用:

清单 6. 取消工作队列中挂起的工作项

int cancel_delayed_work(struct work_struct *work);
如果这个工作项在它开始执行前被取消,返回值是非零。内核保证给定工作项的执行不会在调用 cancel_delay_work 成功后被执行。 如果 cancel_delay_work 返回0,则这个工作项可能已经运行在一个不同的处理器,并且仍然可能在调用 cancel_delayed_work之后被执行。要绝对确保工作函数没有在 cancel_delayed_work 返回 0 后在任何地方运行,你必须跟随这个调用之后接着调用flush_workqueue。在 flush_workqueue 返回后。任何在改调用之前提交的工作函数都不会在系统任何地方运行。

当你结束对一个工作队列的使用后,你可以使用下面的函数释放相关资源:

清单 7. 释放工作队列

void destroy_workqueue(struct workqueue_struct *queue);
前面比较了工作队列与其他基于中断上下文的延迟机制之间的优势,但工作队列并非没有缺点。首先是公共的共享工作队列不能提供更多的好处,因为如果其中的任一工作项阻塞,则其他工作项将不能被执行,因此在实际的使用中,使用者多会自己创建工作队列,而这又导致下面的一些问题:

MT的工作队列导致了内核的线程数增加得非常的快,这样带来一些问题:一个是占用了 pid 数目,这对于服务器可不是一个好消息,因为 pid实际上是一种全局资源;而大量的工作线程对于资源的竞争也导致了无效的调度,而这些调度其实是不需要的,对调度器也带来了压力。
现有的工作队列机制某些情况下有导致死锁的倾向,特别是在两个工作项之间存在依赖时。如果你曾经调试过这种偶尔出现的死锁,会知道这种问题让人非常的沮丧。

并发可管理工作队列 (Concurrency-managed workqueues)

在2.6.36 之前的工作队列,其核心是每个工作队列都有专有的内核线程为其服务——系统范围内的 ST 或每个 CPU 都有一个内核线程的MT。新的 cmwq 在实现上摒弃了这一点,不再有专有的线程与每个工作队列关联,事实上,现在变成了 Online CPU number + 1个线程池来为工作队列服务,这样将线程的管理权实际上从工作队列的使用者交还给了内核。当一个工作项被创建以及排队,将在合适的时机被传递给其中一个线程,而 cmwq 最有意思的改变是:被提交到相同工作队列,相同 CPU 的工作项可能并发执行,这也是命名为并发可管理工作队列的原因。

cmwq 的实现遵循了以下几个原则:

与原有的工作队列接口保持兼容,cmwq 只是更改了创建工作队列的接口,很容易移植到新的接口。
工作队列共享 per-CPU 的线程池,提供灵活的并发级别而不再浪费大量的资源。
自动平衡工作者线程池和并发级别,这样工作队列的用户不再需要关注如此多的细节。

在工作队列的用户眼中,cmwq 与之前的工作队列相比,创建工作队列的接口实现的后端有所改变,现在的新接口为:

清单 8. cmwq 中创建工作队列的后端接口

struct workqueue_struct
*alloc_workqueue(char *name, unsigned int flags, int max_active);
其中:

name:为工作队列的名字,而不像 2.6.36 之前实际是为工作队列服务的内核线程的名字。

flag 指明工作队列的属性,可以设定的标记如下:

WQ_NON_REENTRANT:默认情况下,工作队列只是确保在同一 CPU 上不可重入,即工作项不能在同一 CPU上被多个工作者线程并发执行,但容许在多个 CPU 上并发执行。但该标志标明在多个 CPU上也是不可重入的,工作项将在一个不可重入工作队列中排队,并确保至多在一个系统范围内的工作者线程被执行。
WQ_UNBOUND:工作项被放入一个由特定 gcwq 服务的未限定工作队列,该客户工作者线程没有被限定到特定的 CPU,这样,未限定工作者队列就像简单的执行上下文一般,没有并发管理。未限定的 gcwq 试图尽可能快的执行工作项。
WQ_FREEZEABLE:可冻结 wq 参与系统的暂停操作。该工作队列的工作项将被暂停,除非被唤醒,否者没有新的工作项被执行。
WQ_MEM_RECLAIM:所有的工作队列可能在内存回收路径上被使用。使用该标志则保证至少有一个执行上下文而不管在任何内存压力之下。
WQ_HIGHPRI:高优先级的工作项将被排练在队列头上,并且执行时不考虑并发级别;换句话说,只要资源可用,高优先级的工作项将尽可能快的执行。高优先工作项之间依据提交的顺序被执行。
WQ_CPU_INTENSIVE:CPU 密集的工作项对并发级别并无贡献,换句话说,可运行的 CPU 密集型工作项将不阻止其它工作项。这对于限定得工作项非常有用,因为它期望更多的 CPU 时钟周期,所以将它们的执行调度交给系统调度器。

代码的迁移

在之前的代码中,一些用户依赖于 ST 中的严格执行顺序,这种行为在 cmwq 中可以将 max_active 设为 1,flag 设置为 WQ_UNBOUND 来获得相同的行为

max_active:决定了一个 wq 在 per-CPU 上能执行的最大工作项。比如 max_active 设置为 16 表示一个工作队列上最多 16个工作项能同时在 per-CPU 上同时执行。当前实行中,对所有限定工作队列,max_active 的最大值是 512,而设定为 0 时表示是256;而对于未限定工作队列,该最大值为:MAX[512,4 * num_possible_cpus()],除非有特别的理由需要限流或者其它原因,一般设定为 0 就可以了。

cmwq 本质上是提供了一个公共的内核线程池的实现,其接口基本上和以前保持了兼容,只是更改了创建工作队列的函数的后端,它实际上是将工作队列和内核线程的一一绑定关系改为由内核来管理内核线程的创建,因此在 cmwq 中创建工作队列并不意味着一定会创建内核线程。

而之前的接口的则改为基于 alloc_workqueue 来实现。

清单 9. 基于新后端接口的实现

#define create_workqueue(name) 					 \
alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_freezeable_workqueue(name) 			 \
alloc_workqueue((name), WQ_FREEZEABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name) 			 \
alloc_workqueue((name), WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
调度器中的 hook 函数

为了知道工作者线程何时将睡眠或被唤醒,在内核中增加了一个 PF_WQ_WORKER 类型的标记,表明是工作者线程,并且添加了 2 个 hook 函数到当前的调度器中。

清单 10. 调度器中的 hook 函数

void wq_worker_waking_up(struct task_struct *task, unsigned int cpu);
struct task_struct *wq_worker_sleeping(struct task_struct *task, unsigned int cpu);
其中 wq_worker_waking_up在一个工作者线程被唤醒时在 try_to_wake_up/try_to_wake_up_local 中被调用。而wq_worker_sleeping 则在 schedule () 中被调用,表明该工作者线程将会睡眠,返回值是一个 task,它可在相同的CPU 上被 try_to_wake_up_local 用来唤醒。现在 2 个 hook函数都是硬编码在内核的调度器中,后续可能会以其它形式改变其实现方式。

并发可管理工作队列的后端 gcwq

在 cmwq 的实现中,最重要的是其后端 gcwq:

清单 11. gcwq

/*
* Global per-cpu workqueue.  There's one and only one for each cpu
* and all works are queued and processed here regardless of their
* target workqueues.
*/
struct global_cwq {
spinlock_t 		 lock; 		 /* the gcwq lock */
struct list_head 	 worklist; 	 /* L: list of pending works */
unsigned int 		 cpu; 		 /* I: the associated cpu */
unsigned int 		 flags; 		 /* L: GCWQ_* flags */

int 			 nr_workers; 	 /* L: total number of workers */
int 			 nr_idle; 	 /* L: currently idle ones */

/* workers are chained either in the idle_list or busy_hash */
struct list_head 	 idle_list; 	 /* X: list of idle workers */
struct hlist_head 	 busy_hash[BUSY_WORKER_HASH_SIZE];
/* L: hash of busy workers */

struct timer_list 	 idle_timer; 	 /* L: worker idle timeout */
struct timer_list 	 mayday_timer; 	 /* L: SOS timer for dworkers */

struct ida 		 worker_ida; 	 /* L: for worker IDs */

// 为了实现 CPU 热插拔时候的委托机制
struct task_struct 	 *trustee; 	 /* L: for gcwq shutdown */
unsigned int 		 trustee_state; 	 /* L: trustee state */
wait_queue_head_t 	 trustee_wait; 	 /* trustee wait */
struct worker 		 *first_idle; 	 /* L: first idle worker */
} ____cacheline_aligned_in_smp;
它用来管理线程池,其数量为每个 CPU 一个gcwq,还有一个特定的 gcwq 为未限定 (unbound) 工作队列的工作项服务。需要注意的是在 cmwq 中只有 Number ofonline CPU + 1 (unbound) 个线程池。由于计数从 0 开始,所以可能的线程池的数目最大为 NR_CPUS。由于涉及到CPU 的热插拔问题,因此只有 online 的 CPU 上才有线程池与之绑定。

该结构体中的一些重要字段如下:

worklist:所有未决的工作项被链接在该链表中

cpu:表明该线程池和哪个 CPU 绑定,实现中有一个未绑定到任何 CPU 的 gcwq,其标记为WORK_CPU_UNBOUND,在代码中,将这个未绑定到特定 CPU 的 gcwq 和绑定到 CPU 的 gcwq 一起处理,应此定义WORK_CPU_UNBOUND = NR_CPUS,这也是代码中的一个小小的技巧。

nr_workers:总的工作者线程数

nr_idle:当前的空闲工作者线程数

idle_list:空闲的工作者线程链接成该链表

busy_hash[BUSY_WORKER_HASH_SIZE] :正执行工作项任务的工作者线程放入该哈希表中

有了前面基础,我们可以开始看看 cmwq 的实现,根据以往的经验,从初始化部分开始:

清单 12. cmwq 的初始化

static int __init init_workqueues(void)
{
unsigned int cpu;
int i;
// 注册 CPU 事件的通知链,主要用于处理 CPU 热插拔时候,将该 CPU 上的工作队列迁移到 online 的 CPU
// 在 cmwq 中,将这种机制叫做 trustee
cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);

// 初始化 CPU 数目 +1 个 gcwq
for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
…… .
}

// 初始化 online CPU 数目 +1 个工作者线程池
// 创建的线程命名方式如下 :
// 对于与 CPU 绑定的线程,以 ps 命令看到的为:[kworker/cup_id:thread_id],cup_id 为 CPU 的编号
// thread_id 为创建的工作者线程 id,对于未绑定的 CPU 的线程池中的线程,则显示为
//[kworker/u:thread_id]
for_each_online_gcwq_cpu(cpu) {
…… .
worker = create_worker(gcwq, true);
…… .
start_worker(worker);
…… .
}

// 创建 4 个全局的工作队列
system_wq = alloc_workqueue("events", 0, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
……
return 0;
}
工作者线程池的管理

为了实现工作者线程池,针对每个工作者线程,封装了一个结构体 worker 用于工作者线程的管理,如下:

清单 13. 工作者的管理结构体

struct worker {
// 与工作者线程的状态有关系,如果工作者线程处于 idle 状态,则使用 entry;如果处于 busy 状态,
// 则使用哈希节点 hentry,参考 gcwq 中的 idle_list 和 busy_hash 字段
union {
struct list_head 	 entry; 	 /* L: while idle */
struct hlist_node 	 hentry; 	 /* L: while busy */
};
……
// 被调度的工作项 list,注意只有进入到该列表,工作项才真正被工作队列处理
struct list_head 	 scheduled; 	 /* L: scheduled works */

// 被内核调度的实体,工作者线程在内核调度器看来只是一个 task 而已
struct task_struct 	 *task; 		 /* I: worker task */
struct global_cwq 	 *gcwq; 		 /* I: the associated gcwq */
/* 64 bytes boundary on 64bit, 32 on 32bit */
// 记录上次 active 的时间,用于判定该工作者线程是否可以被 destory 时使用
unsigned long 		 last_active; 	 /* L: last active timestamp */
unsigned int 		 flags; 		 /* X: flags */

// 工作者线程的 id,用 ps 命令在用户空间可以看到具体的值
int 			 id; 		 /* I: worker id */
struct work_struct 	 rebind_work; 	 /* L: rebind worker to cpu */
};

未讨论的主题

本文没有讨论处理 CPU 热插拔和在内存回收路径的处理,这两种情况在 cmwq 中分别使用 trustee 和 rescurer 机制,有兴趣的读者可以自行参考代码或文档

工作者线程池的主体执行是 worker_thread,其执行流程如下:

清单 14. 工作者线程的管理

static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq;

// 告诉调度器这是一个工作者线程
worker->task->flags |= PF_WQ_WORKER;
woke_up:
spin_lock_irq(&gcwq->lock);
……
// 让工作者从 idle 状态离开,因为新创建的工作者线程处于 idle 状态,在让该工作者线程工作时,需要从
// idle 状态离开以执行相关的动作
worker_leave_idle(worker);
recheck:

// 检查是否需要更多的工作者线程
// 检查的依据是如果有高优先级的工作,如果工作队列中有工作要做然而该 cpu 的全局队列中却已
// 经没有空闲处理内核线程,那就有必要处理了
if (!need_more_worker(gcwq))
goto sleep;

// may_start_working 检查 gcwq 中是否有 idle 的工作者线程
// manage_workers 在后面详述
if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
goto recheck;

// 确保工作者线程的被调度 list 为空
BUG_ON(!list_empty(&worker->scheduled));

// 设置标记表明工作者线程即将处理相关的工作项,类似于一个 busy 标记
worker_clr_flags(worker, WORKER_PREP);

// 基本流程为,先将工作项合并到工作者线程的被调度 list,然后依次处理被调度 list 的工作
do {
struct work_struct *work =
list_first_entry(&gcwq->worklist,
struct work_struct, entry);

if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
// 注意这里只是代码路径上的显示的优化,本质上并不需要该路径,else 的部分才是代码
// 逻辑的的所在,应此可以忽略这部分
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
// 将 gcwq 的工作项移到工作线程的被调度列表,随后工作者线程将依序处理被调度 list
// 处理单项工作项时使用的是 process_one_work
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(gcwq));

worker_set_flags(worker, WORKER_PREP, false);

// 如果没有工作项需要处理,让工作者线程进入睡眠状态
sleep:
……
}
manage_workers 中处理需要被destroy 的工作者线程,也决定是否需要创建新的工作者线程:在 maybe_destroy_workers中去判定当工作线程数目是否被认定太多 ( 认定工作者线程过多的本质是个策略问题,实现者认为如果 idle 的工作者多余 1/4 个 busy工作者就表示工作者线程过多 ),且该工作线程已经进入 idle 状态 5 分钟,则认定该工作者线程可以被 destroy;而maybe_create_worker 决定是否需要创建新的工作者线程来为工作队列服务,判定的条件为如果有高优先级的工作,或工作队列中有工作要做但该
CPU 的全局队列中却已经没有空闲处理内核线程,那就有必要去创建新的工作者线程了。

并发可管理工作队列的前景

并发可管理工作队列进入 mainline 的时间并不长,但已经快速替换了老的工作队列接口以及慢工作机制 (slow workmechanism),但这并不是它的唯一目标,它的长期目标则是希望在内核中提供一个通用的线程池机制,这样,工作队列的适用范围将更为普遍。

参考资料

学习

查看文章“sched: prepare for cmwq, take#2”,里面描述了在内核调度器中的 hook。

查看文章“Concurrency-managed workqueues”,Jonathan Corbet 详细的描述了 cmwq 出现的原有,并初步综述了 Tejun Heo 提出的解决方案的原理以及面临的挑战。

查看文章“Working on workqueues”,里面对新接口进行了解释。

参考 Concurrency Managed Workqueue (cmwq),这是 cmwq 的主要贡献者 Tejun Heo 对 cmwq 各方面的一个描述。

developerWorks Linux 专区 寻找为 Linux 开发人员(包括 Linux
新手入门)准备的更多参考资料,查阅我们 最受欢迎的文章和教程

在 developerWorks 上查阅所有 Linux 技巧 和 Linux
教程。

随时关注 developerWorks 技术活动网络广播
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: