百度的bfs中raft协议实现(2)
2018-01-23 18:00
423 查看
raft中另外一个问题就是log存储了,像360开源的floyd直接是使用的leveldb,选举状态相关存储也是leveldb。
leveldb作为一个嵌入式的kv库自然是能很好的完成这样的功能,本着从学习的角度读了下bfs的log相关代码。
抛开代码,如果自己设计,应该从什么角度考虑?
1.log存储的东西有什么?
对raft而言,需要存储的是大量的(seq, value)和极少量的(key, value)键值对
2.log读写方面有什么特点?
对于seq,有大量的顺序写入(用户操作),有少量的随机范围读取(日志复制)
对于key,有少量但较多的随机写(更新状态),有大量的随机读
3.大概方案?
对key来说挺简单,毕竟key很少,所以文件仅仅是落地做可靠用,一个内存map就可以处理好读写,
初始化=重新依次读入内存
文件存储套路方面和内存分配器有一点点类似,对于一个存储块,如何知道读完了然后读下一个?
3.1很直观的方式就是前面存一个包含长度和校验码的header,同样也带来问题:
对本需求来说,无法快速定位到某个seq。也许map定位是可以的,但对于海量数据,内存cache显然是不行的。
如果header有损坏怎么跳过本条数据到下一个?
可否在header/foot尾部加一段magic?可以,但如果v比较长,采用朴素的memcmp要比较很久。
本质原因说起来就是没法快速定位下一个header,但header本身其实是定长的,基于这个理由可以得出这个方案
3.2header单独存储为索引文件,多包含一个offset:
这样可以解决header损坏的问题,毕竟可以skip一个sizeof(header)长度读下一个,同样的带来多一次seek代价
两个文件原子性如何保证?显然和断点下载的方式一样:先写数据,后记录offset。offset不成功则数据被忽略
如何解决定位seq?定位seq意味着定位header(主要是offset),根据存储特点,log文件实际上是序号递增的顺序写,
并且索引文件我们可以根据file_size/header_size快速计算索引有多少个,这样算起来seq=n的索引文件可以直接得到 header偏移=(n-base)
* sizeof(header)
当然还有其他问题,比如索引、数据文件是要有上限的。也就是每个文件是有序号记录区间的,这里可以使用内存记录信息,二分定位即可
下面只拿出重点源码分析,大体思路和上述相似,某些细节有所不同。
首先看下成员有什么:
打开db,显然是做重建工作
这里重点说不同点,对key的更新本来考虑key不多可以使用随机读写, 但在bfs里依然采用的是顺序写的方式:
再结合Key的定期dump快照的做法:
结合这个做法可以看出bfs对key并没有使用随机写(尽管一共没几个key),而是直接追加写,然后定期dump方式。 恢复时即使有重复key,因为在最后也会获取正确的值。这个过程类似于redis的aof+rewrite策略。
看看seq的读取:
写入就很简单,一直追加文件末尾写,大于log_size就切换文件,也是按上面说先写数据,后写索引。
总结:整体来看,由于全是顺序写,写的性能不错。读的方面也可以快速定位seek点,但针对使用场景还是有可以优化
的地方,比如根据index获取日志时可以批量顺序读取某个range的日志。
leveldb作为一个嵌入式的kv库自然是能很好的完成这样的功能,本着从学习的角度读了下bfs的log相关代码。
抛开代码,如果自己设计,应该从什么角度考虑?
1.log存储的东西有什么?
对raft而言,需要存储的是大量的(seq, value)和极少量的(key, value)键值对
2.log读写方面有什么特点?
对于seq,有大量的顺序写入(用户操作),有少量的随机范围读取(日志复制)
对于key,有少量但较多的随机写(更新状态),有大量的随机读
3.大概方案?
对key来说挺简单,毕竟key很少,所以文件仅仅是落地做可靠用,一个内存map就可以处理好读写,
初始化=重新依次读入内存
文件存储套路方面和内存分配器有一点点类似,对于一个存储块,如何知道读完了然后读下一个?
3.1很直观的方式就是前面存一个包含长度和校验码的header,同样也带来问题:
对本需求来说,无法快速定位到某个seq。也许map定位是可以的,但对于海量数据,内存cache显然是不行的。
如果header有损坏怎么跳过本条数据到下一个?
可否在header/foot尾部加一段magic?可以,但如果v比较长,采用朴素的memcmp要比较很久。
本质原因说起来就是没法快速定位下一个header,但header本身其实是定长的,基于这个理由可以得出这个方案
3.2header单独存储为索引文件,多包含一个offset:
这样可以解决header损坏的问题,毕竟可以skip一个sizeof(header)长度读下一个,同样的带来多一次seek代价
两个文件原子性如何保证?显然和断点下载的方式一样:先写数据,后记录offset。offset不成功则数据被忽略
如何解决定位seq?定位seq意味着定位header(主要是offset),根据存储特点,log文件实际上是序号递增的顺序写,
并且索引文件我们可以根据file_size/header_size快速计算索引有多少个,这样算起来seq=n的索引文件可以直接得到 header偏移=(n-base)
* sizeof(header)
当然还有其他问题,比如索引、数据文件是要有上限的。也就是每个文件是有序号记录区间的,这里可以使用内存记录信息,二分定位即可
下面只拿出重点源码分析,大体思路和上述相似,某些细节有所不同。
首先看下成员有什么:
class LogDB { private: Mutex mu_; ThreadPool* thread_pool_;//后台线程,定期dump key记录到文件 std::string dbpath_; int64_t snapshot_interval_;//dump间隔 int64_t log_size_;//log文件最大大小 std::map<std::string, std::string> markers_;// key记录内存cache int64_t next_index_; // smallest_index_ <= db < largest_index_,下一个文件id int64_t smallest_index_; // smallest index in db, -1 indicates empty db,最小文件id,-1表示首次,这个也记录在key文件里 typedef std::map<int64_t, std::pair<FILE*, FILE*> > FileCache;// 起始seq -> 数据、索引文件指针 FILE* write_log_; // log file ends with '.log',当前数据文件指针 FILE* write_index_; // index file ends with '.idx',当前索引文件指针 FileCache read_log_; // file cache, index -> (idx_fp, log_fp) FILE* marker_log_; // marker file names 'marker.mak',key文件 };
打开db,显然是做重建工作
void LogDB::Open(const std::string& path, const DBOption& option, LogDB** dbptr) { *dbptr = NULL; LogDB* logdb = new LogDB(); logdb->dbpath_ = path + "/"; logdb->snapshot_interval_ = option.snapshot_interval * 1000; logdb->log_size_ = option.log_size << 20; mkdir(logdb->dbpath_.c_str(), 0755); if(!logdb->RecoverMarker()) {//这里就是上面分析的,yi读取key到内存 LOG(WARNING, "[LogDB] RecoverMarker failed"); delete logdb; return; } auto it = logdb->markers_.find(".smallest_index_"); if (it != logdb->markers_.end()) { logdb->smallest_index_ = std::atol(it->second.c_str());//确定最小的文件序号 } if (!logdb->BuildFileCache()) {//初始化seq到文件指针的关系 LOG(WARNING, "[LogDB] BuildFileCache failed"); delete logdb; return; } logdb->thread_pool_ = new ThreadPool(10); logdb->WriteMarkerSnapshot();//dump一个快照 *dbptr = logdb; return; }
这里重点说不同点,对key的更新本来考虑key不多可以使用随机读写, 但在bfs里依然采用的是顺序写的方式:
StatusCode LogDB::WriteMarkerNoLock(const std::string& key, const std::string& value) { if (marker_log_ == NULL) { marker_log_ = fopen((dbpath_ + "marker.mak").c_str(), "a");//key的文件为marker.mak if (marker_log_ == NULL) { LOG(WARNING, "[LogDB] open marker.mak failed %s", strerror(errno)); return kWriteError; } } std::string data; uint32_t len = 4 + key.length() + 4 + value.length(); data.append(reinterpret_cast<char*>(&len), sizeof(len)); EncodeMarker(MarkerEntry(key, value), &data);//kv合并在连续存储里 if (fwrite(data.c_str(), 1, data.length(), marker_log_) != data.length() || fflush(marker_log_) != 0) {//直接追加写 LOG(WARNING, "[LogDB] WriteMarker failed key = %s value = %s", key.c_str(), value.c_str()); return kWriteError; } fflush(marker_log_);//Q:log这块使用的都是fflush,根据man文档说明fflush只是把crt的提供的用户缓冲区刷盘(可能是聚合了调用一次write),并不保证落地。落地失败怎么办? markers_[key] = value;//更新缓存 return kOK; }
再结合Key的定期dump快照的做法:
void LogDB::WriteMarkerSnapshot() { MutexLock lock(&mu_); FILE* fp = fopen((dbpath_ + "marker.tmp").c_str(), "w");//覆盖写入一个临时文件 if (fp == NULL) { LOG(WARNING, "[LogDB] open marker.tmp failed %s", strerror(errno)); return; } std::string data; for (std::map<std::string, std::string>::iterator it = markers_.begin(); it != markers_.end(); ++it) {//写入缓存内所有的key value MarkerEntry marker(it->first, it->second); uint32_t len = 4 + (it->first).length() + 4 + (it->second).length(); data.clear(); data.append(reinterpret_cast<char*>(&len), sizeof(len)); EncodeMarker(marker, &data); if (fwrite(data.c_str(), 1, data.length(), fp) != data.length() || fflush(fp) != 0) { LOG(WARNING, "[LogDB] write marker.tmp failed %s", strerror(errno)); fclose(fp); return; } } fclose(fp); if (marker_log_) {//关闭key文件的句柄,因为文件变了,下次重新打开 fclose(marker_log_); marker_log_ = NULL; } rename((dbpath_ + "marker.tmp").c_str(), (dbpath_ + "marker.mak").c_str());//新dump替换旧的 marker_log_ = fopen((dbpath_ + "marker.mak").c_str(), "a"); if (marker_log_ == NULL) { LOG(WARNING, "[LogDB] open marker.mak failed %s", strerror(errno)); return; } LOG(INFO, "[LogDB] WriteMarkerSnapshot done"); thread_pool_->DelayTask(snapshot_interval_, std::bind(&LogDB::WriteMarkerSnapshot, this));//定期dump }
结合这个做法可以看出bfs对key并没有使用随机写(尽管一共没几个key),而是直接追加写,然后定期dump方式。 恢复时即使有重复key,因为在最后也会获取正确的值。这个过程类似于redis的aof+rewrite策略。
看看seq的读取:
StatusCode LogDB::Read(int64_t index, std::string* entry) { MutexLock lock(&mu_); if (read_log_.empty() || index >= next_index_ || index < smallest_index_) {//序号有效性判断 LOG(INFO, "[LogDB] empty = %d index = %ld next_index = %ld smallest_index = %ld", read_log_.empty(), index, next_index_, smallest_index_); return kNsNotFound; } FileCache::iterator it = read_log_.lower_bound(index);//二分查找index落在哪个log区间 if (it == read_log_.end() || (it != read_log_.begin() && index != it->first)) { --it; }//end()表明index大于最后一个文件起始点,index在最后一个文件;!=begin && index!=it->first表示index小于it所指文件,故前移一个 if (index < it->first) {//index应该>=文件起始序号 LOG(WARNING, "[LogDB] Read cannot find index file %ld ", index); return kReadError; } FILE* idx_fp = (it->second).first;//文件指针缓存 FILE* log_fp = (it->second).second; // find entry offset int offset = 16 * (index - it->first);//相对文件起始序号偏移,一个header就是index+offset 2个int64,正好16 int64_t read_index = -1; int64_t entry_offset = -1; if (fseek(idx_fp, offset, SEEK_SET) != 0) {//调整索引文件读指针 LOG(WARNING, "[LogDB] Read cannot find index file %ld ", index); return kReadError; } StatusCode s = ReadIndex(idx_fp, index, &read_index, &entry_offset);//数据文件offset if (s != kOK) { return s; } if(fseek(log_fp, entry_offset, SEEK_SET) != 0) {//数据文件指针 LOG(WARNING, "[LogDB] Read %ld with invalid offset %ld ", index, entry_offset); return kReadError; } int ret = ReadOne(log_fp, entry);//读取一次 if (ret <= 0) { LOG(WARNING, "[LogDB] Read log error %ld ", index); return kReadError; } return kOK; }
写入就很简单,一直追加文件末尾写,大于log_size就切换文件,也是按上面说先写数据,后写索引。
总结:整体来看,由于全是顺序写,写的性能不错。读的方面也可以快速定位seek点,但针对使用场景还是有可以优化
的地方,比如根据index获取日志时可以批量顺序读取某个range的日志。
相关文章推荐
- 百度的bfs中raft协议实现(1)
- 以raft协议为基础的模拟zoomkeep选举唯一主机实现
- Raft一致性协议实现源码
- raft协议的go语言实现
- 副本集RAFT协议的实现(2):选主,主从切换
- 副本集RAFT协议的实现(1):heartbeat
- redis集群实现(五) sentinel的架构与raft协议
- redis集群实现(五) sentinel的架构与raft协议
- 基于HTTPS协议的12306抢票软件设计与实现--相关接口以及数据格式
- Websocket协议之php实现
- 代码实现查看网络连接状态 协议 与端口 并且能看到其进程路径
- 使用百度webuploader实现大文件上传
- angularjs实现百度的搜索接口及链接的实现
- 深度学习FPGA实现基础知识3(Altera与百度展开合作在云数据中心使用FPGA加速)
- 利用Python脚本实现ping百度和google的方法
- Objective-C利用协议实现回调函数
- iic协议扩展板和pmw引脚控制舵机转向与超声测距的实验,可实现简易超声雷达
- modbus 协议在RS485实现
- Android 下使用 Http 协议实现多线程断点续传下载
- NEC红外遥控协议理解与实现