高性能无锁(Lock-free) 内存池
2010-08-04 10:42
896 查看
由于懒惰,一直脱到现在才完成,实在是罪过啊!很快会用它来改写我的无锁容器,嗯,如果我不懒惰的话。
稍微解释一下关键问题:
先分配一块内存,然后将内存划分为等大的内存格。每次调用 alloc 就分配一块内存格出去。
可分配内存是个链表,这个链表被直接贮存在未分配的内存里。换句话说,未被分配的内存格里存放了一个指针,这个指针指向下一个未被分配的空闲内存格。
另外,为了我们分配的内存可以被正确释放,还需要一个链表来贮存我们分配的内存列表,这里我把这个链表贮存在我们分配的内存首部。也就是每块分配的内存,前几个字节保存了下一块内存的指针。
我们通过 cas 争用的一个指针指向了链表头,分配内存的过程就是从链表头摘取一个内存格,而释放的过程就是在链表头挂上内存格(注意,都是链表头,因此只需要争用一个指针)。
设计上希望代码支持 64 位,考虑到64位指针本身就是64位,但是当前系统最高应该只使用了 48位,因此使用剩下的部分来作为 ABA 计数。如果你的程序没有使用 256T 以上就应该没有问题吧,嗯——大概。
内存池的初始大小最好是够大,如果在中途分配,可能由于几个线程同时进程分配内存而一下子分配好几块,由于串联可分配内存的操作是比较费时的,为了节约,我把他们全挂上了,如果你希望节约内存的分配量,可以牺牲 cpu时间,放弃多分配的内存。
这个很快会作为一个库的一个组件发布,这个库暂时被命名为 lugce, 谁有更好的名字可以推荐不?呵呵
照例发表源码:
稍微解释一下关键问题:
先分配一块内存,然后将内存划分为等大的内存格。每次调用 alloc 就分配一块内存格出去。
可分配内存是个链表,这个链表被直接贮存在未分配的内存里。换句话说,未被分配的内存格里存放了一个指针,这个指针指向下一个未被分配的空闲内存格。
另外,为了我们分配的内存可以被正确释放,还需要一个链表来贮存我们分配的内存列表,这里我把这个链表贮存在我们分配的内存首部。也就是每块分配的内存,前几个字节保存了下一块内存的指针。
我们通过 cas 争用的一个指针指向了链表头,分配内存的过程就是从链表头摘取一个内存格,而释放的过程就是在链表头挂上内存格(注意,都是链表头,因此只需要争用一个指针)。
设计上希望代码支持 64 位,考虑到64位指针本身就是64位,但是当前系统最高应该只使用了 48位,因此使用剩下的部分来作为 ABA 计数。如果你的程序没有使用 256T 以上就应该没有问题吧,嗯——大概。
内存池的初始大小最好是够大,如果在中途分配,可能由于几个线程同时进程分配内存而一下子分配好几块,由于串联可分配内存的操作是比较费时的,为了节约,我把他们全挂上了,如果你希望节约内存的分配量,可以牺牲 cpu时间,放弃多分配的内存。
这个很快会作为一个库的一个组件发布,这个库暂时被命名为 lugce, 谁有更好的名字可以推荐不?呵呵
照例发表源码:
/* * Copyright (C) 2010 Chen Wang ( China ) * Email: jadedrip@gmail.com * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ #pragma once #include <exception> #include "lockfree.hpp" #if !defined(_MSC_VER) || (_MSC_VER < 1600) # define nullptr NULL #endif namespace lugce { namespace lockfree { template< typename T, int blocksize=255 > class memory_pool { static const int objsize= sizeof(T) < sizeof(intptr_t) ? sizeof(intptr_t) : sizeof(T); static const int64 aba_inc =0x0001000000000000LL; // ABA 计数每次需要增加的值 static const int64 aba_mark =0xFFFF000000000000LL; // ABA Mark static const int64 ptr_mark =0x0000FFFFFFFFFFFFLL; // 指针 Mark public: memory_pool() { char *block=tadem_block(); _first_block=block; _free_head.data=reinterpret_cast<intptr_t>(block)+sizeof(intptr_t)+objsize; // 指向链表头 } ~memory_pool() { // 释放内存块 char * next=_first_block; do{ char *p=next; intptr_t x=*(intptr_t*)p; next=(char*)x; delete[] p; }while(next); } public: /// 申请内存,返回一个指向新内存的指针 T* alloc() { /// 尝试从堆栈中弹出一个空闲索引 atomic_int64 nval; atomic_int64 old; for(;;){ old=_free_head; assert( (_free_head.data & ptr_mark) > 0x10000 ); intptr_t *next=reinterpret_cast<intptr_t*>( _free_head.data & ptr_mark ); // 指向下一块空闲单位的指针 if( *next==0 ){ // 没有空闲,需要创建新块 // 创建新块 create_new_block(); continue; } nval.data=( (old.data + aba_inc) & aba_mark); nval.data|=int64(*next); // ABA 计数 //assert( (nval.data & ptr_mark) > 0x10000 ); if( atomic_cas( &_free_head, old.data, nval.data ) ) break; }; return reinterpret_cast<T*>(old.data & ptr_mark); } void free( const T* ptr ) { intptr_t *p=(intptr_t*)ptr; atomic_int64 nval; atomic_int64 old; // 尝试将其放回链表 do{ old=_free_head; *p=(intptr_t)(old.data & ptr_mark); // 把内容改为下一个空闲索引 assert(*p > 10000); nval.data=((old.data + aba_inc) & aba_mark) | (intptr_t)ptr; assert( (nval.data & ptr_mark) > 0x10000 ); }while( !atomic_cas(&_free_head, old.data, nval.data) ); } private: /// 创建新的内存块 void create_new_block() { char *block=tadem_block(); // 分配内存 atomic_intptr_t *p=(atomic_intptr_t*)_first_block; // 尝试挂接到内存块链表 while( !atomic_cas( p, 0, intptr_t(block) ) ){ p=(atomic_intptr_t*)(p->data); // 移动到链表下一位 } p=(atomic_intptr_t*)( block+sizeof(intptr_t) ); // 让 p 指向链表尾部 // 尝试挂接到空闲内存栈头上 atomic_int64 old; atomic_int64 nval; do{ old=_free_head; p->data=intptr_t(old.data & ptr_mark); // 让链表尾指向当前尾 intptr_t x=*(intptr_t*)(p->data); assert( x==0 || x > 10000 ); assert(p->data>10000); nval.data= ( (old.data + aba_inc) & aba_mark) | reinterpret_cast<int64>(block+sizeof(intptr_t)+objsize); // 新的下块空闲指向本块 assert( (nval.data & ptr_mark) > 0x10000 ); } while( !atomic_cas(&_free_head, old.data, nval.data ) ); } /// 创建新内存块,并将内存串联为链表 char* tadem_block() { char *block=new char[blocksize * objsize+sizeof(intptr_t)]; // 准备一块内存,注意 new 可能抛出异常 char *p=block; *reinterpret_cast<intptr_t*>(p)=0; // 内存的头是对齐的,我们用来保存下一块内存的地址,以构建内存块链表(用来内存池析构时释放内存块) p+=sizeof(intptr_t); *reinterpret_cast<intptr_t*>(p)=0; // 接下来的4个字节,同样是对齐的,作为链表的尾部 p+=objsize; // 把这块内存做成链表 for( int32 i=0; i< blocksize-2; ++i ){ *reinterpret_cast<intptr_t*>(p)=reinterpret_cast<intptr_t>(p)+objsize; // 内容成为指向下一块的空闲单元的指针 p+=objsize; } *reinterpret_cast<intptr_t*>(p)=reinterpret_cast<intptr_t>(block)+sizeof(intptr_t); // 最后一块指向尾节点 return block; } private: char * _first_block; atomic_int64 _free_head; // 下一个空闲块的索引 }; }; };
相关文章推荐
- 再谈 高性能无锁(Lock-free) 内存池
- 高性能无锁(Lock-free) 内存池
- DIOCP开源项目-Delphi高性能无锁队列(lock-free)
- DIOCP开源项目-Delphi高性能无锁队列(lock-free)
- Boost之LockFree
- 关于Lock-free Hash Table的一些链接资料
- request_irq()、free_irq(),以及spin_lock_irqsave和spin_lock区别
- 锁无关的(Lock-Free)数据结构——在避免死锁的同时确保线程继续
- lock-free/wait-free算法以及ABA问题
- C++ 实现高性能内存池
- 再说 lock-free 编程
- something talk about lock from FreeBSD mail list
- Boost.Lockfree官方文档翻译
- CAS lockfree 循环队列
- Lock-free atomic operations in Android
- boost::lockfree::queue
- Lock-Free Data Structures
- 锁无关的(Lock-Free)数据结构,在避免死锁的同时确保线程继续
- Boost.Lockfree官方文档翻译
- Yet another implementation of a lock-free circular array queue