您的位置:首页 > 其它

百度的bfs中raft协议实现(2)

2018-01-23 18:00 423 查看
raft中另外一个问题就是log存储了,像360开源的floyd直接是使用的leveldb,选举状态相关存储也是leveldb。

leveldb作为一个嵌入式的kv库自然是能很好的完成这样的功能,本着从学习的角度读了下bfs的log相关代码。

抛开代码,如果自己设计,应该从什么角度考虑?
1.log存储的东西有什么?
 对raft而言,需要存储的是大量的(seq, value)和极少量的(key, value)键值对
2.log读写方面有什么特点?
 对于seq,有大量的顺序写入(用户操作),有少量的随机范围读取(日志复制)
 对于key,有少量但较多的随机写(更新状态),有大量的随机读
3.大概方案?
 对key来说挺简单,毕竟key很少,所以文件仅仅是落地做可靠用,一个内存map就可以处理好读写,
 初始化=重新依次读入内存
 文件存储套路方面和内存分配器有一点点类似,对于一个存储块,如何知道读完了然后读下一个?

3.1很直观的方式就是前面存一个包含长度和校验码的header,同样也带来问题:  
 对本需求来说,无法快速定位到某个seq。也许map定位是可以的,但对于海量数据,内存cache显然是不行的。
   如果header有损坏怎么跳过本条数据到下一个?
 可否在header/foot尾部加一段magic?可以,但如果v比较长,采用朴素的memcmp要比较很久。
 本质原因说起来就是没法快速定位下一个header,但header本身其实是定长的,基于这个理由可以得出这个方案
3.2header单独存储为索引文件,多包含一个offset:
 这样可以解决header损坏的问题,毕竟可以skip一个sizeof(header)长度读下一个,同样的带来多一次seek代价
 两个文件原子性如何保证?显然和断点下载的方式一样:先写数据,后记录offset。offset不成功则数据被忽略

 如何解决定位seq?定位seq意味着定位header(主要是offset),根据存储特点,log文件实际上是序号递增的顺序写,
 并且索引文件我们可以根据file_size/header_size快速计算索引有多少个,这样算起来seq=n的索引文件可以直接得到 header偏移=(n-base)
* sizeof(header)
 当然还有其他问题,比如索引、数据文件是要有上限的。也就是每个文件是有序号记录区间的,这里可以使用内存记录信息,二分定位即可

下面只拿出重点源码分析,大体思路和上述相似,某些细节有所不同。

 首先看下成员有什么:

class LogDB {
private:
Mutex mu_;
ThreadPool* thread_pool_;//后台线程,定期dump key记录到文件

std::string dbpath_;
int64_t snapshot_interval_;//dump间隔
int64_t log_size_;//log文件最大大小
std::map<std::string, std::string> markers_;// key记录内存cache
int64_t next_index_; // smallest_index_ <= db < largest_index_,下一个文件id
int64_t smallest_index_;    // smallest index in db, -1 indicates empty db,最小文件id,-1表示首次,这个也记录在key文件里

typedef std::map<int64_t, std::pair<FILE*, FILE*> > FileCache;// 起始seq -> 数据、索引文件指针
FILE* write_log_;       // log file ends with '.log',当前数据文件指针
FILE* write_index_;     // index file ends with '.idx',当前索引文件指针
FileCache read_log_;    // file cache, index -> (idx_fp, log_fp)
FILE* marker_log_;      // marker file names 'marker.mak',key文件
};

 打开db,显然是做重建工作 
void LogDB::Open(const std::string& path, const DBOption& option, LogDB** dbptr) {
*dbptr = NULL;

LogDB* logdb = new LogDB();
logdb->dbpath_ = path + "/";
logdb->snapshot_interval_ = option.snapshot_interval * 1000;
logdb->log_size_ = option.log_size << 20;
mkdir(logdb->dbpath_.c_str(), 0755);
if(!logdb->RecoverMarker()) {//这里就是上面分析的,yi读取key到内存
LOG(WARNING, "[LogDB] RecoverMarker failed");
delete logdb;
return;
}
auto it = logdb->markers_.find(".smallest_index_");
if (it != logdb->markers_.end()) {
logdb->smallest_index_ = std::atol(it->second.c_str());//确定最小的文件序号
}
if (!logdb->BuildFileCache()) {//初始化seq到文件指针的关系
LOG(WARNING, "[LogDB] BuildFileCache failed");
delete logdb;
return;
}
logdb->thread_pool_ = new ThreadPool(10);
logdb->WriteMarkerSnapshot();//dump一个快照
*dbptr = logdb;
return;
}

 这里重点说不同点,对key的更新本来考虑key不多可以使用随机读写, 但在bfs里依然采用的是顺序写的方式:

StatusCode LogDB::WriteMarkerNoLock(const std::string& key, const std::string& value) {
if (marker_log_ == NULL) {
marker_log_ = fopen((dbpath_ + "marker.mak").c_str(), "a");//key的文件为marker.mak
if (marker_log_ == NULL) {
LOG(WARNING, "[LogDB] open marker.mak failed %s", strerror(errno));
return kWriteError;
}
}
std::string data;
uint32_t len = 4 + key.length() + 4 + value.length();
data.append(reinterpret_cast<char*>(&len), sizeof(len));
EncodeMarker(MarkerEntry(key, value), &data);//kv合并在连续存储里
if (fwrite(data.c_str(), 1, data.length(), marker_log_) != data.length()
|| fflush(marker_log_) != 0) {//直接追加写
LOG(WARNING, "[LogDB] WriteMarker failed key = %s value = %s", key.c_str(), value.c_str());
return kWriteError;
}
fflush(marker_log_);//Q:log这块使用的都是fflush,根据man文档说明fflush只是把crt的提供的用户缓冲区刷盘(可能是聚合了调用一次write),并不保证落地。落地失败怎么办?
markers_[key] = value;//更新缓存
return kOK;
}

 再结合Key的定期dump快照的做法:

void LogDB::WriteMarkerSnapshot() {
MutexLock lock(&mu_);
FILE* fp = fopen((dbpath_ + "marker.tmp").c_str(), "w");//覆盖写入一个临时文件
if (fp == NULL) {
LOG(WARNING, "[LogDB] open marker.tmp failed %s", strerror(errno));
return;
}
std::string data;
for (std::map<std::string, std::string>::iterator it = markers_.begin();
it != markers_.end(); ++it) {//写入缓存内所有的key value
MarkerEntry marker(it->first, it->second);
uint32_t len = 4 + (it->first).length() + 4 + (it->second).length();
data.clear();
data.append(reinterpret_cast<char*>(&len), sizeof(len));
EncodeMarker(marker, &data);
if (fwrite(data.c_str(), 1, data.length(), fp) != data.length() || fflush(fp) != 0) {
LOG(WARNING, "[LogDB] write marker.tmp failed %s", strerror(errno));
fclose(fp);
return;
}
}
fclose(fp);
if (marker_log_) {//关闭key文件的句柄,因为文件变了,下次重新打开
fclose(marker_log_);
marker_log_ = NULL;
}
rename((dbpath_ + "marker.tmp").c_str(), (dbpath_ + "marker.mak").c_str());//新dump替换旧的
marker_log_ = fopen((dbpath_ + "marker.mak").c_str(), "a");
if (marker_log_ == NULL) {
LOG(WARNING, "[LogDB] open marker.mak failed %s", strerror(errno));
return;
}
LOG(INFO, "[LogDB] WriteMarkerSnapshot done");
thread_pool_->DelayTask(snapshot_interval_, std::bind(&LogDB::WriteMarkerSnapshot, this));//定期dump
}

 结合这个做法可以看出bfs对key并没有使用随机写(尽管一共没几个key),而是直接追加写,然后定期dump方式。    恢复时即使有重复key,因为在最后也会获取正确的值。这个过程类似于redis的aof+rewrite策略。
   看看seq的读取:

StatusCode LogDB::Read(int64_t index, std::string* entry) {
MutexLock lock(&mu_);
if (read_log_.empty() || index >= next_index_ || index < smallest_index_) {//序号有效性判断
LOG(INFO, "[LogDB] empty = %d index = %ld next_index = %ld smallest_index = %ld",
read_log_.empty(), index, next_index_, smallest_index_);
return kNsNotFound;
}
FileCache::iterator it = read_log_.lower_bound(index);//二分查找index落在哪个log区间
if (it == read_log_.end() || (it != read_log_.begin() && index != it->first)) {
--it;
}//end()表明index大于最后一个文件起始点,index在最后一个文件;!=begin && index!=it->first表示index小于it所指文件,故前移一个
if (index < it->first) {//index应该>=文件起始序号
LOG(WARNING, "[LogDB] Read cannot find index file %ld ", index);
return kReadError;
}
FILE* idx_fp = (it->second).first;//文件指针缓存
FILE* log_fp = (it->second).second;
// find entry offset
int offset = 16 * (index - it->first);//相对文件起始序号偏移,一个header就是index+offset 2个int64,正好16
int64_t read_index = -1;
int64_t entry_offset = -1;

if (fseek(idx_fp, offset, SEEK_SET) != 0) {//调整索引文件读指针
LOG(WARNING, "[LogDB] Read cannot find index file %ld ", index);
return kReadError;
}
StatusCode s = ReadIndex(idx_fp, index, &read_index, &entry_offset);//数据文件offset
if (s != kOK) {
return s;
}
if(fseek(log_fp, entry_offset, SEEK_SET) != 0) {//数据文件指针
LOG(WARNING, "[LogDB] Read %ld with invalid offset %ld ", index, entry_offset);
return kReadError;
}
int ret = ReadOne(log_fp, entry);//读取一次
if (ret <= 0) {
LOG(WARNING, "[LogDB] Read log error %ld ", index);
return kReadError;
}
return kOK;
}

写入就很简单,一直追加文件末尾写,大于log_size就切换文件,也是按上面说先写数据,后写索引。

总结:整体来看,由于全是顺序写,写的性能不错。读的方面也可以快速定位seek点,但针对使用场景还是有可以优化

的地方,比如根据index获取日志时可以批量顺序读取某个range的日志。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: