您的位置:首页 > 编程语言 > C语言/C++

一个线程安全的 lrucache 实现 --- 读 leveldb 源码

2018-03-10 23:49 330 查看
缓存是计算机的每一个层次中都是一个非常重要的概念,缓存的存在可以大大提高软件的运行速度。Least Recently Used(lru) cache 即最近最久未使用的缓存,多见与页面置换算法,lru 缓存算法在缓存的大小达到最大值之后,换出最早未被使用的缓存。
在阅读  leveldb 的源代码的时候,发现其中的 cache 类正是一个线程安全的 lru-cache 实现,代码非常优雅。笔者读完之后受益良多,希望借助这篇博客,一来可以加深自己的理解,而来可以与大家共享这些优秀代码(PS:因为这个 cache 类主要是为了支持 leveldb 的实现,所以其实现的接口可能与其他常见缓存有所差别)。
leveldb 中的 cache 提供的并不是一个类似于 std::map 的模板类,其有一下几个特点:
键值必须为 string-like (代码中为 Slice 类).
存储的内容为 void *.所以支持存储任何类型的数据,因为存储的是指针,所以 cache 类负责管理其所有 entries 的生命周期以及资源回收。在插入的时候必须明确传入 deleter.
对 entries 的查询需要先获取 Handle(像pthread_t一样的一个对客户端透明的结构,如果客户端不再使用,必须要显示的调用Cache::Release(handle).不直接提供访问 entries 的接口,防止了客户端修改,但引入了一次额外的函数调用(Cache::Value(hanle)).
短短30行代码,实现了一个很精悍的 hash_table.
使用两个双向链表来分别存储被客户端使用的 entries 和未被使用的 entries.
NewId() 接口可以生成一个唯一的 id,多线程环境下可以使用这个 id 与自己的键值拼接起来,防止不同线程之间互相覆写.
使用很直观的 mutex 类提供线程安全性.
虽然巧妙,但是代码的可读性非常之高的,下面让我们来看一下主要的代码(完整代码可以参见 leveldb 源代码或者本人的 github)。class Cache {
public:
Cache() { }

  // 需要调用插入时的 deleter 销毁所有 entries
virtual ~Cache();

// 每个 entry 对应的 handle,客户端只能通过调用 this->Value(handle) 来获取缓存的内容,不可以直接访问缓存
struct Handle { };

// 插入一个 key->value 的映射,支持通过 charge 为不同的缓存设定不同的权重。
// 返回一个 entry 对应的 handle 指针到客户端,客户端不需要使用后必须显示的调用 this->Release(handle),
// 当缓存要被换出的时候,会调用传入的 deleter
virtual Handle *Insert(const Slice &key, void *value, size_t charge,
void (*deleter)(const Slice &key, void *value)) = 0;

// 查询接口,如果缓存不存在,返回 NULL 指针。否则返回 entry 对应的 handle 指针,同样必须调用 this->Release(handle)
virtual Handle *Lookup(const Slice &key) = 0;

// 释放由 Insert 或者 Lokkup 返回的 handle。Release a mapping returned by a previous Lookup().
// NOTE: 1.仅能在一个 handle 上调用一次 this->Release(handle)
// NOTE: 2.handle 在调用该方法后不能再使用(handle 指针会被 free 掉)
virtual void Release(Handle *handle) = 0;

// 获取缓存内容
virtual void *Value(Handle *hanle) = 0;

// 见上面第6点
virtual uint64_t NewId() = 0;

// 清楚所有的未使用缓存
virtual void Prune() { }

// 查看缓存使用总量
virtual size_t TotalCharge() const = 0;

private:
// No copying allowed
Cache(const Cache &);
void operator=(const Cache &);
};因为 leveldb 中支持对定制自己的缓存类,所以其头文件中的 Cache 是一个虚拟基类,更像是一个接口定义。每个接口的含义我都已经加上了注释,为了保证除了 Cache 类,没有人可以操作缓存数据(这是非常重要的一点,因为 Cache 中存储的都是 void *,Cache 类必须要确保它自己唯一管理这些缓存对象生命周期),这里使用了 Handle(句柄)。虽然在查询的时候额外引入了一层间接层,但是可以充分保证数据安全。
下面让我们看一下实现代码:struct LRUHandle { // 这便是返回的 Handle,在返回的时候会 return reinterpret_cast<Cache::Handle*>(handle)
void *value;
void (*deleter)(const Slice &, void *);
LRUHandle *next_hash; // 给自己实现的 hahs_table 使用;
LRUHandle *next; // 双向链表
LRUHandle *prev; // 双向链表
size_t charge; // 权重
size_t key_length; // key 长度
bool in_cache; // 是否被缓存(我们的类可以通过将 capacity 设置为0来关闭缓存)
uint32_t refs; // 引用计数
uint32_t hash; // 我们要将 hash 值缓存起来,防止需要重复 hash 计算
char key_data[1]; // 柔性数组,存储键

Slice key() const {
assert(next != this); // Hold true iff this is the head of an empty list
return Slice(key_data, key_length);
}
};代码中 Slice 是 google 项目常见的一个工具类,其实现非常简单,可以认为它仅仅是对一个字符串的引用(因为它自己不管理内存,咱们在这里可以简单的把它当做 std::string 也是可以的,它可以使用的地方都可以使用 std::string(反过来不成立))。
可以看到我们这个 LRUHandle 类中的数据还是不少的,在后面的实现代码中我们能看到它们的用武之地。下面要看的代码是 levedb 自己实现的一个很短小精悍的 hash_table,其使用开链法解决冲突,代码真的很优雅,很值得一读:class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
~HandleTable() { delete[] list_; }

LRUHandle *Lookup(const Slice &key, uint32_t hash) {
return *FindPointer(key, hash);
}

// Insert into the last position
LRUHandle *Insert(LRUHandle *h) {
LRUHandle **ptr = FindPointer(h->key(), h->hash);
LRUHandle *old = *ptr;
h->next_hash = (old == NULL ? NULL : old->next_hash);
*ptr = h;
if (old == NULL) {
++elems_;
if (elems_ > length_) {
Resize();
}
}
return old;
}

LRUHandle *Remove(const Slice &key, uint32_t hash) {
LRUHandle **ptr = FindPointer(key, hash);
LRUHandle *result = *ptr;
if (result != NULL) {
*ptr = result->next_hash;
--elems_;
}
return result;
}
private:
uint32_t length_; // hash table 的 bucket 数
uint32_t elems_; // hash table 中 entry 数
LRUHandle **list_; // hash bucket 数组

LRUHandle **FindPointer(const Slice &key, uint32_t hash) {
assert(list_ != NULL);
LRUHandle **ptr = &list_[hash & (length_ - 1)]; // 找到对应键的桶,取第一个 entry
while (*ptr != NULL &&
((*ptr)->hash != hash || (*ptr)->key() != key)) {
ptr = &(*ptr)->next_hash; // 注意,我们返回的不是一个 LRUHandle *,而是一个 LRUHandle **(&(LRUHandle::next_hash))
}
return ptr;
}

void Resize() {
uint32_t new_length = 4; // length_ 一定是 2 的幂次,这样可以使用 hash & (length_ - 1) 来算出 hash % length_
while (new_length < elems_) new_length <<= 1; // 算出新 length,因为 length > elems_; 所以装载因子<1
LRUHandle **new_list = new LRUHandle *[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; i++) { // 遍历所有的桶,为桶内元素搬家
LRUHandle *h = list_[i]; // 每个桶的第一个 entry
while (h != NULL) {
LRUHandle *next = h->next_hash; // 先保存起来 next
uint32_t hash = h->hash;
LRUHandle **ptr = &new_list[hash & (new_length-1)]; // 计算在新表中的桶下标,取桶内第一个 entry
h->next_hash = *ptr; // 头插法
*ptr = h;
h = next;
++count;
}
}
assert(count == elems_);
delete[] list_; // 更新 list_ 和 length_
list_ = new_list;
length_ = new_length;
}
};上面的代码真的写的太好了,笔者都有怎么读都读不够的感觉啊,谷歌工程师,真的太强了。简要的说下最重要的两个接口:Resize() 和 FindPointer()。
先说 Resize()。在 hash_table 中扩容的时候,往往有两个常见的选择,一是 bucket 的数量选用一个素数,这样可以保证 hash mod 之后的结果最均匀,还有一种是使用2的幂次结果作为 bucket 数,后者是通过优化 hash % #bucket 这个运算来优化(因为 #bucket = 2^n,hash % #bucket 等价于 hash & (length - 1))。在 leveldb 中选用的是后者来优化。
然后是 FindPointer(),在读这份源码之前,我对链表内元素的插入和删除之后使用两个指针,一前一后,来操作链表,但是作者使用一个 LRUHandle **指针,一个指针就解决了对链表元素的插入和删除,真的太巧妙了。我觉得这个代码都值得单独拿出来讲,所以这里并不准备全部展开,但是推荐大家好好品读,我也会学习这个写法,自己写一个 list 实现,到时候也会发到博客,和大家一起交流。
有了 hash_table,我们可以开始实现了 LRUCache 了:class LRUCache {
public:
LRUCache();
~LRUCache();

void SetCapacity(size_t cap) { capacity_ = cap; }

Cache::Handle *Insert(const Slice &key, uint32_t hash,
void *value, size_t charge,
void (*deleter)(const Slice &, void *));

Cache::Handle *Lookup(const Slice &key, uint32_t hash);
void Release(Cache::Handle *handle);
void Erase(const Slice &key, uint32_t hash);
void Prune();
size_t TotalCharge() const {
MutexLock l(&mutex_);
return usage_;
}

private:
void LRU_Remove(LRUHandle *e);
void LRU_Append(LRUHandle *list, LRUHandle *e);
void Ref(LRUHandle *e);
void Unref(LRUHandle *e);
bool FinishErase(LRUHandle *e);
// Initialized before use.
size_t capacity_;

// Capacity usage guarded by mutex
mutable Mutex mutex_;
size_t usage_;

// 所有未被使用的 entries 都被存储于 lru_ 这个双向链表中,这里使用 dummy head 的方法实现双向链表
// lru_.prev 是最新的 entry,lru_.next 是最早的 entry(也就是说换出缓存的时候就从 lru_.next 开刀)
// lru_ 中的所有 entries 都有 refs == 1 && in_cache == true (仅被 LRUCache 类引用,而且被缓存)
LRUHandle lru_;

// 所有现在被使用(客户端持有至少一个尚未 Release() 的 handle)的 entries 存储在 in_use_ 双向链表中
// in_use_ 中 entries 有 refs >= 2 && in_cache = true
LRUHandle in_use_;

HandleTable table_; // 用于进行 key->handle 的 mapping
};

LRUCache::LRUCache() : usage_(0) { // 初始化两个双向链表的 dummy head
lru_.next = &lru_;
lru_.prev = &lru_;
in_use_.next = &in_use_;
in_use_.prev = &in_use_;
}

LRUCache::~LRUCache() {
assert(in_use_.next == &in_use_); // 如果在 LRUCache 在析构的时候,还有客户端持有尚未 Release() 的 handle,那么就会出问题
LRUHandle *e = lru_.next;
while (e != &lru_) { // 释放所有 entries
LRUHandle *next = e->next;
assert(e->in_cache);
e->in_cache = false;
assert(e->refs == 1);
Unref(e);
e = next;
}
}

void LRUCache::Ref(LRUHandle *e) { // 增加引用计数,必要的话将其从 lru_. 挪到 in_use_
if (e->refs == 1 && e->in_cache) { // ++ 后 refs 大于1,代表有客户端使用
LRU_Remove(e); // 从 lru_ 中删除
LRU_Append(&in_use_, e); // 插入 in_use_ 中
}
e->refs++; // 增加引用计数
}

void LRUCache::Unref(LRUHandle *e) {
assert(e->refs > 0); // Unref() 调用前,这个 handle 应该是被引用的
if (--e->refs == 0) { // 无人引用(包括 LRUCache 类),需要调用其构造时传入的删除器,管理资源
assert(!e->in_cache);
(*e->deleter)(e->key(), e->value);
free(e); // 注意所有的 handle 都是 heap-allocated,必须要free
} else if (e->in_cache && e->refs == 1) { // refs==1 代表仅有 LRUCache 类引用
LRU_Remove(e); // 从 in_use_ 中删除
LRU_Append(&lru_, e); // 插入 lru_ 中
}
}
// 将元素从双向链表中删除(删除并不需要知道其所属的 list)
void LRUCache::LRU_Remove(LRUHandle *e) {
e->prev->next = e->next;
e->next->prev = e->prev;
}
// 将元素插入到双向链表 head.prev
void LRUCache::LRU_Append(LRUHandle *list, LRUHandle *e) {
list->prev->next = e;
e->prev = list->prev;
e->next = list;
list->prev = e;
}

Cache::Handle *LRUCache::Lookup(const Slice &key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle *e = table_.Lookup(key, hash); // 在 hash_table 中查找
if (e != NULL) Ref(e); // 如果有被缓存起来,增加引用计数等
return reinterpret_cast<Cache::Handle*>(e); // 将其 cast 为一个 Handle * 返回,还记得 Cache 类中的接口吧?
}

void LRUCache::Release(Cache::Handle *e) {
MutexLock l(&mutex_);
Unref(reinterpret_cast<LRUHandle*>(e));
}

Cache::Handle *LRUCache::Insert(
const Slice &key, uint32_t hash, void *value, size_t charge,
void (*deleter)(const Slice &, void *)) {
MutexLock l(&mutex_);

LRUHandle *e = reinterpret_cast<LRUHandle*>(
malloc(sizeof(LRUHandle) + (key.size() - 1))); // 这里注意,因为 LRUHandle 中保存了 key_length,所以这里在申请内存的时候,并没有为 '\0'申请内存,如果在 LRUHandle::key_data 上调用 strlen() 会有想不到的后果
e->value = value;
e->charge = charge;
e->deleter = deleter;
e->key_length = key.size();
e->in_cache = false;
e->refs = 1; // 这里将引用计数设为1是代表被 this->Insert() 的调用者引用,并不是代表 LRUCache 引用,所以如果不保存 this->Insert() 的返回结果会有很大的问题
memcpy(e->key_data, key.data(), key.size());

if (capacity_ > 0) { // 需要缓存起来
e->refs++; // LRUCache 类也要引用这个 handle 了
e->in_cache = true;
LRU_Append(&in_use_, e); // 因为这时候 refs == 2,需要放到 in_use_ 中
usage_ += charge;
FinishErase(table_.Insert(e)); // 如果相同键插入两次,会覆盖之前的缓存,并且需要将之前的缓存删除
} else { // 如果将 capacity 设置为0,代表的是关闭缓存
// 如果 handle->next == handle,会无法通过 LRUHandle::key() 中的断言
e->next = NULL;
}
// 如果本次插入导致缓存超出上限,需要换出旧缓存
while (usage_ > capacity_) {
LRUHandle *oldest = lru_.next; // 还记得 lru_.next 存的是最早未使用的缓存吧
assert(oldest->refs == 1);
bool erased = FinishErase(table_.Remove(oldest->key(), oldest->hash)); // 删除调用缓存
if (!erased) { // 如果 erased 是 false,代表 lru_ 中保存有没有被缓存起来的 entry,这是一个严重的问题
assert(erased); // 一条一定会失败的断言
}
}

return reinterpret_cast<Cache::Handle*>(e);
}
// 该接口仅应该在用与 FinishErase(handle_table_.Remove()) 一起使用,因为它做的是删除缓存后的后续工作
bool LRUCache::FinishErase(LRUHandle *e) {
if (e != NULL) {
assert(e->in_cache);
LRU_Remove(e); // 从 lru_ 中删除
e->in_cache = false; // 设置状态
usage_ -= e->charge;
Unref(e); // 一定会进入调用删除器的分支
}
return e != NULL;
}

void LRUCache::Erase(const Slice &key, uint32_t hash) {
MutexLock l(&mutex_);
FinishErase(table_.Remove(key, hash));
}

void LRUCache::Prune() {
MutexLock l(&mutex_);
while (lru_.next != &lru_) { // 遍历 lru_ 中保存的 entries,将其全部删除掉
LRUHandle *e = lru_.next;
assert(e->refs == 1);
bool erased = FinishErase(table_.Remove(e->key(), e->hash));
if (!erased) {
9b24
assert(erased);
}
}
}代码的解释都在注释中,这里我们从设计角度看看,这个 LRUCache 是如何实现的。首先,数据结构上,使用的是双向链表+hash_table,LRU 中要做到可以迅速找到最早未使用的缓存,所以使用了两个双向链表,分别用于保存“在使用(in_use_)”和“未使用(lru_)”的缓存,这样,lru_.next 便指向了最早未使用缓存。而 hash_table 则用于对 key->value 进行检索。而对于线程安全的实现,则是选用直观的 mutex,mutex 不代表慢,大家应该要记住,慢的不是锁,而是锁的竞争。
至此一个线程安全的 LRUCache 类就已经全部实现,在 leveldb 中使用的缓存是 ShardedLRUCache,其实就是聚合多个 LRUCache 实例,真正的逻辑都在 LRUCache 类中。
鉴于自身水平有限,文章中有难免有些错误,欢迎大家指出。也希望大家可以积极留言,与笔者一起讨论编程的那些事。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐