Nuttx 工作队列 work queue
2017-04-19 14:26
190 查看
在Linux操作系统中,工作队列(work queue)是Linux kernel中将工作推后执行的一种机制。这种机制和BH或Tasklets不同之处在于工作队列是把推后的工作交由一个内核线程去执行,因此工作队列的优势就在于它允许重新调度甚至睡眠。
Nuttx操作系统中工作队列的实现和在Linux中类似,Nuttx在内核中创建了内核线程,用于调度执行工作队列中的任务,在工作队列中允许任务睡眠。
Nuttx中工作队列内核线程有高低两种优先级,高优先级的工作队列用于执行一些优先级比较高的任务,比如将内存buffer中的数据换出到sd卡上。低优先的任务做一些相对不是很紧迫的工作,比如内存片回收。
本节首先介绍工作中队列涉及到的数据结构,然后以高优先级工作队列为例,说明工作队列线程的创建,添加任务到工作队列,工作队列任务处理。
delay是线程poll的周期。每次当工作队列poll一次工作队列上的任务,睡眠delay时间。如果工作队列的处理方式采用周期性的poll方式,信号可以唤醒工作队列线程,如果线程处在休眠状态。如果工作队列的处理方式采用信号中断的方式,那么只有当工作队列线程收到信号之后才会从睡眠等待中被唤醒,否则一直处于睡眠状态。
链表头q将任务链接在链表上
worker数组中的每一项代表一个线程。高优先级工作队列只创建一个线程,低优先级的工作队列至少创建了一个工作线程。
注意: 为了展现代码逻辑,文中忽略了对临界资源的保护代码
pid记录了工作队列中某个线程的pid
当busy为真时,表明该线程当前处于运行状态,不能接受信号。
任务(work)链表。一个任务被添加到工作队列中后,连接到
worker是任务的回调函数,用于执行具体的任务。
arg是任务回调函数的参数
qtime记录任务插入工作队列的时间
delay控制任务插入队列后至少经过delay时间才能运行
初始化工作队列的poll周期
初始化工作队列的任务链表头
创建工作队列线程
初始化工作队列中工作所描述的线程的pid和线程的状态
高优先级的工作队列只创建了一个线程,线程的函数名
如果没有定义低优先级的工作队列,那么在高优先级的任务中会周期性的调用内存垃圾回收函数,整理内存碎片。
这个函数比较长,我们分为两部分看,第一部分是对延迟任务的处理。第二部分是任务线程进入睡眠,由睡眠状态到运行状态的处理方式。
首先看第一部分
参数period是工作队列线程期望的poll周期,先赋值给next。next是本次系统真实需要睡眠等待的时间。在第二部分中会用到。
stick 记录本次工作处理的开始时间
从工作队列链表中取出第一任务work(如果链表非空),计算任务从被插入链表到当前经过的时间,如果时间超过了任务期望的延迟时间,那么就要赶快处理该任务。因为任务处理中允许任务睡眠,所以当任务处理完之后,我们有必要再次从工作队列链表头开始寻找已经到期的任务,直到所有的到期任务都被处理完成。对于没有到期的任务,计算出距离到期最近的任务剩余延迟时间。
再看第二部分:
period值为0, 表示工作队列线程只能由信号唤醒。设置线程的状态busy为false,设置接收信号量种类,然后线程进入睡眠状态。此时,如果有新的任务被添加到工作队列中,那么该线程将被唤醒。这种方式对于低优先级的工作队列有用,低优先级的工作队列创建了最少一个线程,第一个线程采用延时poll的机制,剩下其余的采用信号中断的方式,这样既能保证工作队列上的任务不会超时太多(因为第一个线程会周期性Poll),也能保证当有任务插入到工作队列后,其他线程能够被唤醒,立即执行工作队列上(延迟到期)的任务。
如果period非0,那么计算出最小的睡眠时间,让该线程进入睡眠状态。睡眠延迟时间到,线程又开始运行。
qid选择需要添加的工作队列
work是对任务描述的数据结构指针
worker为任务的回调函数
arg是回调函数的参数
delay为延迟时间
通过qid选择将任务添加到高优先级或者低优先级的任务中,然后向工作队列发信号。
添加任务底层函数为
初始化任务的回调函数、回调函数参数和加入链表的时间,最后将任务添加到工作队列链表中。
如果是向高优先级工作队列发信号,那么直接找到高优先级工作队列线程的pid,然后发信号即可。因为高优先级工作队列只有一个线程,即
如果是向低优先级工作队列发信号,则找出低优先级工作队列的线程中的第一个处于睡眠状态的线程,向该线程发送信号。
Nuttx操作系统中工作队列的实现和在Linux中类似,Nuttx在内核中创建了内核线程,用于调度执行工作队列中的任务,在工作队列中允许任务睡眠。
Nuttx中工作队列内核线程有高低两种优先级,高优先级的工作队列用于执行一些优先级比较高的任务,比如将内存buffer中的数据换出到sd卡上。低优先的任务做一些相对不是很紧迫的工作,比如内存片回收。
本节首先介绍工作中队列涉及到的数据结构,然后以高优先级工作队列为例,说明工作队列线程的创建,添加任务到工作队列,工作队列任务处理。
数据结构
工作队列
struct hp_wqueue_s { systime_t delay; /* Delay between polling cycles (ticks) */ struct dq_queue_s q; /* The queue of pending work */ struct kworker_s worker[1]; /* Describes the single high priority worker */ };
struct lp_wqueue_s { systime_t delay; /* Delay between polling cycles (ticks) */ struct dq_queue_s q; /* The queue of pending work */ /* Describes each thread in the low priority queue's thread pool */ struct kworker_s worker[CONFIG_SCHED_LPNTHREADS]; };
delay是线程poll的周期。每次当工作队列poll一次工作队列上的任务,睡眠delay时间。如果工作队列的处理方式采用周期性的poll方式,信号可以唤醒工作队列线程,如果线程处在休眠状态。如果工作队列的处理方式采用信号中断的方式,那么只有当工作队列线程收到信号之后才会从睡眠等待中被唤醒,否则一直处于睡眠状态。
链表头q将任务链接在链表上
worker数组中的每一项代表一个线程。高优先级工作队列只创建一个线程,低优先级的工作队列至少创建了一个工作线程。
注意: 为了展现代码逻辑,文中忽略了对临界资源的保护代码
工作队列线程
struct kworker_s { pid_t pid; /* The task ID of the worker thread */ volatile bool busy; /* True: Worker is not available */ };
pid记录了工作队列中某个线程的pid
当busy为真时,表明该线程当前处于运行状态,不能接受信号。
工作任务
struct work_s { struct dq_entry_s dq; /* Implements a doubly linked list */ worker_t worker; /* Work callback */ FAR void *arg; /* Callback argument */ systime_t qtime; /* Time work queued */ systime_t delay; /* Delay until work performed */ };
任务(work)链表。一个任务被添加到工作队列中后,连接到
lp_wqueue_s.q或者
hp_wqueue_s.q上。
worker是任务的回调函数,用于执行具体的任务。
arg是任务回调函数的参数
qtime记录任务插入工作队列的时间
delay控制任务插入队列后至少经过delay时间才能运行
创建工作线程
int work_hpstart(void) { pid_t pid; g_hpwork.delay = CONFIG_SCHED_HPWORKPERIOD / USEC_PER_TICK; dq_init(&g_hpwork.q); pid = kernel_thread(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY, CONFIG_SCHED_HPWORKSTACKSIZE, (main_t)work_hpthread, (FAR char * const *)NULL); ... g_hpwork.worker[0].pid = pid; g_hpwork.worker[0].busy = true; return pid; }
初始化工作队列的poll周期
初始化工作队列的任务链表头
创建工作队列线程
初始化工作队列中工作所描述的线程的pid和线程的状态
高优先级的工作队列只创建了一个线程,线程的函数名
work_hpthread,线程的pid记录在
g_hpwork.worker[0].pid中。最后标记该线程为busy状态,即该线程处于睡眠过程中,不能接受任何信号。
工作队列线程
static int work_hpthread(int argc, char *argv[]) { for (; ; ) { #ifndef CONFIG_SCHED_LPWORK sched_garbage_collection(); #endif work_process((FAR struct kwork_wqueue_s *)&g_hpwork, g_hpwork.delay, 0); } return OK; /* To keep some compilers happy */
如果没有定义低优先级的工作队列,那么在高优先级的任务中会周期性的调用内存垃圾回收函数,整理内存碎片。
工作处理
void work_process(FAR struct kwork_wqueue_s *wqueue, systime_t period, int wndx)
这个函数比较长,我们分为两部分看,第一部分是对延迟任务的处理。第二部分是任务线程进入睡眠,由睡眠状态到运行状态的处理方式。
首先看第一部分
next = period; stick = clock_systimer(); work = (FAR struct work_s *)wqueue->q.head; while (work) { ctick = clock_systimer(); elapsed = ctick - work->qtime; if (elapsed >= work->delay) { (void)dq_rem((struct dq_entry_s *)work, &wqueue->q); worker = work->worker; if (worker != NULL) { arg = work->arg; work->worker = NULL; worker(arg); flags = enter_critical_section(); work = (FAR struct work_s *)wqueue->q.head; } else { work = (FAR struct work_s *)work->dq.flink; } } else /* elapsed < work->delay */ { elapsed += (ctick - stick); if (elapsed > work->delay) { elapsed = work->delay; } remaining = work->delay - elapsed; if (remaining < next) { next = remaining; } work = (FAR struct work_s *)work->dq.flink; } }
参数period是工作队列线程期望的poll周期,先赋值给next。next是本次系统真实需要睡眠等待的时间。在第二部分中会用到。
stick 记录本次工作处理的开始时间
从工作队列链表中取出第一任务work(如果链表非空),计算任务从被插入链表到当前经过的时间,如果时间超过了任务期望的延迟时间,那么就要赶快处理该任务。因为任务处理中允许任务睡眠,所以当任务处理完之后,我们有必要再次从工作队列链表头开始寻找已经到期的任务,直到所有的到期任务都被处理完成。对于没有到期的任务,计算出距离到期最近的任务剩余延迟时间。
再看第二部分:
#if defined(CONFIG_SCHED_LPWORK) && CONFIG_SCHED_LPNTHREADS > 0 if (period == 0) { sigset_t set; sigemptyset(&set); sigaddset(&set, SIGWORK); wqueue->worker[wndx].busy = false; DEBUGVERIFY(sigwaitinfo(&set, NULL)); wqueue->worker[wndx].busy = true; } else #endif { elapsed = clock_systimer() - stick; if (elapsed < period && next > 0) { remaining = period - elapsed; next = MIN(next, remaining); wqueue->worker[wndx].busy = false; usleep(next * USEC_PER_TICK); wqueue->worker[wndx].busy = true; } } leave_critical_section(flags);
period值为0, 表示工作队列线程只能由信号唤醒。设置线程的状态busy为false,设置接收信号量种类,然后线程进入睡眠状态。此时,如果有新的任务被添加到工作队列中,那么该线程将被唤醒。这种方式对于低优先级的工作队列有用,低优先级的工作队列创建了最少一个线程,第一个线程采用延时poll的机制,剩下其余的采用信号中断的方式,这样既能保证工作队列上的任务不会超时太多(因为第一个线程会周期性Poll),也能保证当有任务插入到工作队列后,其他线程能够被唤醒,立即执行工作队列上(延迟到期)的任务。
如果period非0,那么计算出最小的睡眠时间,让该线程进入睡眠状态。睡眠延迟时间到,线程又开始运行。
添加任务
int work_queue(int qid, FAR struct work_s *work, worker_t worker, FAR void *arg, systime_t delay)
qid选择需要添加的工作队列
work是对任务描述的数据结构指针
worker为任务的回调函数
arg是回调函数的参数
delay为延迟时间
通过qid选择将任务添加到高优先级或者低优先级的任务中,然后向工作队列发信号。
添加任务底层函数为
static void work_qqueue(FAR struct kwork_wqueue_s *wqueue, FAR struct work_s *work, worker_t worker, FAR void *arg, systime_t delay) { ... work->worker = worker; /* Work callback. non-NULL means queued */ work->arg = arg; /* Callback argument */ work->delay = delay; /* Delay until work performed */ work->qtime = clock_systimer(); /* Time work queued */ dq_addlast((FAR dq_entry_t *)work, &wqueue->q); ... }
初始化任务的回调函数、回调函数参数和加入链表的时间,最后将任务添加到工作队列链表中。
向工作队列发信号
int work_signal(int qid) { ... #ifdef CONFIG_SCHED_HPWORK if (qid == HPWORK) { pid = g_hpwork.worker[0].pid; } else #endif #ifdef CONFIG_SCHED_LPWORK if (qid == LPWORK) { int i; for (i = 0; i < CONFIG_SCHED_LPNTHREADS; i++) { if (!g_lpwork.worker[i].busy) { break; } } if (i >= CONFIG_SCHED_LPNTHREADS) { return OK; } pid = g_lpwork.worker[i].pid; } else #endif { return -EINVAL; } ret = kill(pid, SIGWORK); if (ret < 0) { int errcode = errno; return -errcode; } return OK; }
如果是向高优先级工作队列发信号,那么直接找到高优先级工作队列线程的pid,然后发信号即可。因为高优先级工作队列只有一个线程,即
g_hpwork.worker[0]
如果是向低优先级工作队列发信号,则找出低优先级工作队列的线程中的第一个处于睡眠状态的线程,向该线程发送信号。
结束任务
结束工作队列中的任务比较简单,将该任务从工作队列中的任务链表中移除,清除任务的回调函数。相关文章推荐
- 中断下半部_工作队列(work queue)
- Linux中的工作队列(work queue)
- RabbitMQ学习总结 第三篇:工作队列Work Queue
- Linux 工作队列( Work Queue)
- RabbitMQ-work queue工作队列 和 fair dispatch公平分发
- Rabbitmq学习笔记三:工作队列work queue
- 【译】RabbitMQ:工作队列(Work Queue)
- 工作队列work queue
- 实例探讨工作队列(work queue)的工作原理
- RabbitMQ 工作队列
- Linux内核实践之工作队列【转】
- tasklet和工作队列
- RabbitMQ 官方NET教程(二)【工作队列】
- Linux 工作队列和等待队列的区别
- linux内核工作队列讲解和源码详细注释
- Linux 内核中工作队列的操作
- Linux 内核工作队列
- kworker内核工作队列详解
- NuttX_基础组件之队列
- 工作队列分析