您的位置:首页 > 产品设计 > UI/UE

work queues

2016-04-22 16:37 309 查看
work queue:

和tasklets类似 工作队列允许内核函数与激活;

并且由worker thread 线程来执行;

The main difference is that deferrable functions run in interrupt context while functions in work queues run in process context.

一个运行在进程上下文一个运行在中断上下文;中断上下文中不可能出现进程切换;可延迟函数和工作队列中的函数都不能访问进程的用户态地址空间;事实上可延迟函数被执行时不可能有任何正在运行的进程。工作队列也是由内核线程执行,因此也不存在他要访问用户空间;
the main data structure associated with a work queue is a description called workqueue_struct;is an array of nr_cpus  elements;the  maximum  number of  CPUs  in the  system ;the  filed  of  the workqueue _struct  structure below  ;

<span style="font-size:10px;">/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
unsigned int		flags;		/* I: WQ_* flags */
union {
struct cpu_workqueue_struct __percpu	*pcpu;
struct cpu_workqueue_struct		*single;
unsigned long				v;
} cpu_wq;				/* I: cwq's */
struct list_head	list;		/* W: list of all workqueues */

struct mutex		flush_mutex;	/* protects wq flushing */
int			work_color;	/* F: current work color */
int			flush_color;	/* F: current flush color */
atomic_t		nr_cwqs_to_flush; /* flush in progress */
struct wq_flusher	*first_flusher;	/* F: first flusher */
struct list_head	flusher_queue;	/* F: flush waiters */
struct list_head	flusher_overflow; /* F: flush overflow list */

mayday_mask_t		mayday_mask;	/* cpus requesting rescue */
struct worker		*rescuer;	/* I: rescue worker */

int			saved_max_active; /* W: saved cwq max_active */
const char		*name;		/* I: workqueue name */
#ifdef CONFIG_LOCKDEP
struct lockdep_map	lockdep_map;
#endif
};


/*
* The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
* work_struct->data are used for flags and thus cwqs need to be
* aligned at two's power of the number of flag bits.
*/
struct cpu_workqueue_struct {
struct global_cwq	*gcwq;		/* I: the associated gcwq */
struct workqueue_struct *wq;		/* I: the owning workqueue */
int			work_color;	/* L: current color */
int			flush_color;	/* L: flushing color */
int			nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
int			nr_active;	/* L: nr of active works */
int			max_active;	/* L: max active works */
struct list_head	delayed_works;	/* L: delayed works */
};

the  list_head  field  in the  cpu_workqueue_struct  is  head  of  a  doubly  linked  list  collection  the  pending  function  of  the  work queue ;

every  work  queue  represent  a  work_struct  data  structure.
 
struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};


/*
* Global per-cpu workqueue.  There's one and only one for each cpu
* and all works are queued and processed here regardless of their
* target workqueues.
*/
struct global_cwq {
spinlock_t		lock;		/* the gcwq lock */
struct list_head	worklist;	/* L: list of pending works 所有的work_struct都挂载在这个链表上*/
unsigned int		cpu;		/* I: the associated cpu */
unsigned int		flags;		/* L: GCWQ_* flags */

int			nr_workers;	/* L: total number of workers */
int			nr_idle;	/* L: currently idle ones */

/* workers are chained either in the idle_list or busy_hash */
struct list_head	idle_list;	/* X: list of idle workers */
struct hlist_head	busy_hash[BUSY_WORKER_HASH_SIZE];
/* L: hash of busy workers */

struct timer_list	idle_timer;	/* L: worker idle timeout */
struct timer_list	mayday_timer;	/* L: SOS timer for dworkers */

struct ida		worker_ida;	/* L: for worker IDs */

struct task_struct	*trustee;	/* L: for gcwq shutdown */
unsigned int		trustee_state;	/* L: trustee state */
wait_queue_head_t	trustee_wait;	/* trustee wait */
struct worker		*first_idle;	/* L: first idle worker */
} ____cacheline_aligned_in_smp;

/*
/*
* The poor guys doing the actual heavy lifting.  All on-duty workers
* are either serving the manager role, on idle list or on busy hash.
*/
struct worker {
/* on idle list while idle, on busy hash table while busy */
union {
struct list_head	entry;	/* L: while idle */
struct hlist_node	hentry;	/* L: while busy */
};
//与工作者线程的状态有关,当工作者线程空闲时,使用entry,但忙碌时,使用哈希节点hentry,它们分别对应gcwq的idle_list和busy_hash[]。
struct work_struct	*current_work;	/* L: work being processed */
struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */所对应的cpu_workqueue_struct。
struct list_head	scheduled;	/* L: scheduled works */被调度的工作项,只有进入该链表,工作项才能被工作队列处理。
struct task_struct	*task;		/* I: worker task */被内核调度的线程实体,通过它参与到内核调度当中去,
struct global_cwq	*gcwq;		/* I: the associated gcwq */相关联的gcwq
/* 64 bytes boundary on 64bit, 32 on 32bit */
unsigned long		last_active;	/* L: last active timestamp */最后的活动时间,用于决定该worker的生命周期
unsigned int		flags;		/* X: flags */
int			id;		/* I: worker id */
struct work_struct	rebind_work;	/* L: rebind worker to cpu */
};


 the workqueue  from  Linux  2.6.0 to  2.6.19 are  different  from  Linux  2.6.20 

and later  ,Linux 2.6.36 have  change a lot  in it ,too;

虽然自从2.6.0之后,Linux对work queue进行了优化,但是kernel用到

create_workqueue的模块越来越多,

而调用create_workqueue会在每个cpu上都创建一个work_thread, 

每个cpu都分配一个cpu_workqueue_struct以及workqueue_struct,

而如果没被queue_work的话根本没机会工

作,这样仍然相当浪费内存资源,而且加重了cpu loading。另外,

同一个work queue上的每个work都是按照串行执行的,假如其中一个work

的调度程序睡眠了,那么后面的work也将无法工作。

内核中的每个工作队列都有一个专用的内核线程来为它服务,创建工作队列时,有 2 个选择,可选择系统范围内的 ST,也可选择每 CPU 一个内核线程的 MT,其接口如下:

清单 4. 创建工作队列


create_singlethread_workqueue(name)
 

create_workqueue(name)


其核心是每个工作队列都有专有的内核线程为其服务——


并发可管理工作队列 (Concurrency-managed workqueues)

自从2.6.36以后,work queue的机制发生了很大变化,所有的work queue都被合并成

一个work queue,work thread也不是和work queue一一关联,work何时工作紧紧按

照工作的重要性以及时间紧迫性来划分。

也就是说新机制是按照cpu数量来创建work thread,而不是work queue。

新的 cmwq 在实现上摒弃了这一点,不再有专有的线程与每个工作队列关联,事实上,

现在变成了 Online CPU number + 1 个线程池来为工作队列服务,这样将线程的管理权

实际上从工作队列的使用者交还给了内核。当一个工作项被创建以及排队,将在合适的

时机被传递给其中一个线程,而 cmwq 最有意思的改变是:被提交到相同工作队列,

相同 CPU 的工作项可能并发执行

其原则如下:

与原有的工作队列接口保持兼容,cmwq 只是更改了创建工作队列的接口,很容易移植

到新的接口。

工作队列共享 per-CPU 的线程池,提供灵活的并发级别而不再浪费大量的资源



自动平衡工作者线程池和并发级别,这样工作队列的用户不再需要关注如此多的细节。

the  starter  of  Linux  kernel  ,we will  invoke  the  init_workqueues();in  this  function  ,
at  last  kthread_create(trustee,gcwq,"workqueue_trustee/%d\n", cpu);)and  so on ;please  refer to /kernel/ workqueue.c ;

static int __init init_workqueues(void)
{
unsigned int cpu;
int i;

cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
在 新机制上,将所有的workqueue都排到gcwq上管理每个CPU都有一个gcwq
/* initialize gcwqs */
for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);

spin_lock_init(&gcwq->lock);
INIT_LIST_HEAD(&gcwq->worklist);        // init the list head for work queue
gcwq->cpu = cpu;                       //cpoy the cpu number
gcwq->flags |= GCWQ_DISASSOCIATED;

INIT_LIST_HEAD(&gcwq->idle_list);
for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
INIT_HLIST_HEAD(&gcwq->busy_hash[i]);

init_timer_deferrable(&gcwq->idle_timer);
gcwq->idle_timer.function = idle_worker_timeout;
gcwq->idle_timer.data = (unsigned long)gcwq;

setup_timer(&gcwq->mayday_timer, gcwq_mayday_timeout,
(unsigned long)gcwq);

ida_init(&gcwq->worker_ida);

gcwq->trustee_state = TRUSTEE_DONE;
init_waitqueue_head(&gcwq->trustee_wait);
}

/* create the initial worker */
for_each_online_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu);
struct worker *worker;

if (cpu != WORK_CPU_UNBOUND)
gcwq->flags &= ~GCWQ_DISASSOCIATED;
worker = create_worker(gcwq, true);//开机启动后创建的workthread在此处实现;
BUG_ON(!worker);
spin_lock_irq(&gcwq->lock);
start_worker(worker);//go    start
spin_unlock_irq(&gcwq->lock);
}

/* 创建系统开机后默认的workqueue,我们<span style="font-size: 13.3333px; font-family: Arial, Helvetica, sans-serif;">平常</span><span style="font-family: Arial, Helvetica, sans-serif; font-size: 12px;">调用的</span>
schedule_work()其实就是用的system_wq这个work queue,可
参考schedule_work()实现。*/
system_wq = alloc_workqueue("events", 0, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
system_freezable_wq = alloc_workqueue("events_freezable",
WQ_FREEZABLE, 0);
BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
!system_unbound_wq || !system_freezable_wq);
return 0;
}
系统在启动的时候会为每个gcwq创建一个工作者线程管理者worker,这个worker就是用来管理gcwq上挂载的工作项;初始化gcwq过程当中会创建一个worker,而且worker中又有一个成员task_struct,因此我们可以想像的到在创建worker时会发生什么。
   在 新机制上,将所有的workqueue都排到gcwq上管理每个CPU都有一个gcwq;

/* 创建系统开机后默认的workqueue,平常我们调用的

schedule_work()其实就是用的system_wq这个work queue,可

参考schedule_work()实现。*/

 How to  realize  it  ? the  thinking  what  and  why ?

/**
* create_worker - create a new workqueue worker
* @gcwq: gcwq the new worker will belong to
* @bind: whether to set affinity to @cpu or not
*
* Create a new worker which is bound to @gcwq.  The returned worker
* can be started by calling start_worker() or destroyed using
* destroy_worker().
*
* CONTEXT:
* Might sleep.  Does GFP_KERNEL allocations.
*
* RETURNS:
* Pointer to the newly created worker.
*/
static struct worker *create_worker(struct global_cwq *gcwq, bool bind)
{
bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
struct worker *worker = NULL;
int id = -1;

spin_lock_irq(&gcwq->lock);
while (ida_get_new(&gcwq->worker_ida, &id)) {
spin_unlock_irq(&gcwq->lock);
if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
goto fail;
spin_lock_irq(&gcwq->lock);
}
spin_unlock_irq(&gcwq->lock);

worker = alloc_worker();
if (!worker)
goto fail;

worker->gcwq = gcwq;
worker->id = id;

if (!on_unbound_cpu)  //build  the  work thred ;
worker->task = kthread_create_on_node(worker_thread,
worker,
cpu_to_node(gcwq->cpu),
"kworker/%u:%d", gcwq->cpu, id);
else
worker->task = kthread_create(worker_thread, worker,
"kworker/u:%d", id);
if (IS_ERR(worker->task))
goto fail;

/*
* A rogue worker will become a regular one if CPU comes
* online later on.  Make sure every worker has
* PF_THREAD_BOUND set.
*/
if (bind && !on_unbound_cpu)
kthread_bind(worker->task, gcwq->cpu);
else {
worker->task->flags |= PF_THREAD_BOUND;
if (on_unbound_cpu)
worker->flags |= WORKER_UNBOUND;
}

return worker;
fail:
if (id >= 0) {
spin_lock_irq(&gcwq->lock);
ida_remove(&gcwq->worker_ida, id);
spin_unlock_irq(&gcwq->lock);
}
kfree(worker);
return NULL;
}
由kthread _create_on_node or  kthread_creat  创建的工作线程work_thread ;

**
* worker_thread - the worker thread function
* @__worker: self
*
* The gcwq worker thread function.  There's a single dynamic pool of
* these per each cpu.  These workers process all works regardless of
* their specific target workqueue.  The only exception is works which
* belong to workqueues with a rescuer which will be explained in
* rescuer_thread().
*/
static int worker_thread(void *__worker)
{
struct worker *worker = __worker;
struct global_cwq *gcwq = worker->gcwq;

/* tell the scheduler that this is a workqueue worker */
worker->task->flags |= PF_WQ_WORKER;<span style="color: rgb(102, 102, 102); font-family: 宋体, Arial; font-size: 16px; line-height: 26px; text-indent: 28px;"></span>//告诉调度器这是一个工作者线程<span style="word-wrap: break-word; color: rgb(102, 102, 102); font-size: 16px; line-height: 26px; text-indent: 28px; font-family: 宋体;"></span>
woke_up:
spin_lock_irq(&gcwq->lock);//锁住gcwq,每个cpu只有一个gcwq结构,该CPU的其他线程同样有可能对它进行改变,因此这里需要加锁保护

/* DIE can be set only while we're idle, checking here is enough */
if (worker->flags & WORKER_DIE) {
spin_unlock_irq(&gcwq->lock);
worker->task->flags &= ~PF_WQ_WORKER;
return 0;
}
//首先让工作者线程的管理者从空闲状态退出,因为它现在要work了,这里的主要工作是清除worker的空闲状态标志,减少gcwq的nr_idle数量,并将worker的entry删除并重新初始化。
worker_leave_idle(worker);
recheck:
/* 如果有高优先级的work需要处理,而且当前已经没有空闲的workthread可以来处理掉这个高优先级work,
那下一步就要创建新的workthread来处理掉,读者可自行分析need_more_woker()的实现。这里就体现了新机制对于高优先级先处理的方法。*/
/* no more worker necessary? */
if (!need_more_worker(gcwq))//这里会对gcwq进行检查:gcwq上的worklist上是否挂载有未处理的工作项,如果没有,
说明当前工作者线程无事可做,睡眠。检查当前CPU的gcwq的worker如果有更高优先级的工作要处理,且系统的全局队列中已经没有空闲的worker了,
那么这时应该需要一个新的worker。
goto sleep;
/*新建一个workthread,可以看出,新的机制已经不想老的那样不管如何情况只要creatework queue就创建work thread,
浪费内存资源。在manage_workers() -> maybe_create_worker ()-> create_worker (), create_worker()前面分析过了,
它会creatework thread!*/
/* do we need to manage? */
if (unlikely(!may_start_working(gcwq)) && manage_workers(worker))
goto recheck;

/*
* ->scheduled list can only be filled while a worker is
* preparing to process a work or actually processing it.
* Make sure nobody diddled with it while I was sleeping.
*/
BUG_ON(!list_empty(&worker->scheduled));

/*
* When control reaches this point, we're guaranteed to have
* at least one idle worker or that someone else has already
* assumed the manager role.
*/
worker_clr_flags(worker, WORKER_PREP);

do {
struct work_struct *work =
list_first_entry(&gcwq->worklist,
struct work_struct, entry);
/* 在创建里新的work thead去处理高优先级的work之后,终于轮到处理自己的work了。核心在process_one_work().*/
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work);
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(gcwq));

worker_set_flags(worker, WORKER_PREP, false);
sleep: 
if (unlikely(need_to_manage_workers(gcwq)) && manage_workers(worker))
goto recheck; /*在休眠之前,再一次判断当前有没有新的work需要处理。所以即使本work睡眠了,其他work也可以继续工作,
这样就不会存在老的机制那样一个worksleep会阻塞其他work执行。*/

/*
* gcwq->lock is held and there's no work to process and no
* need to manage, sleep.  Workers are woken up only while
* holding gcwq->lock or from local cpu, so setting the
* current state before releasing gcwq->lock is enough to
* prevent losing any event.
*/
worker_enter_idle(worker);
__set_current_state(TASK_INTERRUPTIBLE);
spin_unlock_irq(&gcwq->lock);
schedule();
goto woke_up;  //实现循环;
}
/**
* process_one_work - process single work
* @worker: self
* @work: work to process
*
* Process @work.  This function contains all the logics necessary to
* process a single work including synchronization against and
* interaction with other workers on the same cpu, queueing and
* flushing.  As long as context requirement is met, any worker can
* call this function to process a work.
*
* CONTEXT:
* spin_lock_irq(gcwq->lock) which is released and regrabbed.
*/
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&gcwq->lock)
__acquires(&gcwq->lock)
{
struct cpu_workqueue_struct *cwq = get_work_cwq(work);
struct global_cwq *gcwq = cwq->gcwq;
struct hlist_head *bwh = busy_worker_head(gcwq, work);
bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE;
work_func_t f = work->func;
int work_color;
struct worker *collision;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct from
* inside the function that is called from it, this we need to
* take into account for lockdep too.  To avoid bogus "held
* lock freed" warnings as well as problems when looking into
* work->lockdep_map, make a copy and use that here.
*/
struct lockdep_map lockdep_map = work->lockdep_map;
#endif
/*
* A single work shouldn't be executed concurrently by
* multiple workers on a single cpu.  Check whether anyone is
* already processing the work.  If so, defer the work to the
* currently executing one.
*/
collision = __find_worker_executing_work(gcwq, bwh, work);
if (unlikely(collision)) {
move_linked_works(work, &collision->scheduled, NULL);
return;
}

/* claim and process */
debug_work_deactivate(work);
hlist_add_head(&worker->hentry, bwh);
worker->current_work = work;
worker->current_cwq = cwq;
work_color = get_work_color(work);

/* record the current cpu number in the work data and dequeue */
set_work_cpu(work, gcwq->cpu);
list_del_init(&work->entry);

/*
* If HIGHPRI_PENDING, check the next work, and, if HIGHPRI,
* wake up another worker; otherwise, clear HIGHPRI_PENDING.
*/
if (unlikely(gcwq->flags & GCWQ_HIGHPRI_PENDING)) {
struct work_struct *nwork = list_first_entry(&gcwq->worklist,
struct work_struct, entry);

if (!list_empty(&gcwq->worklist) &&
get_work_cwq(nwork)->wq->flags & WQ_HIGHPRI)
wake_up_worker(gcwq);
else
gcwq->flags &= ~GCWQ_HIGHPRI_PENDING;
}

/*
* CPU intensive works don't participate in concurrency
* management.  They're the scheduler's responsibility.
*/
if (unlikely(cpu_intensive))
worker_set_flags(worker, WORKER_CPU_INTENSIVE, true);

spin_unlock_irq(&gcwq->lock);

work_clear_pending(work);
lock_map_acquire_read(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
trace_workqueue_execute_start(work);
f(work);
/*
* While we must be careful to not use "work" after this, the trace
* point will only record its address.
*/
trace_workqueue_execute_end(work);
lock_map_release(&lockdep_map);
lock_map_release(&cwq->wq->lockdep_map);

if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
"%s/0x%08x/%d\n",
current->comm, preempt_count(), task_pid_nr(current));
printk(KERN_ERR "    last function: ");
print_symbol("%s\n", (unsigned long)f);
debug_show_held_locks(current);
dump_stack();
}

spin_lock_irq(&gcwq->lock);

/* clear cpu intensive status */
if (unlikely(cpu_intensive))
worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

/* we're done with it, release */
hlist_del_init(&worker->hentry);
worker->current_work = NULL;
worker->current_cwq = NULL;
cwq_dec_nr_in_flight(cwq, work_color, false);
}

/**
* start_worker - start a newly created worker
* @worker: worker to start
*
* Make the gcwq aware of @worker and start it.
*
* CONTEXT:
* spin_lock_irq(gcwq->lock).
*/
static void start_worker(struct worker *worker)
{
worker->flags |= WORKER_STARTED;
worker->gcwq->nr_workers++;
worker_enter_idle(worker);
wake_up_process(worker->task);
}


/**
* worker_enter_idle - enter idle state
* @worker: worker which is entering idle state
*
* @worker is entering idle state. Update stats and idle timer if
* necessary.
*
* LOCKING:
* spin_lock_irq(gcwq->lock).
*/
static void worker_enter_idle(struct worker *worker)
{
struct global_cwq *gcwq = worker->gcwq;

BUG_ON(worker->flags & WORKER_IDLE);
BUG_ON(!list_empty(&worker->entry) &&
(worker->hentry.next || worker->hentry.pprev));

/* can't use worker_set_flags(), also called from start_worker() */
worker->flags |= WORKER_IDLE;
gcwq->nr_idle++;
worker->last_active = jiffies;

/* idle_list is LIFO */
list_add(&worker->entry, &gcwq->idle_list);//加入 gcwq中的idle_list

if (likely(!(worker->flags & WORKER_ROGUE))) {
if (too_many_workers(gcwq) && !timer_pending(&gcwq->idle_timer))
mod_timer(&gcwq->idle_timer,
jiffies + IDLE_WORKER_TIMEOUT);
} else
wake_up_all(&gcwq->trustee_wait);

/* sanity check nr_running */
WARN_ON_ONCE(gcwq->nr_workers == gcwq->nr_idle &&
atomic_read(get_gcwq_nr_running(gcwq->cpu)));
}

list_add(&worker->entry, &gcwq->idle_list);//加入 gcwq中的idle_list


创建work queue:

Work thread如何处理掉work已经分析完了,然而对于前面init_workqueues()提到的system_wq是如何得到的&为什么说work thread不依赖于work queue了?
#define alloc_workqueue(name, flags, max_active)            \
__alloc_workqueue_key((name), (flags),(max_active), NULL, NULL)
structworkqueue_struct *__alloc_workqueue_key(const char *name,
unsigned int flags,
int max_active,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
unsigned int cpu;

/*
*Workqueues which may be used during memory reclaim should
*have a rescuer to guarantee forward progress.
*/
/*WQ_MEM_RECLAIM表示当前内存资源是否紧张,都要执行我这个work.*/
if (flags & WQ_MEM_RECLAIM)
flags |= WQ_RESCUER;

/*
*Unbound workqueues aren't concurrency managed and should be
*dispatched to workers immediately.
*/
/* WQ_UNBOUND 表示work不依赖于如何CPU,可以在任意CPU上运行。*/
if (flags & WQ_UNBOUND)
flags |= WQ_HIGHPRI;
/* max_active 限制任意一个CPU上能同时执行的最大work数量。*/
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active =wq_clamp_max_active(max_active, flags, name);
/* 分配 workqueue_struct,将当前workqueue相对应的信息如name, flags等保存起来,其实我们已经知道,在workthread中,这些信息会被用到。*/
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
goto err;
wq->flags = flags;
wq->saved_max_active = max_active;
mutex_init(&wq->flush_mutex);
atomic_set(&wq->nr_cwqs_to_flush,0);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
wq->name = name;
lockdep_init_map(&wq->lockdep_map,lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
if (alloc_cwqs(wq) < 0)
goto err;
/* 初始化per cpu上的cpu_workqueue_struct信息。*/
for_each_cwq_cpu(cpu, wq) {
struct cpu_workqueue_struct *cwq =get_cwq(cpu, wq);
struct global_cwq *gcwq =get_gcwq(cpu);
BUG_ON((unsigned long)cwq &WORK_STRUCT_FLAG_MASK);
cwq->gcwq = gcwq;
cwq->wq = wq;
cwq->flush_color = -1;
cwq->max_active = max_active;
INIT_LIST_HEAD(&cwq->delayed_works);
}

if (flags & WQ_RESCUER) {
struct worker *rescuer;
if(!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
goto err;
wq->rescuer = rescuer =alloc_worker();
if (!rescuer)
goto err;
rescuer->task =kthread_create(rescuer_thread, wq, "%s", name);
if (IS_ERR(rescuer->task))
goto err;
rescuer->task->flags |=PF_THREAD_BOUND;
wake_up_process(rescuer->task);
}

/*
*workqueue_lock protects global freeze state and workqueues
*list.  Grab it, set max_activeaccordingly and add the new
*workqueue to workqueues list.
*/
spin_lock(&workqueue_lock);
if (workqueue_freezing &&wq->flags & WQ_FREEZABLE)
for_each_cwq_cpu(cpu, wq)
get_cwq(cpu,wq)->max_active = 0;
/* 将当前wq添加到workqueues里去。*/
list_add(&wq->list,&workqueues);
spin_unlock(&workqueue_lock);
return wq;
err:
if (wq) {
free_cwqs(wq);
free_mayday_mask(wq->mayday_mask);
kfree(wq->rescuer);
kfree(wq);
}
return NULL;
}
EXPORT_SYMBOL_GPL(__alloc_workqueue_key);


新的机制虽然仍然保留了create_workqueue()和 create_singlethread_workqueue()这两个接口,但他们的实现最终其实调用的都是alloc_workqueue(),只是传的flags不一样。如前面所说,新机制的work queue里只有flags才会影响调度的顺序,

#definecreate_workqueue(name)                              \

alloc_workqueue((name),WQ_MEM_RECLAIM, 1)

#definecreate_freezable_workqueue(name)                \

alloc_workqueue((name),WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 1)

#definecreate_singlethread_workqueue(name)                  \
alloc_workqueue((name),WQ_UNBOUND | WQ_MEM_RECLAIM, 1)

有两种方法来创建工作项:

静态:DECLARE_WORK(struct work_struct *work,void(*func) (void *));

动态:INIT_WORK(struct work_struct *work,void(*func)(void *));

无论是静态还是动态的方法,它们的主要任务就是初始化好一个work_struct结构:

初始化work_struct的data和entry值,并且将func指向一个可执行函数:

do { \

__init_work((_work), _onstack); \

(_work)->data = (atomic_long_t) WORK_DATA_INIT();
\

INIT_LIST_HEAD(&(_work)->entry); \

PREPARE_WORK((_work), (_func)); \

} while (0);

当一个工作项被初始化好之后,表明它可以进入工作状态了,这时候我们就可以调度它了,如果我们把它交给系统默认线程去执行:

Schedule_work(work);

{

return queue_work(system_wq, work);

}

从代码可知,当调度执行工作项的时候,其实是去queue_work(system_wq,work),而从前面的初始化gcwqs过程中可知,system_wq是在系统启动过程中创建的一个普通的工作队列,也就是说,我们这里初始化的工作项会交给

统启动过程中的普通工作队列的工作者线程去处理。

当然,但我们有足够的理由需要去创建一个新的工作队列的时候我们可以:
Create_workqueue(const char *name);
创建一个名字为name的工作队列;
#define create_workqueue(name)
\
alloc_workqueue((name), WQ_MEM_RECLAIM, 1) //在初始化gcwqs时使用同样的方式创建了几个默认的系统工作队列。

挂起work:

再看work queue如何触发work:

queue_work –> queue_work_on-> __queue_work
static void__queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq;
struct list_head *worklist;
unsigned int work_flags;
unsigned long flags;

debug_work_activate(work);

/* if dying, only works from the sameworkqueue are allowed */
if (unlikely(wq->flags & WQ_DYING)&&
WARN_ON_ONCE(!is_chained_work(wq)))
return;

/* determine gcwq to use */
/* 根据flags获取相应gcwq*/
if (!(wq->flags & WQ_UNBOUND)) {
struct global_cwq *last_gcwq;

if (unlikely(cpu ==WORK_CPU_UNBOUND))
cpu =raw_smp_processor_id();

/*
* It's multi cpu.  If @wq is non-reentrant and @work
* was previously on a different cpu, it mightstill
* be running there, in which case the workneeds to
* be queued on that cpu to guaranteenon-reentrance.
*/
gcwq = get_gcwq(cpu);
if (wq->flags &WQ_NON_REENTRANT &&
(last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
struct worker *worker;

spin_lock_irqsave(&last_gcwq->lock,flags);

worker =find_worker_executing_work(last_gcwq, work);

if (worker &&worker->current_cwq->wq == wq)
gcwq = last_gcwq;
else {
/* meh... notrunning there, queue here */
spin_unlock_irqrestore(&last_gcwq->lock,flags);
spin_lock_irqsave(&gcwq->lock,flags);
}
} else
spin_lock_irqsave(&gcwq->lock,flags);
} else {
gcwq = get_gcwq(WORK_CPU_UNBOUND);
spin_lock_irqsave(&gcwq->lock,flags);
}

/* gcwq determined, get cwq and queue */
cwq = get_cwq(gcwq->cpu, wq);
trace_workqueue_queue_work(cpu, cwq,work);

BUG_ON(!list_empty(&work->entry));

cwq->nr_in_flight[cwq->work_color]++;
work_flags =work_color_to_flags(cwq->work_color);

if (likely(cwq->nr_active <cwq->max_active)) {
trace_workqueue_activate_work(work);
cwq->nr_active++;
worklist =gcwq_determine_ins_pos(gcwq, cwq);
} else {
work_flags |= WORK_STRUCT_DELAYED;
worklist =&cwq->delayed_works;
}
/* 将当前work放到队列上等待执行。*/
insert_work(cwq, work, worklist,work_flags);

spin_unlock_irqrestore(&gcwq->lock,flags);
}

static voidinsert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work,struct list_head *head,
unsigned int extra_flags)
{
struct global_cwq *gcwq = cwq->gcwq;

/* we own @work, set data and link */
set_work_cwq(work, cwq, extra_flags);

/*
*Ensure that we get the right work->data if we see the
*result of list_add() below, see try_to_grab_pending().
*/
smp_wmb();

list_add_tail(&work->entry, head);

/*
*Ensure either worker_sched_deactivated() sees the above
*list_add_tail() or we see zero nr_running to avoid workers
*lying around lazily while there are works to be processed.
*/
smp_mb();
/* 如果当前有高优先级的work或者已经没有空闲的workthread了,well,那就再创建一个workthread来处理。*/
if (__need_more_worker(gcwq))
wake_up_worker(gcwq);
}

工作队列是另一种将工作推后执行的形式,它可以把工作交给一个内核线程去执行,这个下半部是在进程上下文中执行的,因此,它可以重新调度还有睡眠。

    区分使用软中断/tasklet还是工作队列比较简单,如果推后的工作不需要睡眠,那么就选择软中断或tasklet,但如果需要一个可以重新调度,可以睡眠,可以获取内存,可以获取信号量,可以执行阻塞式I/O操作时,那么,请选择工作队列吧!

    在老的内核当中(2.6.36之前)工作队列是内核创建一个专门的工作者线程,它有一条任务链表,当设备或者内核某进程有部分任务需要推后处理的时候就把任务挂载在工作者线程的任务链表上,然后会在未来的某个时刻,工作者线程被调度,它就会去处理挂载在任务链表上的任务了。

    内核有一个缺省的工作者线程叫events/n,n是处理器的编号:每个处理器对应一个线程。如单处理器只有一个events/0,而双处理器会多出一个events/1。当然,我们也可以创建我们自己的工作者线程。假如有某个任务会被频繁的触发,那么便可以为它创建一个专门的工作者线程,比如触摸屏CTP。

    然而在2.6.36之后的内核当中对工作队列子系统作了改变,采用的机制改变为并发管理工作队列机制(Concurrency Managed Workqueue (cmwq))。在原来的机制当中,当kernel需要创建一个workqueue(create_workqueue()方式)的时候,它会在每一个cpu上创建一个work_thread,为每一个cpu分配一个struct cpu_workqueue_struct,随着Kernel创建越来越多的workqueue,这将占用大量的的内存资源,并且加重了进程调度的任务量。而在新的工作队列机制中它不再在每次create_workqueue时都为workqueue创建一个work thread,而是在系统启动的时候给每个cpu创建一个work thread,当有任务项work_struct需要处理时,系统会将任务项work_struct交给某个处理器的work thread上去处理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: