Linux平台上C语言实现异步队列的两种方法
2014-02-27 13:31
603 查看
Linux上目前有两种事件通知方式,一种是线程条件变量,一种是利用eventfd实现事件通知,下面介绍一下利用这两种方法实现异步队列的方法。
###线程条件变量
####相关函数介绍
pthread_cond_init:初始化一个线程条件变量。
pthread_cond_wait:等待条件触发。
pthread_cond_signal:通知一个线程,线程条件发生。
pthread_cond_timedwait:等待条件触发,可以设置超时时间。
pthread_cond_reltimedwait_np:和pthread_cond_timedwait使用基本相同,区别是使用的是相对时间间隔而不是绝对时间间隔。
pthread_cond_broadcast:通知所有等待线程,线程条件发生。
pthread_cond_destroy:销毁条件变量。
####唤醒丢失问题
如果线程未持有与条件相关联的互斥锁,则调用 pthread_cond_signal() 或 pthread_cond_broadcast() 会产生唤醒丢失错误。满足以下所有条件时,即会出现唤醒丢失问题:
一个线程调用 pthread_cond_signal() 或 pthread_cond_broadcast()
另一个线程已经测试了该条件,但是尚未调用 pthread_cond_wait()
没有正在等待的线程
信号不起作用,因此将会丢失,仅当修改所测试的条件但未持有与之相关联的互斥锁时,才会出现此问题。只要仅在持有关联的互斥锁同时修改所测试的条件,即可调用 pthread_cond_signal() 和 pthread_cond_broadcast(),而无论这些函数是否持有关联的互斥锁。
####线程条件变量使用方法
###eventfd
EFD_CLOEXEC:设置该描述符的
EFD_NONBLOCK:设置描述符为非阻塞模式。
EFD_SEMAPHORE:设置描述符为信号量工作模式,在此模式下,
当内核维护的8字节整型数为0时,
###简单的唤醒队列
下面我们实现一个简单的环形队列:
###线程变量实现的异步队列
###eventfd实现的异步队列
###总结
两种实现方法线程条件变量比较复杂,但是性能略高,而eventfd实现简单,但是性能略低。
###线程条件变量
####相关函数介绍
pthread_cond_init:初始化一个线程条件变量。
pthread_cond_wait:等待条件触发。
pthread_cond_signal:通知一个线程,线程条件发生。
pthread_cond_timedwait:等待条件触发,可以设置超时时间。
pthread_cond_reltimedwait_np:和pthread_cond_timedwait使用基本相同,区别是使用的是相对时间间隔而不是绝对时间间隔。
pthread_cond_broadcast:通知所有等待线程,线程条件发生。
pthread_cond_destroy:销毁条件变量。
####唤醒丢失问题
如果线程未持有与条件相关联的互斥锁,则调用 pthread_cond_signal() 或 pthread_cond_broadcast() 会产生唤醒丢失错误。满足以下所有条件时,即会出现唤醒丢失问题:
一个线程调用 pthread_cond_signal() 或 pthread_cond_broadcast()
另一个线程已经测试了该条件,但是尚未调用 pthread_cond_wait()
没有正在等待的线程
信号不起作用,因此将会丢失,仅当修改所测试的条件但未持有与之相关联的互斥锁时,才会出现此问题。只要仅在持有关联的互斥锁同时修改所测试的条件,即可调用 pthread_cond_signal() 和 pthread_cond_broadcast(),而无论这些函数是否持有关联的互斥锁。
####线程条件变量使用方法
get_resources(int amount) { pthread_mutex_lock(&rsrc_lock); while (resources < amount) { pthread_cond_wait(&rsrc_add, &rsrc_lock); } resources -= amount; pthread_mutex_unlock(&rsrc_lock); } add_resources(int amount) { pthread_mutex_lock(&rsrc_lock); resources += amount; pthread_cond_broadcast(&rsrc_add); pthread_mutex_unlock(&rsrc_lock); }
###eventfd
int eventfd(unsigned int initval, int flags);
eventfd是Linux提供内核态的事件等待/通知机制,内核维护了一个8字节的整型数,该整型数由
initval来初始化,
flags参数可以由以下值位或而来:
EFD_CLOEXEC:设置该描述符的
O_CLOEXEC标志。
EFD_NONBLOCK:设置描述符为非阻塞模式。
EFD_SEMAPHORE:设置描述符为信号量工作模式,在此模式下,
read模式会使整型数减1并返回数值1。
当内核维护的8字节整型数为0时,
read操作会阻塞,如果为fd设置为非阻塞模式,则返回
EAGAIN错误。
###简单的唤醒队列
下面我们实现一个简单的环形队列:
#define default_size 1024 typedef struct queue { int header; int tail; int size; int capcity; void **_buf; } queue_t; queue_t *queue_create(int size) { queue_t *q = malloc(sizeof (queue_t)); if (q != NULL) { if (size > 0) { q->_buf = malloc(size); q->capcity = size; } else { q->_buf = malloc(default_size * sizeof (void *)); q->capcity = default_size; } q->header = q->tail = q->size = 0; } return q; } int queue_is_full(queue_t *q) { return q->size == q->capcity; } int queue_is_empty(queue_t *q) { return q->size == 0; } void queue_push_tail(queue_t *q, void *data) { if (!queue_is_full(q)) { q->_buf[q->tail] = data; q->tail = (q->tail + 1) % q->capcity; q->size++; } } void *queue_pop_head(queue_t *q) { void *data = NULL; if (!queue_is_empty(q)) { data = q->_buf[(q->header)]; q->header = (q->header + 1) % q->capcity; q->size--; } return data; } int *queue_free(queue_t *q) { free(q->_buf); free(q); }
###线程变量实现的异步队列
typedef struct async_queue { pthread_mutex_t mutex; pthread_cond_t cond; int waiting_threads; queue_t *_queue; } async_queue_t; async_queue_t *async_queue_create(int size) { async_queue_t *q = malloc(sizeof (async_queue_t)); q->_queue = queue_create(size); q->waiting_threads = 0; pthread_mutex_init(&(q->mutex), NULL); pthread_cond_init(&(q->cond), NULL); return q; } void async_queue_push_tail(async_queue_t *q, void *data) { if (!queue_is_full(q->_queue)) { pthread_mutex_lock(&(q->mutex)); queue_push_tail(q->_queue, data); if (q->waiting_threads > 0) { pthread_cond_signal(&(q->cond)); } pthread_mutex_unlock(&(q->mutex)); } } void *async_queue_pop_head(async_queue_t *q, struct timeval *tv) { void *retval = NULL; pthread_mutex_lock(&(q->mutex)); if (queue_is_empty(q->_queue)) { q->waiting_threads++; while (queue_is_empty(q->_queue)) { pthread_cond_wait(&(q->cond), &(q->mutex)); } q->waiting_threads--; } retval = queue_pop_head(q->_queue); pthread_mutex_unlock(&(q->mutex)); return retval; } void async_queue_free(async_queue_t *q) { queue_free(q->_queue); pthread_cond_destroy(&(q->cond)); pthread_mutex_destroy(&(q->mutex)); free(q); }
###eventfd实现的异步队列
typedef struct async_queue { int efd; //event fd fd_set rdfds; //for select queue_t *_queue; } async_queue_t; async_queue_t *async_queue_create(int size) { async_queue_t *q = malloc(sizeof (async_queue_t)); q->efd = eventfd(0, EFD_SEMAPHORE|EFD_NONBLOCK); q->_queue = queue_create(size); FD_ZERO(&(q->rdfds)); FD_SET(q->efd, &(q->rdfds)); return q; } void async_queue_push_tail(async_queue_t *q, void *data) { unsigned long long i = 1; if (!queue_is_full(q->_queue)) { queue_push_tail(q->_queue, data); write(q->efd, &i, sizeof (i)); } } void *async_queue_pop_head(async_queue_t *q, struct timeval *tv) { unsigned long long i = 0; void *data = NULL; if (select(q->efd + 1, &(q->rdfds), NULL, NULL, tv) == 0) { return data; } else { read(q->efd, &i, sizeof (i)); return queue_pop_head(q->_queue); } } void async_queue_free(async_queue_t *q) { queue_free(q->_queue); close(q->efd); free(q); }
###总结
两种实现方法线程条件变量比较复杂,但是性能略高,而eventfd实现简单,但是性能略低。
相关文章推荐
- Linux平台上C语言实现异步队列的两种方法
- 20155212 C语言实现linux下pwd命令的两种方法
- Linux平台中使用PHP把word转pdf的实现方法
- Linux2.6内核--中断下半部实现方法 工作队列
- C语言两种方法实现进程间 socket 通信
- TQ2440学习笔记——Linux上I2C驱动的两种实现方法(1)
- Java数组实现循环队列的两种方法
- C++_队列的两种实现方法
- Linux下获得系统时间的C语言的实现方法
- Linux下获得系统时间的C语言的实现方法[转]
- Linux下获得系统时间的C语言的实现方法
- 两种方法实现队列---顺序表和链表
- c语言:实现对于给定的正整数N,依次打印出小于等于N的所有素数。两种方法及其优化
- Linux下获得系统时间的C语言的实现方法
- 生产者消费者模型(Linux系统下的两种实现方法)
- 两种方法实现队满和队空的判断操作(循环队列)
- 队列应用银行排队问题模拟:计算客户的平均停留时间和等待时间以及每个客户的时间信息,两种方法实现
- 异步任务队列的两种处理方法
- C语言,数组实现约瑟夫环问题(两种方法)
- C语言实现斐波那契数列的两种方法(递归和迭代)