高性能流媒体服务器-nebula之数据结构(9)--双锁并发队列
2016-09-02 14:28
423 查看
这是根据 Maged M. Michael and Michael
L. Scott提出的双锁并发队列算法实现的c++版本,主要应用场景为:单生产者/多消费者或多生产者/多消费者或多生产者/单消费者的情况,在实现中,我们用spinlock作为互斥锁;内存分配采用了无锁内存池进行分配。
L. Scott提出的双锁并发队列算法实现的c++版本,主要应用场景为:单生产者/多消费者或多生产者/多消费者或多生产者/单消费者的情况,在实现中,我们用spinlock作为互斥锁;内存分配采用了无锁内存池进行分配。
<pre name="code" class="cpp">// // aqueue.h // nebula // // Created by yi.cheng on 15/10/8. // Copyright © 2015 year kanshansoft. All rights reserved. // #ifndef aqueue_h #define aqueue_h #include <core/mem/lfalloctor.h> #include <core/Errors.h> #include <core/guard.h> #include <utils/Log.h> /** Two-Lock Concurrent Queue Algorithm structure node_t {value: data type, next: pointer to node_t} structure queue_t {Head: pointer to node_t, Tail: pointer to node_t, H_lock: lock type, T_lock: lock type} initialize(Q: pointer to queue_t) node = new_node() // Allocate a free node node->next = NULL // Make it the only node in the linked list Q->Head = Q->Tail = node // Both Head and Tail point to it Q->H_lock = Q->T_lock = FREE // Locks are initially free enqueue(Q: pointer to queue_t, value: data type) node = new_node() // Allocate a new node from the free list node->value = value // Copy enqueued value into node node->next = NULL // Set next pointer of node to NULL lock(&Q->T_lock) // Acquire T_lock in order to access Tail Q->Tail->next = node // Link node at the end of the linked list Q->Tail = node // Swing Tail to node unlock(&Q->T_lock) // Release T_lock dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean lock(&Q->H_lock) // Acquire H_lock in order to access Head node = Q->Head // Read Head new_head = node->next // Read next pointer if new_head == NULL // Is queue empty? unlock(&Q->H_lock) // Release H_lock before return return FALSE // Queue was empty endif *pvalue = new_head->value // Queue not empty. Read value before release Q->Head = new_head // Swing Head to next node unlock(&Q->H_lock) // Release H_lock free(node) // Free node return} TRUE // Queue was not empty, dequeue succeeded **/ namespace anysee { #define MAX_QUEUE_ITEM 1024 template<typename T> class AQueue { AQueue(const AQueue&); AQueue& operator=(const AQueue&); struct ANode { INLINE ANode() : next(NULL) {} T value; ANode* next; }; public: INLINE AQueue(lfalloctor* const alloctor) : mElements(0), mAlloctor(alloctor), mHead(NULL), mTail(NULL) { } INLINE void init() { ANode* dummyHead = LFNew<ANode>(*mAlloctor); mHead = mTail = dummyHead; } INLINE bool isEmpty() const { return (mHead->next == NULL); } INLINE size_t size() const { return mElements; } INLINE ssize_t push(const T& item) { ssize_t result = OK; if (mHead == NULL) { init(); } if (mElements >= MAX_QUEUE_ITEM) { ALOGE("queue overflow %d", MAX_QUEUE_ITEM); result = QUEUE_OVERFLOW; } else { ANode* n = LFNew<ANode>(*mAlloctor); n->value = item; mTail->next = n; mTail = n; ++mElements; } return result; } INLINE bool get(T& val) { if(mHead == NULL) { init(); } ANode* item = mHead; ANode* newItem = mHead->next; bool ret = false; if (newItem != NULL) { val = newItem->value; mHead = newItem; --mElements; ret = true; LFDelete(*mAlloctor, item); } return ret; } INLINE bool peek(T& val) { bool ret = false; ANode* item = mHead->next; if (item) { val = item->value; ret = true; } return ret; } void clear() { if (mHead) { T item; while (get(item)); LFDelete<ANode>(*mAlloctor, mHead); mElements = 0; mHead = mTail = NULL; } } ~AQueue() { clear(); } private: ANode* mHead; ANode* mTail; size_t mElements; lfalloctor* const mAlloctor; }; template<typename T, typename LOCKER> class AQueueLock { AQueueLock(const AQueueLock&); AQueueLock& operator=(const AQueueLock&); public: INLINE AQueueLock(lfalloctor* const alloctor) : impl_(alloctor) { } INLINE void init() { impl_.init(); } INLINE bool isEmpty() { Guard<LOCKER> l(mLockerHead); return impl_.isEmpty(); } INLINE size_t size() const { return impl_.size(); } INLINE ssize_t push(const T& item) { Guard<LOCKER> l(mLockerTail); return impl_.push(item); } INLINE bool get(T& val) { Guard<LOCKER> l(mLockerHead); return impl_.get(val); } INLINE bool peek(T& val) { Guard<LOCKER> l(mLockerHead); return impl_.peek(val); } void clear() { Guard<LOCKER> l(mLockerHead); impl_.clear(); } ~AQueueLock() { clear(); } private: AQueue<T> impl_; LOCKER mLockerHead; LOCKER mLockerTail; }; } #endif /* aqueue_h */
相关文章推荐
- 高性能流媒体服务器-nebula之数据结构(7)--环形无锁队列
- 高性能流媒体服务器-nebula之数据结构(8)--双链表实现的内存中立队列
- 高性能流媒体服务器-nebula之数据结构(1)--hash table介绍
- 高性能流媒体服务器-nebula之数据结构(6)--无节点内存分配的单、双链表
- 高性能流媒体服务器-nebula之数据结构(2)--pairing heap
- 高性能流媒体服务器-nebula之数据结构(4)--动态数组NBAArray
- 高性能流媒体服务器-nebula之数据结构(5)--动态数组DynArray
- 高性能流媒体服务器-nebula之数据结构(3)--AVL树定时器
- Disruptor——一种可替代有界队列完成并发线程间数据交换的高性能解决方案
- EasyDarwin开源流媒体服务器高性能设计之无锁队列
- EasyDarwin开源流媒体服务器高性能设计之无锁队列
- Disruptor:一种高性能的、在并发线程间数据交换领域用于替换有界限队列的方案
- C#数据结构之队列
- 关于linux内核中 等待队列 数据结构之思考
- 优先队列的实现 Java数据结构与算法
- 高性能服务器结构
- Reactor, Proactor并发模式与高性能视频服务器
- 面向数据c++数据结构之基本数据结构(队列)--【美】Jan Harrington 陈博译
- 数据结构与算法学习之队列及队列的相关操作
- [翻译]C#数据结构与算法 – 第五章栈与队列(Part 1)