您的位置:首页 > 理论基础 > 数据结构算法

高性能流媒体服务器-nebula之数据结构(9)--双锁并发队列

2016-09-02 14:28 423 查看
这是根据 Maged M. Michael 和 Michael L. Scott 提出的双锁并发队列算法实现的 C++ 版本,主要应用场景为:单生产者/多消费者、多生产者/多消费者或多生产者/单消费者的情况。在实现中,我们用 spinlock 作为互斥锁;内存分配采用了无锁内存池进行分配。

<pre name="code" class="cpp">//
//  aqueue.h
//  nebula
//
//  Created by yi.cheng on 15/10/8.
//  Copyright © 2015 year kanshansoft. All rights reserved.
//

#ifndef aqueue_h
#define aqueue_h
#include <core/mem/lfalloctor.h>
#include <core/Errors.h>
#include <core/guard.h>
#include <utils/Log.h>
/**
Two-Lock Concurrent Queue Algorithm

structure node_t {value: data type, next: pointer to node_t}
structure queue_t {Head: pointer to node_t, Tail: pointer to node_t,
H_lock: lock type, T_lock: lock type}

initialize(Q: pointer to queue_t)
node = new_node()		        // Allocate a free node
node->next = NULL               // Make it the only node in the linked list
Q->Head = Q->Tail = node	    // Both Head and Tail point to it
Q->H_lock = Q->T_lock = FREE	// Locks are initially free

enqueue(Q: pointer to queue_t, value: data type)
node = new_node()	    // Allocate a new node from the free list
node->value = value		// Copy enqueued value into node
node->next = NULL       // Set next pointer of node to NULL
lock(&Q->T_lock)		// Acquire T_lock in order to access Tail
Q->Tail->next = node	// Link node at the end of the linked list
Q->Tail = node		    // Swing Tail to node
unlock(&Q->T_lock)		// Release T_lock

dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
lock(&Q->H_lock)	    // Acquire H_lock in order to access Head
node = Q->Head		    // Read Head
new_head = node->next	// Read next pointer
if new_head == NULL	    // Is queue empty?
unlock(&Q->H_lock)	    // Release H_lock before return
return FALSE		    // Queue was empty
endif
*pvalue = new_head->value	// Queue not empty.  Read value before release
Q->Head = new_head	        // Swing Head to next node
unlock(&Q->H_lock)		    // Release H_lock
free(node)			        // Free node
return TRUE		        // Queue was not empty, dequeue succeeded
**/
namespace anysee {
// Upper bound on the number of queued elements: AQueue::push() logs an error
// and returns QUEUE_OVERFLOW once the element count has reached this value.
// Being a preprocessor macro, it is not scoped by the enclosing namespace.
#define  MAX_QUEUE_ITEM                  1024
/**
 * Bounded FIFO queue built on a singly linked list with a dummy head node.
 *
 * mHead always points at a dummy node whose `next` is the first real element;
 * mTail points at the last node (the dummy itself when the queue is empty).
 * Nodes are obtained from / returned to the lfalloctor passed at construction.
 * At most MAX_QUEUE_ITEM elements are accepted.
 *
 * This class performs no locking of its own; see AQueueLock for the locked
 * wrapper. T must be default constructible and copy assignable.
 */
template<typename T> class AQueue {
    // Non-copyable: declared private and intentionally left undefined.
    AQueue(const AQueue&);
    AQueue& operator=(const AQueue&);

    /** List node: one stored value plus the link to the following node. */
    struct ANode {
        INLINE ANode() : next(NULL) {}
        T      value;
        ANode* next;
    };

public:
    /**
     * Creates an empty, not yet initialised queue (no dummy node allocated).
     * @param alloctor allocator used for every node; it is dereferenced on
     *                 each allocation/release, so it must stay valid for the
     *                 lifetime of the queue.
     */
    INLINE AQueue(lfalloctor* const alloctor)
        // Initialisers listed in member declaration order.
        : mHead(NULL), mTail(NULL), mElements(0), mAlloctor(alloctor)
    {
    }

    /**
     * Allocates the dummy head node. Calling it again on an already
     * initialised queue is a no-op, so the existing list is never leaked
     * or silently discarded.
     */
    INLINE void init() {
        if (mHead != NULL) {
            return;
        }
        ANode* dummyHead = LFNew<ANode>(*mAlloctor);
        mHead = mTail = dummyHead;
    }

    /** @return true when no element is queued (also before init()). */
    INLINE bool isEmpty() const {
        // mHead is NULL before init() and after clear(): report "empty"
        // instead of dereferencing a null pointer.
        return (mHead == NULL) || (mHead->next == NULL);
    }

    /** @return the number of queued elements. */
    INLINE size_t size() const {
        return mElements;
    }

    /**
     * Appends a copy of @p item at the tail, initialising the queue first
     * if that has not happened yet.
     * @return OK on success, QUEUE_OVERFLOW when MAX_QUEUE_ITEM elements are
     *         already queued (the item is then not stored).
     */
    INLINE ssize_t push(const T& item) {
        ssize_t result = OK;
        if (mHead == NULL) {
            init();
        }
        if (mElements >= MAX_QUEUE_ITEM) {
            ALOGE("queue overflow %d", MAX_QUEUE_ITEM);
            result = QUEUE_OVERFLOW;
        }
        else {
            ANode* n = LFNew<ANode>(*mAlloctor);
            n->value = item;
            mTail->next = n;   // link behind the current last node
            mTail = n;         // then swing the tail to the new node
            ++mElements;
        }
        return result;
    }

    /**
     * Removes the oldest element and copies it into @p val, initialising the
     * queue first if that has not happened yet.
     * @return true if an element was dequeued, false if the queue was empty
     *         (@p val is left untouched in that case).
     */
    INLINE bool get(T& val) {
        if (mHead == NULL) {
            init();
        }
        ANode*  item = mHead;
        ANode*  newItem = mHead->next;
        bool ret = false;
        if (newItem != NULL) {
            val = newItem->value;
            // The node that carried the value becomes the new dummy head;
            // the previous dummy is the one that gets released.
            mHead = newItem;
            --mElements;
            ret = true;
            LFDelete(*mAlloctor, item);
        }
        return ret;
    }

    /**
     * Copies the oldest element into @p val without removing it.
     * @return true if an element was available, false if the queue is empty
     *         or has not been initialised.
     */
    INLINE bool peek(T& val) {
        if (mHead == NULL) {
            // Nothing has been allocated yet, so there is nothing to look at.
            return false;
        }
        bool ret = false;
        ANode*  item = mHead->next;
        if (item) {
            val = item->value;
            ret = true;
        }
        return ret;
    }

    /**
     * Drops every queued element, releases the dummy node and returns the
     * queue to the uninitialised state. Safe to call repeatedly.
     */
    void clear() {
        if (mHead) {
            T item;
            while (get(item)) {
                // get() releases one node per iteration.
            }
            LFDelete<ANode>(*mAlloctor, mHead);
            mElements = 0;
            mHead = mTail = NULL;
        }
    }

    /** Releases all nodes still owned by the queue. */
    ~AQueue() {
        clear();
    }

private:
    ANode*                          mHead;      // dummy node; mHead->next is the front element
    ANode*                          mTail;      // last node in the list
    size_t                          mElements;  // number of queued elements
    lfalloctor* const               mAlloctor;  // node allocator, not owned
};

/**
 * AQueue wrapper that guards the head side and the tail side of the list
 * with two separate LOCKER instances, following the two-lock queue layout:
 * push() holds only the tail lock, get()/peek()/isEmpty() hold only the
 * head lock. Guard<LOCKER> is presumed to acquire the lock on construction
 * and release it on scope exit.
 *
 * Operations that rewrite both ends (init(), clear()) take both locks,
 * always head first and then tail; no other method holds two locks at once,
 * so this ordering cannot deadlock.
 *
 * NOTE(review): AQueue::push() and AQueue::get() each initialise the queue
 * lazily and both update the shared element counter, yet they run under
 * different locks. Call init() before the queue is shared between threads,
 * and treat size() as an approximation while producers and consumers are
 * running concurrently.
 */
template<typename T,  typename LOCKER> class AQueueLock  {
    // Non-copyable: declared private and intentionally left undefined.
    AQueueLock(const AQueueLock&);
    AQueueLock& operator=(const AQueueLock&);

public:
    /** @param alloctor node allocator forwarded to the wrapped AQueue. */
    INLINE AQueueLock(lfalloctor* const alloctor) : impl_(alloctor)
    {
    }

    /** Initialises the wrapped queue while holding both locks. */
    INLINE void init() {
        // Initialisation writes head and tail, so exclude both sides.
        Guard<LOCKER> lh(mLockerHead);
        Guard<LOCKER> lt(mLockerTail);
        impl_.init();
    }

    /** @return the wrapped queue's isEmpty(), evaluated under the head lock. */
    INLINE bool isEmpty() {
        Guard<LOCKER> l(mLockerHead);
        return impl_.isEmpty();
    }

    /** @return the wrapped queue's element count, read without locking. */
    INLINE size_t   size() const  {
        return impl_.size();
    }

    /**
     * Appends @p item under the tail lock.
     * @return whatever AQueue::push() returns (OK or QUEUE_OVERFLOW).
     */
    INLINE ssize_t push(const T& item) {
        Guard<LOCKER> l(mLockerTail);
        return impl_.push(item);
    }

    /**
     * Dequeues the oldest element into @p val under the head lock.
     * @return true if an element was dequeued, false if the queue was empty.
     */
    INLINE bool   get(T& val)  {
        Guard<LOCKER> l(mLockerHead);
        return impl_.get(val);
    }

    /**
     * Copies the oldest element into @p val under the head lock without
     * removing it.
     * @return true if an element was available.
     */
    INLINE bool   peek(T& val)  {
        Guard<LOCKER> l(mLockerHead);
        return impl_.peek(val);
    }

    /**
     * Empties the wrapped queue. Both locks are held because clearing
     * resets the tail pointer and the element counter, which push()
     * modifies under the tail lock.
     */
    void clear() {
        Guard<LOCKER> lh(mLockerHead);
        Guard<LOCKER> lt(mLockerTail);
        impl_.clear();
    }

    /** Clears the queue before the locks and the wrapped queue are destroyed. */
    ~AQueueLock() {
        clear();
    }

private:
    AQueue<T>     impl_;        // underlying unsynchronised queue
    LOCKER        mLockerHead;  // serialises consumers (head side)
    LOCKER        mLockerTail;  // serialises producers (tail side)
};
}
#endif /* aqueue_h */



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息