
Work queues in Linux 2.6.36

2012-08-10 16:40
The new work queue mechanism

A work queue (workqueue) is a mechanism in the Linux kernel for deferring work. It differs from bottom halves (BH) and tasklets in that the deferred work is handed to a kernel thread for execution; the advantage of a work queue is therefore that its work may be rescheduled and may even sleep.
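
To make this concrete, here is a minimal driver-side sketch (the names my_work_fn, my_work and defer_it are made up for illustration): the handler runs in process context inside a kernel worker thread, so it may sleep, which a tasklet must never do.

#include <linux/kernel.h>
#include <linux/delay.h>
#include <linux/workqueue.h>

/* Hypothetical example: the handler runs in a kernel thread (process
   context), so it is allowed to sleep -- here via msleep(). */
static void my_work_fn(struct work_struct *w)
{
    msleep(10);                     /* legal here, fatal in a tasklet */
    pr_info("deferred work done\n");
}

static DECLARE_WORK(my_work, my_work_fn);

/* Called from an interrupt handler or other atomic context to defer
   the heavy lifting. */
static void defer_it(void)
{
    schedule_work(&my_work);        /* returns at once; my_work_fn runs
                                       later in a worker thread */
}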

The Linux work queue implementation changed between 2.6.0 and 2.6.19, and again in 2.6.36. This article analyzes the new (2.6.36) implementation.

Although work queues were optimized after 2.6.0, more and more kernel modules came to use create_workqueue(). Each call created a work thread on every CPU, and allocated a per-CPU cpu_workqueue_struct plus a workqueue_struct, yet if nothing was ever submitted with queue_work() those threads never got any work to do. This wasted considerable memory and added CPU load. Moreover, works on the same work queue executed serially: if one work's handler went to sleep, the works behind it could not run.

Since 2.6.36 the mechanism has changed substantially: all work queues are effectively merged into one, and work threads are no longer tied one-to-one to work queues. When a work runs is decided strictly by its importance and time urgency. In other words, the new mechanism creates work threads according to the number of CPUs, not the number of work queues.

Let's walk through the code:

Initializing workqueues and creating work threads:

At system boot, init_workqueues() in kernel/workqueue.c is called:

static int __init init_workqueues(void)
{
    unsigned int cpu;
    int i;

    cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);

    /* initialize gcwqs */
    /* As noted above, the new mechanism parks all workqueues on
       gcwqs, one gcwq per CPU. */
    for_each_gcwq_cpu(cpu) {
        struct global_cwq *gcwq = get_gcwq(cpu);

        spin_lock_init(&gcwq->lock);
        INIT_LIST_HEAD(&gcwq->worklist);
        gcwq->cpu = cpu;
        gcwq->flags |= GCWQ_DISASSOCIATED;

        INIT_LIST_HEAD(&gcwq->idle_list);
        for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
            INIT_HLIST_HEAD(&gcwq->busy_hash[i]);

        init_timer_deferrable(&gcwq->idle_timer);
        gcwq->idle_timer.function = idle_worker_timeout;
        gcwq->idle_timer.data = (unsigned long)gcwq;

        setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
                    (unsigned long)gcwq);

        ida_init(&gcwq->worker_ida);

        gcwq->trustee_state = TRUSTEE_DONE;
        init_waitqueue_head(&gcwq->trustee_wait);
    }

    /* create the initial worker */
    for_each_online_gcwq_cpu(cpu) {
        struct global_cwq *gcwq = get_gcwq(cpu);
        struct worker *worker;

        if (cpu != WORK_CPU_UNBOUND)
            gcwq->flags &= ~GCWQ_DISASSOCIATED;
        /* This is where the boot-time work threads are created. */
        worker = create_worker(gcwq, true);
        BUG_ON(!worker);
        spin_lock_irq(&gcwq->lock);
        start_worker(worker);
        spin_unlock_irq(&gcwq->lock);
    }

    /* Create the default workqueues available after boot.  The
       schedule_work() we normally call actually uses system_wq;
       see the implementation of schedule_work(). */
    system_wq = alloc_workqueue("events", 0, 0);
    system_long_wq = alloc_workqueue("events_long", 0, 0);
    system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
    system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
                                        WQ_UNBOUND_MAX_ACTIVE);
    system_freezable_wq = alloc_workqueue("events_freezable",
                                          WQ_FREEZABLE, 0);
    BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
           !system_unbound_wq || !system_freezable_wq);
    return 0;
}
early_initcall(init_workqueues);
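
The comment about schedule_work() above is easy to confirm: in this kernel it is just a thin wrapper over system_wq, roughly the following (a sketch of the 2.6.36-era implementation):

/* schedule_work() simply queues the work on the default system_wq
   created in init_workqueues() above. */
int schedule_work(struct work_struct *work)
{
    return queue_work(system_wq, work);
}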


Now let's see how create_worker() creates a work thread:

static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
{
    bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
    struct worker *worker = NULL;
    int id = -1;

    spin_lock_irq(&gcwq->lock);
    while (ida_get_new(&gcwq->worker_ida, &id)) {
        spin_unlock_irq(&gcwq->lock);
        if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
            goto fail;
        spin_lock_irq(&gcwq->lock);
    }
    spin_unlock_irq(&gcwq->lock);

    /* Allocate and initialize the worker. */
    worker = alloc_worker();
    if (!worker)
        goto fail;

    worker->gcwq = gcwq;
    worker->id = id;

    /* Both kthread_create_on_node() and kthread_create() can create a
       work thread; the difference is CPU binding.  Here
       kthread_create_on_node() creates the per-CPU workers (one per
       CPU, bound below via kthread_bind()), while kthread_create()
       creates the unbound workers that may run on any CPU.  The
       kworker/0:0, kworker/1:0 and kworker/u:0 processes you see in ps
       output are the work threads created here! */
    if (!on_unbound_cpu)
        worker->task = kthread_create_on_node(worker_thread, worker,
                                cpu_to_node(gcwq->cpu),
                                "kworker/%u:%d", gcwq->cpu, id);
    else
        worker->task = kthread_create(worker_thread, worker,
                                      "kworker/u:%d", id);
    if (IS_ERR(worker->task))
        goto fail;

    /*
     * A rogue worker will become a regular one if CPU comes
     * online later on.  Make sure every worker has
     * PF_THREAD_BOUND set.
     */
    if (bind && !on_unbound_cpu)
        kthread_bind(worker->task, gcwq->cpu);
    else {
        worker->task->flags |= PF_THREAD_BOUND;
        if (on_unbound_cpu)
            worker->flags |= WORKER_UNBOUND;
    }

    return worker;
fail:
    if (id >= 0) {
        spin_lock_irq(&gcwq->lock);
        ida_remove(&gcwq->worker_ida, id);
        spin_unlock_irq(&gcwq->lock);
    }
    kfree(worker);
    return NULL;
}


Processing works:

Once kthread_create_on_node() or kthread_create() has created the work thread, it starts running:

static int worker_thread(void *__worker)
{
    struct worker *worker = __worker;
    struct global_cwq *gcwq = worker->gcwq;

    /* tell the scheduler that this is a workqueue worker */
    worker->task->flags |= PF_WQ_WORKER;
    /* The "goto woke_up" at the end makes the work thread an
       infinite loop. */
woke_up:
    spin_lock_irq(&gcwq->lock);

    /* DIE can be set only while we're idle, checking here is enough */
    if (worker->flags & WORKER_DIE) {
        spin_unlock_irq(&gcwq->lock);
        worker->task->flags &= ~PF_WQ_WORKER;
        return 0;
    }

    worker_leave_idle(worker);
recheck:
    /* no more worker necessary? */
    /* If a high-priority work is pending and no idle work thread is
       available to handle it, a new work thread must be created next;
       readers can study need_more_worker() for the details.  This is
       how the new mechanism gets high-priority work handled first. */
    if (!need_more_worker(gcwq))
        goto sleep;

    /* do we need to manage? */
    /* Create a new work thread only when needed.  Unlike the old
       mechanism, which created work threads whenever a work queue was
       created regardless of circumstances, this avoids wasting memory.
       The call chain is manage_workers() -> maybe_create_worker() ->
       create_worker(); create_worker() was analyzed above and is what
       creates the work thread. */
    if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
        goto recheck;

    /*
     * ->scheduled list can only be filled while a worker is
     * preparing to process a work or actually processing it.
     * Make sure nobody diddled with it while I was sleeping.
     */
    BUG_ON(!list_empty(&worker->scheduled));

    /*
     * When control reaches this point, we're guaranteed to have
     * at least one idle worker or that someone else has already
     * assumed the manager role.
     */
    worker_clr_flags(worker, WORKER_PREP);

    do {
        struct work_struct *work =
            list_first_entry(&gcwq->worklist,
                             struct work_struct, entry);

        /* Now that new work threads have been created for any
           high-priority works, it is finally time to process our own
           work.  The core is process_one_work(). */
        if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
            /* optimization path, not strictly necessary */
            process_one_work(worker, work);
            if (unlikely(!list_empty(&worker->scheduled)))
                process_scheduled_works(worker);
        } else {
            move_linked_works(work, &worker->scheduled, NULL);
            process_scheduled_works(worker);
        }
    } while (keep_working(gcwq));

    worker_set_flags(worker, WORKER_PREP, false);
sleep:
    /* Before sleeping, check once more whether any new work needs
       processing.  So even if this work sleeps, other works can keep
       running; a sleeping work no longer blocks the others the way
       the old mechanism did. */
    if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
        goto recheck;

    /*
     * gcwq->lock is held and there's no work to process and no
     * need to manage, sleep.  Workers are woken up only while
     * holding gcwq->lock or from local cpu, so setting the
     * current state before releasing gcwq->lock is enough to
     * prevent losing any event.
     */
    worker_enter_idle(worker);
    __set_current_state(TASK_INTERRUPTIBLE);
    spin_unlock_irq(&gcwq->lock);
    schedule();
    goto woke_up;
}


Now let's see how the system actually processes a work:

static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&gcwq->lock)
__acquires(&gcwq->lock)
{
    struct cpu_workqueue_struct *cwq = get_work_cwq(work);
    struct global_cwq *gcwq = cwq->gcwq;
    struct hlist_head *bwh = busy_worker_head(gcwq, work);
    bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE;
    /* Fetch the function the driver set on this work. */
    work_func_t f = work->func;
    int work_color;
    struct worker *collision;
#ifdef CONFIG_LOCKDEP
    /*
     * It is permissible to free the struct work_struct from
     * inside the function that is called from it, this we need to
     * take into account for lockdep too.  To avoid bogus "held
     * lock freed" warnings as well as problems when looking into
     * work->lockdep_map, make a copy and use that here.
     */
    struct lockdep_map lockdep_map = work->lockdep_map;
#endif
    /*
     * A single work shouldn't be executed concurrently by
     * multiple workers on a single cpu.  Check whether anyone is
     * already processing the work.  If so, defer the work to the
     * currently executing one.
     */
    collision = __find_worker_executing_work(gcwq, bwh, work);
    if (unlikely(collision)) {
        move_linked_works(work, &collision->scheduled, NULL);
        return;
    }

    /* claim and process */
    debug_work_deactivate(work);
    hlist_add_head(&worker->hentry, bwh);
    worker->current_work = work;
    worker->current_cwq = cwq;
    work_color = get_work_color(work);

    /* record the current cpu number in the work data and dequeue */
    set_work_cpu(work, gcwq->cpu);
    list_del_init(&work->entry);

    /*
     * If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
     * wake up another worker; otherwise, clear HIGHPRI_PENDING.
     */
    /* If the global gcwq has a high-priority work pending, wake a
       worker to run it! */
    if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) {
        struct work_struct *nwork = list_first_entry(&gcwq->worklist,
                                        struct work_struct, entry);

        if (!list_empty(&gcwq->worklist) &&
            get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
            /* Wake the work thread that will run the high-priority
               work. */
            wake_up_worker(gcwq);
        else
            gcwq->flags &= ~GCWQ_HIGHPRI_PENDING;
    }

    /*
     * CPU intensive works don't participate in concurrency
     * management.  They're the scheduler's responsibility.
     */
    /* A CPU-intensive work is taken out of concurrency management,
       so that an idle work thread can still be woken to run any
       other pending works. */
    if (unlikely(cpu_intensive))
        worker_set_flags(worker, WORKER_CPU_INTENSIVE, true);

    spin_unlock_irq(&gcwq->lock);

    work_clear_pending(work);
    lock_map_acquire_read(&cwq->wq->lockdep_map);
    lock_map_acquire(&lockdep_map);
    trace_workqueue_execute_start(work);
    /* After all that effort, we finally call the work's function
       pointer!!! */
    f(work);
    /*
     * While we must be careful to not use "work" after this, the trace
     * point will only record its address.
     */
    /* What follows is cleanup: removing the work, releasing
       resources and resetting flags. */
    trace_workqueue_execute_end(work);
    lock_map_release(&lockdep_map);
    lock_map_release(&cwq->wq->lockdep_map);

    if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
        printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
               "%s/0x%08x/%d\n",
               current->comm, preempt_count(), task_pid_nr(current));
        printk(KERN_ERR "    last function: ");
        print_symbol("%s\n", (unsigned long)f);
        debug_show_held_locks(current);
        dump_stack();
    }

    spin_lock_irq(&gcwq->lock);

    /* clear cpu intensive status */
    if (unlikely(cpu_intensive))
        worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

    /* we're done with it, release */
    hlist_del_init(&worker->hentry);
    worker->current_work = NULL;
    worker->current_cwq = NULL;
    cwq_dec_nr_in_flight(cwq, work_color, false);
}


Creating a work queue:

We have now seen how work threads process works, but not where the system_wq mentioned in init_workqueues() comes from; another open question is why work threads are said to no longer depend on work queues. Let's analyze alloc_workqueue():

#define alloc_workqueue(name, flags, max_active)            \
    __alloc_workqueue_key((name), (flags), (max_active), NULL, NULL)

struct workqueue_struct *__alloc_workqueue_key(const char *name,
                                               unsigned int flags,
                                               int max_active,
                                               struct lock_class_key *key,
                                               const char *lock_name)
{
    struct workqueue_struct *wq;
    unsigned int cpu;

    /*
     * Workqueues which may be used during memory reclaim should
     * have a rescuer to guarantee forward progress.
     */
    /* WQ_MEM_RECLAIM means this workqueue's works must be able to
       run even when memory is tight. */
    if (flags & WQ_MEM_RECLAIM)
        flags |= WQ_RESCUER;

    /*
     * Unbound workqueues aren't concurrency managed and should be
     * dispatched to workers immediately.
     */
    /* WQ_UNBOUND means the works are not tied to any CPU and may
       run on any of them. */
    if (flags & WQ_UNBOUND)
        flags |= WQ_HIGHPRI;

    /* max_active caps how many of this workqueue's works may execute
       simultaneously on any one CPU. */
    max_active = max_active ?: WQ_DFL_ACTIVE;
    max_active = wq_clamp_max_active(max_active, flags, name);

    /* Allocate the workqueue_struct and save this workqueue's
       information such as name and flags; as we have seen, the work
       threads make use of this information. */
    wq = kzalloc(sizeof(*wq), GFP_KERNEL);
    if (!wq)
        goto err;

    wq->flags = flags;
    wq->saved_max_active = max_active;
    mutex_init(&wq->flush_mutex);
    atomic_set(&wq->nr_cwqs_to_flush, 0);
    INIT_LIST_HEAD(&wq->flusher_queue);
    INIT_LIST_HEAD(&wq->flusher_overflow);
    wq->name = name;

    lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
    INIT_LIST_HEAD(&wq->list);

    if (alloc_cwqs(wq) < 0)
        goto err;

    /* Initialize the per-cpu cpu_workqueue_struct. */
    for_each_cwq_cpu(cpu, wq) {
        struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
        struct global_cwq *gcwq = get_gcwq(cpu);

        BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
        cwq->gcwq = gcwq;
        cwq->wq = wq;
        cwq->flush_color = -1;
        cwq->max_active = max_active;
        INIT_LIST_HEAD(&cwq->delayed_works);
    }

    if (flags & WQ_RESCUER) {
        struct worker *rescuer;

        if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
            goto err;

        wq->rescuer = rescuer = alloc_worker();
        if (!rescuer)
            goto err;

        rescuer->task = kthread_create(rescuer_thread, wq, "%s", name);
        if (IS_ERR(rescuer->task))
            goto err;

        rescuer->task->flags |= PF_THREAD_BOUND;
        wake_up_process(rescuer->task);
    }

    /*
     * workqueue_lock protects global freeze state and workqueues
     * list.  Grab it, set max_active accordingly and add the new
     * workqueue to workqueues list.
     */
    spin_lock(&workqueue_lock);

    if (workqueue_freezing && wq->flags & WQ_FREEZABLE)
        for_each_cwq_cpu(cpu, wq)
            get_cwq(cpu, wq)->max_active = 0;

    /* Add this wq to the global workqueues list. */
    list_add(&wq->list, &workqueues);

    spin_unlock(&workqueue_lock);
    return wq;
err:
    if (wq) {
        free_cwqs(wq);
        free_mayday_mask(wq->mayday_mask);
        kfree(wq->rescuer);
        kfree(wq);
    }
    return NULL;
}
EXPORT_SYMBOL_GPL(__alloc_workqueue_key);


The new mechanism still keeps the create_workqueue() and create_singlethread_workqueue() interfaces, but both ultimately call alloc_workqueue(), just with different flags. As noted above, in the new mechanism only a work queue's flags influence scheduling order; the work queue itself no longer matters. A usage sketch follows the macros below.

#define create_workqueue(name)                          \
    alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
#define create_freezable_workqueue(name)                \
    alloc_workqueue((name), WQ_FREEZABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define create_singlethread_workqueue(name)             \
    alloc_workqueue((name), WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
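
For a driver, then, the real decision is which flags to pass. A hypothetical usage sketch (my_wq, my_wq_fn, my_driver_init and my_driver_exit are invented names; the flags mirror what create_singlethread_workqueue() passes above):

#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/workqueue.h>

static void my_wq_fn(struct work_struct *w)
{
    pr_info("ran on a shared kworker\n");
}

static DECLARE_WORK(my_wq_work, my_wq_fn);
static struct workqueue_struct *my_wq;  /* hypothetical driver queue */

static int my_driver_init(void)
{
    /* Not bound to a CPU, usable during memory reclaim, at most one
       work active at a time -- the same flags as the macro above. */
    my_wq = alloc_workqueue("my_wq", WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
    if (!my_wq)
        return -ENOMEM;
    queue_work(my_wq, &my_wq_work); /* runs on a shared kworker thread */
    return 0;
}

static void my_driver_exit(void)
{
    flush_workqueue(my_wq);         /* wait for anything still queued */
    destroy_workqueue(my_wq);
}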


Queueing a work:

Finally, let's see how a work queue triggers a work:

queue_work() -> queue_work_on() -> __queue_work()
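
The first two links in the chain are thin wrappers; roughly (a sketch from the same kernel), queue_work() picks the calling CPU, and queue_work_on() atomically sets the pending bit so that a work that is already pending is not queued twice:

int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
    int ret;

    ret = queue_work_on(get_cpu(), wq, work);
    put_cpu();

    return ret;
}

int queue_work_on(int cpu, struct workqueue_struct *wq,
                  struct work_struct *work)
{
    int ret = 0;

    /* Only queue if the PENDING bit was not already set. */
    if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT,
                          work_data_bits(work))) {
        __queue_work(cpu, wq, work);
        ret = 1;
    }
    return ret;
}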

static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
                         struct work_struct *work)
{
    struct global_cwq *gcwq;
    struct cpu_workqueue_struct *cwq;
    struct list_head *worklist;
    unsigned int work_flags;
    unsigned long flags;

    debug_work_activate(work);

    /* if dying, only works from the same workqueue are allowed */
    if (unlikely(wq->flags & WQ_DYING) &&
        WARN_ON_ONCE(!is_chained_work(wq)))
        return;

    /* determine gcwq to use */
    /* Pick the gcwq according to the flags. */
    if (!(wq->flags & WQ_UNBOUND)) {
        struct global_cwq *last_gcwq;

        if (unlikely(cpu == WORK_CPU_UNBOUND))
            cpu = raw_smp_processor_id();

        /*
         * It's multi cpu.  If @wq is non-reentrant and @work
         * was previously on a different cpu, it might still
         * be running there, in which case the work needs to
         * be queued on that cpu to guarantee non-reentrance.
         */
        gcwq = get_gcwq(cpu);
        if (wq->flags & WQ_NON_REENTRANT &&
            (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
            struct worker *worker;

            spin_lock_irqsave(&last_gcwq->lock, flags);

            worker = find_worker_executing_work(last_gcwq, work);

            if (worker && worker->current_cwq->wq == wq)
                gcwq = last_gcwq;
            else {
                /* meh... not running there, queue here */
                spin_unlock_irqrestore(&last_gcwq->lock, flags);
                spin_lock_irqsave(&gcwq->lock, flags);
            }
        } else
            spin_lock_irqsave(&gcwq->lock, flags);
    } else {
        gcwq = get_gcwq(WORK_CPU_UNBOUND);
        spin_lock_irqsave(&gcwq->lock, flags);
    }

    /* gcwq determined, get cwq and queue */
    cwq = get_cwq(gcwq->cpu, wq);
    trace_workqueue_queue_work(cpu, cwq, work);

    BUG_ON(!list_empty(&work->entry));

    cwq->nr_in_flight[cwq->work_color]++;
    work_flags = work_color_to_flags(cwq->work_color);

    if (likely(cwq->nr_active < cwq->max_active)) {
        trace_workqueue_activate_work(work);
        cwq->nr_active++;
        worklist = gcwq_determine_ins_pos(gcwq, cwq);
    } else {
        work_flags |= WORK_STRUCT_DELAYED;
        worklist = &cwq->delayed_works;
    }

    /* Put the work on the chosen list to await execution. */
    insert_work(cwq, work, worklist, work_flags);

    spin_unlock_irqrestore(&gcwq->lock, flags);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
                        struct work_struct *work, struct list_head *head,
                        unsigned int extra_flags)
{
    struct global_cwq *gcwq = cwq->gcwq;

    /* we own @work, set data and link */
    set_work_cwq(work, cwq, extra_flags);

    /*
     * Ensure that we get the right work->data if we see the
     * result of list_add() below, see try_to_grab_pending().
     */
    smp_wmb();

    list_add_tail(&work->entry, head);

    /*
     * Ensure either worker_sched_deactivated() sees the above
     * list_add_tail() or we see zero nr_running to avoid workers
     * lying around lazily while there are works to be processed.
     */
    smp_mb();

    /* If a high-priority work is pending, or no running work thread
       is available, well, wake a worker to process the list. */
    if (__need_more_worker(gcwq))
        wake_up_worker(gcwq);
}


That completes our analysis of the work queue mechanism. The new design is clearly more flexible, and it essentially no longer wastes memory or drives the system into overload.

Perhaps, before long, the create_workqueue() interface will disappear altogether...

References:

http://lwn.net/Articles/403891/

http://gqf2008.iteye.com/blog/447060

Documentation/workqueue.txt (in the kernel source tree)

2012/08/10