leveldb研究9- 流程分析:打开数据库,写数据,读数据,随机写
2012-12-29 11:29
525 查看
leveldb 是通过Open函数来打开/新建数据库。
// Opens the database named `name` with the given options.
// On success stores the new DB object in *dbptr and returns OK;
// on failure *dbptr is set to NULL and a non-OK status is returned.
static Status Open(const Options& options,
const std::string& name,
DB** dbptr);
其中options指定一些选项。
// Options controlling the behavior and performance of a database.
struct Options {
// -------------------
// Parameters that affect behavior
// Comparator that defines the order of keys; by default keys are ordered
// bytewise.
const Comparator* comparator;
// If true, the database is created if it is missing.
// Default: false
bool create_if_missing;
// If true, opening fails if the database already exists.
// Default: false
bool error_if_exists;
// If true, perform strict (paranoid) checking of the data.
// Default: false
bool paranoid_checks;
// env: operating-system abstraction layer (file access etc.).
// Default: Env::Default()
Env* env;
// Destination of informational log messages; by default a log file in the
// same directory as the database.
// Default: NULL
Logger* info_log;
// -------------------
// Parameters that affect performance
// Size of the in-memory write buffer (memtable). Larger values improve write
// performance but lengthen startup, because more data has to be recovered
// from the log.
//
// Default: 4MB
size_t write_buffer_size;
// Maximum number of open files, used by the TableCache.
//
// Default: 1000
int max_open_files;
// Control over blocks (user data is stored in a set of blocks, and
// a block is the unit of reading from disk).
// Block cache to use; if NULL, leveldb creates an 8MB internal cache.
// Default: NULL
Cache* block_cache;
// Size of a block in an SST file, measured on the data before compression.
//
// Default: 4K
size_t block_size;
// Interval between restart points within an SST block; see the SST file
// format.
//
// Default: 16
int block_restart_interval;
// Compression type; the default is Google's Snappy compression.
CompressionType compression;
// Create an Options object with default values for all fields.
Options();
};
具体看看Open的实现:
<db/db_impl.cc>
// Opens (or creates) the database at `dbname`.
// Steps:
//  1. Recover state from the MANIFEST and from any leftover *.log files.
//  2. Start a fresh log file.
//  3. Persist the resulting version edit to the MANIFEST.
//  4. Delete obsolete files and let the DB schedule a compaction if needed.
// On success *dbptr holds the new DBImpl; on failure it stays NULL and the
// DBImpl is deleted.
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = NULL;
// Create the implementation object: DBImpl.
DBImpl* impl = new DBImpl(options, dbname);
// Hold the DB mutex for the whole recovery sequence.
impl->mutex_.Lock();
VersionEdit edit;
// Recover data from the logs; recovered data is written to new SST files
// recorded in `edit`.
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
// Create a new log file for subsequent writes.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
// Record the edit in the MANIFEST; on this first call LogAndApply also
// creates a new MANIFEST file.
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
// Delete obsolete files.
impl->DeleteObsoleteFiles();
// Let the DB schedule a compaction if it decides one is needed.
impl->MaybeScheduleCompaction();
}
}
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;
} else {
delete impl;
}
return s;
}
因为上次关闭数据库的时候,内存的数据可能并没有写入SST文件,所以要从*.log中读取记录,并写入新的SST文件。
<db/db_impl.cc>
// Restores the database state when it is opened. Requires mutex_ to be held.
//  1. Creates the DB directory and locks the LOCK file.
//  2. If CURRENT is missing, creates a new DB when create_if_missing is set
//     and fails otherwise. If CURRENT exists, fails when error_if_exists is set.
//  3. Rebuilds the VersionSet from the MANIFEST (versions_->Recover()).
//  4. Replays, in file-number order, every *.log file numbered >= the
//     MANIFEST's log number (or equal to its prev log number). Recovered data
//     goes into level-0 tables recorded in *edit.
// Returns the first error encountered.
Status DBImpl::Recover(VersionEdit* edit) {
  mutex_.AssertHeld();

  // Create the DB folder. The result is ignored: the directory may already
  // exist from an earlier run.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  // Create the LOCK file and lock it.
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      // Create a new, empty database.
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  // Rebuild the version information from the MANIFEST.
  s = versions_->Recover();
  if (s.ok()) {
    SequenceNumber max_sequence(0);

    // Logs numbered >= min_log, plus prev_log, may hold data not yet in SSTs.
    const uint64_t min_log = versions_->LogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
    s = env_->GetChildren(dbname_, &filenames);
    if (!s.ok()) {
      return s;
    }
    uint64_t number;
    FileType type;
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type)
          && type == kLogFile
          && ((number >= min_log) || (number == prev_log))) {
        logs.push_back(number);
      }
    }

    // Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
    for (size_t i = 0; i < logs.size(); i++) {
      // Replay this *.log file.
      s = RecoverLogFile(logs[i], edit, &max_sequence);
      // Stop at the first failure. Continuing would let a later successful
      // log overwrite `s` and hide the error (e.g. a failed level-0 write on
      // a full disk, or corruption reported under paranoid_checks).
      if (!s.ok()) {
        return s;
      }
      // The previous incarnation may not have written any MANIFEST
      // records after allocating this log number. So we manually
      // update the file number allocation counter in VersionSet.
      versions_->MarkFileNumberUsed(logs[i]);
    }

    if (s.ok()) {
      if (versions_->LastSequence() < max_sequence) {
        versions_->SetLastSequence(max_sequence);
      }
    }
  }

  return s;
}
继续看RecoverLogFile的实现:
<db/db_impl.cc>
// Replays the log file numbered `log_number` and writes its contents to
// level-0 SST files. Requires mutex_ to be held.
// Each log record is decoded as a WriteBatch and inserted into a temporary
// MemTable. Whenever that memtable grows beyond options_.write_buffer_size it
// is written out with WriteLevel0Table (recorded in *edit). Anything left over
// is written out after the last record. *max_sequence is raised to the
// largest sequence number found.
Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence) {
// LogReporter: invoked by log::Reader when it finds corrupted data.
// It always logs the dropped bytes. When `status` is non-NULL it also
// stores the first error there.
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
}
};
mutex_.AssertHeld();
// Open the log file for sequential reading.
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
// With paranoid_checks, corruption is recorded into `status` (and ends the
// loop below); otherwise it is only logged.
reporter.status = (options_.paranoid_checks ? &status : NULL);
// Read the records with log::Reader, verifying checksums.
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = NULL;
// Iterate over the records of the log file while `status` is OK.
while (reader.ReadRecord(&record, &scratch) &&
status.ok()) {
// Records shorter than 12 bytes cannot hold a WriteBatch header
// (presumably 8-byte sequence + 4-byte count -- confirm in write_batch.cc).
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
if (mem == NULL) {
// Create a new MemTable to hold the recovered data.
mem = new MemTable(internal_comparator_);
mem->Ref();
}
// Insert the batch into the memtable.
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
// Track the highest sequence number seen so far.
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
// Memtable is full: write it to a level-0 SST file.
status = WriteLevel0Table(mem, edit, NULL);
if (!status.ok()) {
break;
}
// Drop our reference; a new memtable is created for the next record.
mem->Unref();
mem = NULL;
}
}
if (status.ok() && mem != NULL) {
// Write whatever remains in the last memtable.
status = WriteLevel0Table(mem, edit, NULL);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
}
if (mem != NULL) mem->Unref();
delete file;
return status;
}
至此完成SST file的写入。
接下来看看manifest文件的重建
manifest的重建有两步,第一步是调用VersionSet::Recover函数恢复到上次的manifest,然后使用VersionSet::LogAndApply把新增的SST文件记录也写入manifest文件中。
<db/version_set.cc>
// Rebuilds the in-memory VersionSet state from the MANIFEST.
// Steps:
//  1. Read CURRENT to find the active MANIFEST.
//  2. Replay every VersionEdit record in it through a Builder.
//  3. Verify that the next-file number, log number and last sequence were all
//     recorded.
//  4. Install the resulting Version as the current one.
Status VersionSet::Recover() {
  // Keeps the first corruption reported by log::Reader in *status.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read CURRENT, which names the latest MANIFEST file.
  // (`&current` was garbled into "¤t" by HTML-entity decoding.)
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  // Strip the trailing newline to get the MANIFEST file name.
  current.resize(current.size() - 1);

  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  // Open the current MANIFEST file.
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    // Each MANIFEST record is an encoded VersionEdit, read via log::Reader.
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          // Leading space keeps the comparator name and the message apart.
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        // Apply the edit to the version being built.
        builder.Apply(&edit);
      }

      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {
    // Build the recovered Version and make it the current version.
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    Finalize(v);
    AppendVersion(v);
    // next_file becomes the number of the MANIFEST that LogAndApply creates
    // on its first call; ordinary file numbers start after it.
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;
  }

  return s;
}
// Applies *edit to the current version, persists it to the MANIFEST and
// installs the result as the new current version.
// Edit fields:
//  - A missing log number or prev-log number is filled in from the current
//    state.
//  - The next file number and last sequence are always recorded.
// First call: a new MANIFEST holding a snapshot of the current version is
// created, and CURRENT is pointed at it.
// Locking: *mu must be held on entry. It is released while the MANIFEST is
// written and re-acquired before returning.
// On failure: the new version is discarded, and a MANIFEST created by this
// call is closed and deleted.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
// Build the new Version: the current version plus the edit.
Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
// Create a new MANIFEST file if none is open yet.
if (descriptor_log_ == NULL) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
// NOTE(review): this repeats the SetNextFile() call above; no statement in
// this function changes next_file_number_ in between.
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}
// Unlock during expensive MANIFEST log write
{
mu->Unlock();
// Append the encoded edit to the MANIFEST and sync it.
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
}
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu->Lock();
}
// Install the new version.
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
// Discard the new version; undo the MANIFEST creation if this call did it.
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = NULL;
descriptor_file_ = NULL;
env_->DeleteFile(new_manifest_file);
}
}
return s;
}
总体来说,leveldb的写操作有两个步骤,首先是针对log的append操作,然后是对memtable的插入操作。
影响写性能的因素有:
1. write_buffer_size
2. kL0_SlowdownWritesTrigger and kL0_StopWritesTrigger.提高这两个值,能够增加写的性能,但是降低读的性能
看看WriteOptions有哪些参数可以指定
// Options that control a single write.
struct WriteOptions {
  // When true, DBImpl::Write calls Sync() on the log file after appending the
  // batch. Writes become slower but more durable.
  // Default: false
  bool sync;

  WriteOptions() {
    sync = false;
  }
};
首先把Key,value转成WriteBatch
// Convenience wrapper: stores key -> value by wrapping the pair in a
// single-entry WriteBatch and delegating to Write().
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch one_entry;
  one_entry.Put(key, value);
  return Write(opt, &one_entry);
}
接下来就是真正的插入了
这里使用了两把"锁":mutex_,以及写log的权利(logger_,通过AcquireLoggingResponsibility获取)。主要是想提高并发能力:真正写log和插入memtable时可以释放mutex_,从而减少持有mutex_的时间。
首先是检查是否可写,然后append log,最后是插入memtable
<db/db_impl.cc>
// Applies a batch of updates to the database.
// Sequence:
//  1. Lock mutex_ and acquire the logging responsibility.
//  2. Make room in the memtable (may wait).
//  3. Assign sequence numbers to the batch.
//  4. With mutex_ released, append the batch to the log (syncing if
//     options.sync) and insert it into mem_.
//  5. Publish the new last sequence number.
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status status;
// Scoped lock on mutex_ (presumably released when `l` goes out of scope).
MutexLock l(&mutex_);
LoggerId self;
// Obtain the right to write to the log.
AcquireLoggingResponsibility(&self);
// Check that there is room to write.
status = MakeRoomForWrite(false); // May temporarily release lock and wait
uint64_t last_sequence = versions_->LastSequence();
if (status.ok()) {
// The batch's entries get sequence numbers starting at last_sequence + 1.
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock during
// this phase since the "logger_" flag protects against concurrent
// loggers and concurrent writes into mem_.
{
assert(logger_ == &self);
mutex_.Unlock();
// I/O: append the batch to the log.
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
if (status.ok() && options.sync) {
status = logfile_->Sync();
}
// Insert into the memtable.
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
assert(logger_ == &self);
}
// Publish the new last sequence number. Note: this also runs when the log
// write or memtable insert above failed, so the batch's sequence numbers
// are consumed either way.
versions_->SetLastSequence(last_sequence);
}
// Release the logging responsibility.
ReleaseLoggingResponsibility(&self);
return status;
}
写流量控制:
<db/db_impl.cc>
// Ensures mem_ has room for the next write, throttling writers when level 0
// is crowded. Loops until one of these happens:
//  - a background error was recorded (it is returned);
//  - mem_ still has room (normal case);
//  - mem_ was full (or `force` was set) and has been switched to imm_, with a
//    fresh memtable and log file created and a compaction scheduled.
// While waiting it may release mutex_ (for a short sleep) or block on bg_cv_.
// Requires mutex_ to be held. It also asserts logger_ is set, i.e. the caller
// holds the logging responsibility.
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(logger_ != NULL);
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // Level 0 has reached the slowdown threshold: delay this write once by
      // 1000 microseconds (1 ms, not 1 s) with the mutex released, giving the
      // compaction more CPU time.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in the current memtable.
      break;
    } else if (imm_ != NULL) {
      // The previous memtable (imm_) has not been compacted yet; wait.
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // Too many level-0 files: stop writing until compaction catches up.
      // (The literal was split across two lines in the post, which does not
      // compile.)
      Log(options_.info_log, "waiting...\n");
      bg_cv_.Wait();
    } else {
      // Create a new memtable and log file; the current memtable becomes imm_.
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      // Schedule a compaction to dump imm_ to disk.
      MaybeScheduleCompaction();
    }
  }
  return s;
}
leveldb读数据
先看看ReadOptions有哪些参数可以指定:
// Options that control read operations
struct ReadOptions {
// 是否检查checksum
// Default: false
bool verify_checksums;
// 是否将此次结果放入cache
// Default: true
bool fill_cache;
//是否指定snapshot,否则读取当前版本
// Default: NULL
const Snapshot* snapshot;
ReadOptions()
: verify_checksums(false),
fill_cache(true),
snapshot(NULL) {
}
};
下面看看读取的详细过程:
查询memtable=>查询previous memtable(imm_)=>查询文件(缓冲)
// Looks up `key` and stores the value in *value.
// Search order:
//  1. the current memtable (mem_);
//  2. the previous immutable memtable (imm_), if any;
//  3. the SST files of the current Version.
// The read is done as of options.snapshot if given, otherwise as of the last
// sequence number.
// Lifetime handling: mem, imm and current are Ref()'d under mutex_, the
// mutex is released for the lookups, and they are Unref()'d after it is
// re-acquired.
Status DBImpl::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
// Choose the sequence number (snapshot) to read at.
if (options.snapshot != NULL) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
} else {
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
LookupKey lkey(key, snapshot);
// Check the memtable first.
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != NULL && imm->Get(lkey, value, &s)) { // then the previous memtable, imm_
// Done
} else {
// Read from the SST files.
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
// UpdateStats reports whether some file should now be compacted because of
// seeks (see allowed_seeks); if so, schedule a compaction.
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != NULL) imm->Unref();
current->Unref();
return s;
}
重点来看看从version中读取:
// Looks up key `k` in this Version's SST files and stores the value in *value.
// Search order:
//  - Levels are searched from 0 upward.
//  - Level 0: files may overlap, so every file whose user-key range contains
//    the key is searched, newest first.
//  - Higher levels: FindFile (binary search) selects at most one candidate
//    file per level.
// The search stops at the first file that gives an answer (GetValue returns
// true) or when a table iterator reports an error. Returns NotFound if no
// file answers.
// stats->seek_file/seek_file_level are set once a further file has to be
// read; they record the first file that was read without answering.
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
GetStats* stats) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
Status s;
stats->seek_file = NULL;
stats->seek_file_level = -1;
FileMetaData* last_file_read = NULL;
int last_file_read_level = -1;
// Search from level 0 upward; once the key is found at a lower level,
// higher levels are not searched.
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
size_t num_files = files_[level].size();
// No files at this level: move on to the next level.
if (num_files == 0) continue;
// Get the list of files to search in this level
FileMetaData* const* files = &files_[level][0];
if (level == 0) {
// Level 0 is special: its files may overlap, so every file whose range
// contains the user key must be searched.
tmp.reserve(num_files);
for (uint32_t i = 0; i < num_files; i++) {
FileMetaData* f = files[i];
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
tmp.push_back(f);
}
}
if (tmp.empty()) continue;
// Search the newest files first.
std::sort(tmp.begin(), tmp.end(), NewestFirst);
files = &tmp[0];
num_files = tmp.size();
} else {
// Binary search; at this level a key can belong to at most one file.
uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
// No file at this level can contain the key.
if (index >= num_files) {
files = NULL;
num_files = 0;
} else {
tmp2 = files[index];
if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
// All of "tmp2" is past any data for user_key
files = NULL;
num_files = 0;
} else {
files = &tmp2;
num_files = 1;
}
}
}
for (uint32_t i = 0; i < num_files; ++i) { // search the candidate files at this level
if (last_file_read != NULL && stats->seek_file == NULL) {
// Remember only the first file that was read without answering the lookup.
stats->seek_file = last_file_read;
stats->seek_file_level = last_file_read_level;
}
FileMetaData* f = files[i];
last_file_read = f;
last_file_read_level = level;
// Read through the table cache.
Iterator* iter = vset_->table_cache_->NewIterator(
options,
f->number,
f->file_size);
iter->Seek(ikey);
const bool done = GetValue(ucmp, iter, user_key, value, &s);
if (!iter->status().ok()) { // the iterator reported an error: return it
s = iter->status();
delete iter;
return s;
} else {
delete iter;
if (done) {
return s;
}
}
}
}
return Status::NotFound(Slice()); // Use an empty error message for speed
}
继续跟踪:TableCache
// Returns an iterator over the SST file numbered `file_number` (size
// `file_size`).
// Lookup:
//  - The opened Table is looked up in the LRU cache_, keyed by the 8-byte
//    encoding of file_number.
//  - On a miss, the file is opened, Table::Open()'d and inserted into the
//    cache.
//  - Open errors are returned as an error iterator and are not cached.
// UnrefEntry is registered as a cleanup on the returned iterator so that the
// cache handle is released with it.
// If tableptr is non-NULL, *tableptr is set to the Table, or left NULL on
// error.
Iterator* TableCache::NewIterator(const ReadOptions& options,
                                  uint64_t file_number,
                                  uint64_t file_size,
                                  Table** tableptr) {
  if (tableptr != NULL) {
    *tableptr = NULL;
  }

  // Cache key: the file number encoded as 8 fixed bytes.
  char buf[sizeof(file_number)];
  EncodeFixed64(buf, file_number);
  Slice key(buf, sizeof(buf));
  // Look the table up in the LRU cache.
  Cache::Handle* handle = cache_->Lookup(key);
  if (handle == NULL) {
    // Cache miss: load the table file. (This comment had a single '/' in the
    // post, which does not compile.)
    std::string fname = TableFileName(dbname_, file_number);
    RandomAccessFile* file = NULL;
    Table* table = NULL;
    Status s = env_->NewRandomAccessFile(fname, &file);
    if (s.ok()) {
      s = Table::Open(*options_, file, file_size, &table);
    }

    if (!s.ok()) {
      assert(table == NULL);
      delete file;
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
      return NewErrorIterator(s);
    }

    // Insert into the cache, with DeleteEntry as the entry's deleter.
    TableAndFile* tf = new TableAndFile;
    tf->file = file;
    tf->table = table;
    handle = cache_->Insert(key, tf, 1, &DeleteEntry);
  }

  Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
  // Create the iterator from the Table object.
  Iterator* result = table->NewIterator(options);
  result->RegisterCleanup(&UnrefEntry, cache_, handle);
  if (tableptr != NULL) {
    *tableptr = table;
  }
  return result;
}
准备工作:
1. 下载Snappy库
Download source code from: http://code.google.com/p/snappy
编译并安装
./configure && make && sudo make install
2. 编译leveldb自带的db_bench
make db_bench
注意:在ubuntu 11.04上编译会出错,修改makefile:
$(CXX) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@
为
$(CXX) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@ $(LDFLAGS)
为了获取更多的信息,我写了一个简单的测试程序来测试写性能。
g++ src/ldbbench.cpp libleveldb.a -I../leveldb/include -o ldb_test -pthread -lsnappy
我每隔10w条记录统计一次运行时间和各层level的文件个数。
// Simple write benchmark.
// Runs 100 rounds; each round inserts 100,000 random key/value pairs into
// /tmp/testdb. After each round it appends one CSV line to /tmp/bench.csv:
// the number of files at levels 0..6, then the round's elapsed time in
// seconds.
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include <sys/time.h>
#include "leveldb/db.h"
#include "leveldb/env.h"
using namespace std;
using namespace leveldb;

// Current wall-clock time in seconds, with microsecond resolution.
static inline double micro_time() {
  struct timeval tim;
  gettimeofday(&tim, NULL);
  return tim.tv_sec + (tim.tv_usec / 1000000.0);
}

int main() {
  srand(static_cast<unsigned>(time(NULL)));
  DB* db = NULL;
  Options op;
  op.create_if_missing = true;
  Status s = DB::Open(op, "/tmp/testdb", &db);
  if (!s.ok()) {
    cerr << "open failed: " << s.ToString() << endl;
    return 1;
  }

  Env* env = Env::Default();
  WritableFile* file = NULL;
  // `file` is only valid if this succeeds, so the status must be checked.
  s = env->NewWritableFile("/tmp/bench.csv", &file);
  if (!s.ok()) {
    cerr << "cannot create /tmp/bench.csv: " << s.ToString() << endl;
    delete db;
    return 1;
  }

  cout << "create successfully" << endl;
  WriteOptions wop;
  for (int j = 0; j < 100; ++j) {
    const double start = micro_time();
    for (int i = 0; i < 100000; ++i) {
      char key[100];
      char value[100];
      sprintf(key, "%d_%d", i, rand());
      sprintf(value, "%d", rand());
      s = db->Put(wop, key, value);
      if (!s.ok()) break;
    }
    if (!s.ok()) break;
    const double cost = micro_time() - start;
    cout << "write successfully:" << j << ",costs " << cost << endl;

    // Report the status: file count at levels 0..6, then the elapsed time.
    string count;
    char buffer[40];
    for (int level = 0; level < 7; ++level) {
      sprintf(buffer, "leveldb.num-files-at-level%d", level);
      if (!db->GetProperty(buffer, &count)) {
        count.clear();  // leave the field empty rather than reuse a stale value
      }
      file->Append(count + ",");
    }
    sprintf(buffer, "%f", cost);
    file->Append(buffer);
    file->Append("\n");
    file->Sync();
  }

  if (s.ok()) {
    cout << "write completed" << endl;
  } else {
    cerr << "write failed: " << s.ToString() << endl;
  }
  delete db;
  file->Close();
  delete file;
  return s.ok() ? 0 : 1;
}
得到结果如下:
可以看出插入时间不稳定:一旦level 0的文件个数达到8(leveldb在level 0的SST文件数达到kL0_SlowdownWritesTrigger=8时会做流量控制),就会严重地影响插入速度。
数据如下: 前7栏为各level的文件个数,最后一栏为插入时间(单位second).
0,0,0,0,0,0,0,0.312044
0,0,1,0,0,0,0,0.339661
0,1,1,0,0,0,0,0.336554
1,1,1,0,0,0,0,0.338470
2,1,1,0,0,0,0,0.319139
4,1,1,0,0,0,0,0.322158
5,1,1,0,0,0,0,0.411267
6,1,1,0,0,0,0,0.452211
7,1,1,0,0,0,0,0.392227
4,6,1,0,0,0,0,0.599982
5,6,1,0,0,0,0,0.392222
6,6,1,0,0,0,0,0.426607
7,6,1,0,0,0,0,0.450604
0,9,7,0,0,0,0,1.884518
1,9,7,0,0,0,0,0.420226
2,8,8,0,0,0,0,0.395083
3,8,8,0,0,0,0,0.418100
4,7,9,0,0,0,0,0.421611
6,7,9,0,0,0,0,0.415739
7,7,9,0,0,0,0,0.407361
1,14,10,0,0,0,0,2.226791
2,14,10,0,0,0,0,0.401517
3,14,10,0,0,0,0,0.373305
4,13,11,0,0,0,0,0.419741
5,13,11,0,0,0,0,0.409911
6,12,12,0,0,0,0,0.410904
7,12,12,0,0,0,0,0.429305
0,19,15,0,0,0,0,3.586968
2,19,15,0,0,0,0,0.443083
3,18,16,0,0,0,0,0.403899
4,18,16,0,0,0,0,0.427664
5,17,17,0,0,0,0,0.398022
6,16,19,0,0,0,0,0.373106
7,16,19,0,0,0,0,0.381070
0,16,27,0,0,0,0,3.997287
1,16,27,0,0,0,0,0.415576
2,15,29,0,0,0,0,0.395088
3,15,29,0,0,0,0,0.421756
4,15,29,0,0,0,0,0.423345
5,14,30,0,0,0,0,0.443051
6,13,32,0,0,0,0,0.409214
0,21,35,0,0,0,0,3.724305
1,21,35,0,0,0,0,0.394496
2,20,36,0,0,0,0,0.400312
3,20,36,0,0,0,0,0.440494
4,19,36,0,0,0,0,0.401116
5,19,36,0,0,0,0,0.368698
6,19,36,0,0,0,0,0.392624
7,18,37,0,0,0,0,0.421263
0,20,45,0,0,0,0,5.280940
1,20,45,0,0,0,0,0.445995
2,19,46,0,0,0,0,0.427433
3,19,46,0,0,0,0,0.396355
4,19,46,0,0,0,0,0.412447
6,18,47,0,0,0,0,0.425992
7,18,47,0,0,0,0,0.409269
0,22,54,0,0,0,0,4.659271
1,22,54,0,0,0,0,0.353135
2,22,54,0,0,0,0,0.412604
3,22,54,0,0,0,0,0.387365
4,21,55,0,0,0,0,0.447579
5,20,56,0,0,0,0,0.423402
6,20,56,0,0,0,0,0.392983
7,19,58,0,0,0,0,0.372202
0,22,66,0,0,0,0,5.072227
1,22,66,0,0,0,0,0.389874
2,22,66,0,0,0,0,0.375599
4,22,66,0,0,0,0,0.405292
5,22,66,0,0,0,0,0.404367
6,22,66,0,0,0,0,0.394260
7,22,66,0,0,0,0,0.401855
0,24,77,0,0,0,0,5.980508
1,24,77,0,0,0,0,0.388424
2,24,77,0,0,0,0,0.429406
3,23,78,0,0,0,0,0.412908
4,23,78,0,0,0,0,0.428574
5,23,78,0,0,0,0,0.403336
6,22,79,0,0,0,0,0.394216
8,13,89,0,0,0,0,5.377096
1,23,89,0,0,0,0,0.816229
2,23,89,0,0,0,0,0.437396
3,23,89,0,0,0,0,0.399540
4,22,90,0,0,0,0,0.437927
5,22,90,0,0,0,0,0.424814
6,22,90,0,0,0,0,0.411747
7,21,92,0,0,0,0,0.384908
0,25,100,0,0,0,0,6.236974
1,25,100,0,0,0,0,0.403147
3,25,100,0,0,0,0,0.412086
4,25,100,0,0,0,0,0.403978
5,24,102,0,0,0,0,0.398120
6,24,102,0,0,0,0,0.374137
7,24,102,0,0,0,0,0.370625
0,22,102,10,0,0,0,6.692459
1,22,102,10,0,0,0,0.389345
2,22,102,10,0,0,0,0.411086
3,22,102,10,0,0,0,0.404387
4,21,103,10,0,0,0,0.443593
5,21,103,10,0,0,0,0.400221
6,21,103,10,0,0,0,0.414371
// Opens the database named `name` with the given options.
// On success stores the new DB object in *dbptr and returns OK;
// on failure *dbptr is set to NULL and a non-OK status is returned.
static Status Open(const Options& options,
const std::string& name,
DB** dbptr);
其中options指定一些选项。
// Options controlling the behavior and performance of a database.
struct Options {
// -------------------
// Parameters that affect behavior
// Comparator that defines the order of keys; by default keys are ordered
// bytewise.
const Comparator* comparator;
// If true, the database is created if it is missing.
// Default: false
bool create_if_missing;
// If true, opening fails if the database already exists.
// Default: false
bool error_if_exists;
// If true, perform strict (paranoid) checking of the data.
// Default: false
bool paranoid_checks;
// env: operating-system abstraction layer (file access etc.).
// Default: Env::Default()
Env* env;
// Destination of informational log messages; by default a log file in the
// same directory as the database.
// Default: NULL
Logger* info_log;
// -------------------
// Parameters that affect performance
// Size of the in-memory write buffer (memtable). Larger values improve write
// performance but lengthen startup, because more data has to be recovered
// from the log.
//
// Default: 4MB
size_t write_buffer_size;
// Maximum number of open files, used by the TableCache.
//
// Default: 1000
int max_open_files;
// Control over blocks (user data is stored in a set of blocks, and
// a block is the unit of reading from disk).
// Block cache to use; if NULL, leveldb creates an 8MB internal cache.
// Default: NULL
Cache* block_cache;
// Size of a block in an SST file, measured on the data before compression.
//
// Default: 4K
size_t block_size;
// Interval between restart points within an SST block; see the SST file
// format.
//
// Default: 16
int block_restart_interval;
// Compression type; the default is Google's Snappy compression.
CompressionType compression;
// Create an Options object with default values for all fields.
Options();
};
具体看看Open的实现:
<db/db_impl.cc>
// Opens (or creates) the database at `dbname`.
// Steps:
//  1. Recover state from the MANIFEST and from any leftover *.log files.
//  2. Start a fresh log file.
//  3. Persist the resulting version edit to the MANIFEST.
//  4. Delete obsolete files and let the DB schedule a compaction if needed.
// On success *dbptr holds the new DBImpl; on failure it stays NULL and the
// DBImpl is deleted.
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
*dbptr = NULL;
// Create the implementation object: DBImpl.
DBImpl* impl = new DBImpl(options, dbname);
// Hold the DB mutex for the whole recovery sequence.
impl->mutex_.Lock();
VersionEdit edit;
// Recover data from the logs; recovered data is written to new SST files
// recorded in `edit`.
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
// Create a new log file for subsequent writes.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
// Record the edit in the MANIFEST; on this first call LogAndApply also
// creates a new MANIFEST file.
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
// Delete obsolete files.
impl->DeleteObsoleteFiles();
// Let the DB schedule a compaction if it decides one is needed.
impl->MaybeScheduleCompaction();
}
}
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;
} else {
delete impl;
}
return s;
}
因为上次关闭数据库的时候,内存的数据可能并没有写入SST文件,所以要从*.log中读取记录,并写入新的SST文件。
<db/db_impl.cc>
// Restores the database state when it is opened. Requires mutex_ to be held.
//  1. Creates the DB directory and locks the LOCK file.
//  2. If CURRENT is missing, creates a new DB when create_if_missing is set
//     and fails otherwise. If CURRENT exists, fails when error_if_exists is set.
//  3. Rebuilds the VersionSet from the MANIFEST (versions_->Recover()).
//  4. Replays, in file-number order, every *.log file numbered >= the
//     MANIFEST's log number (or equal to its prev log number). Recovered data
//     goes into level-0 tables recorded in *edit.
// Returns the first error encountered.
Status DBImpl::Recover(VersionEdit* edit) {
  mutex_.AssertHeld();

  // Create the DB folder. The result is ignored: the directory may already
  // exist from an earlier run.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  // Create the LOCK file and lock it.
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      // Create a new, empty database.
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  // Rebuild the version information from the MANIFEST.
  s = versions_->Recover();
  if (s.ok()) {
    SequenceNumber max_sequence(0);

    // Logs numbered >= min_log, plus prev_log, may hold data not yet in SSTs.
    const uint64_t min_log = versions_->LogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
    s = env_->GetChildren(dbname_, &filenames);
    if (!s.ok()) {
      return s;
    }
    uint64_t number;
    FileType type;
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type)
          && type == kLogFile
          && ((number >= min_log) || (number == prev_log))) {
        logs.push_back(number);
      }
    }

    // Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
    for (size_t i = 0; i < logs.size(); i++) {
      // Replay this *.log file.
      s = RecoverLogFile(logs[i], edit, &max_sequence);
      // Stop at the first failure. Continuing would let a later successful
      // log overwrite `s` and hide the error (e.g. a failed level-0 write on
      // a full disk, or corruption reported under paranoid_checks).
      if (!s.ok()) {
        return s;
      }
      // The previous incarnation may not have written any MANIFEST
      // records after allocating this log number. So we manually
      // update the file number allocation counter in VersionSet.
      versions_->MarkFileNumberUsed(logs[i]);
    }

    if (s.ok()) {
      if (versions_->LastSequence() < max_sequence) {
        versions_->SetLastSequence(max_sequence);
      }
    }
  }

  return s;
}
继续看RecoverLogFile的实现:
<db/db_impl.cc>
// Replays the log file numbered `log_number` and writes its contents to
// level-0 SST files. Requires mutex_ to be held.
// Each log record is decoded as a WriteBatch and inserted into a temporary
// MemTable. Whenever that memtable grows beyond options_.write_buffer_size it
// is written out with WriteLevel0Table (recorded in *edit). Anything left over
// is written out after the last record. *max_sequence is raised to the
// largest sequence number found.
Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence) {
// LogReporter: invoked by log::Reader when it finds corrupted data.
// It always logs the dropped bytes. When `status` is non-NULL it also
// stores the first error there.
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
}
};
mutex_.AssertHeld();
// Open the log file for sequential reading.
std::string fname = LogFileName(dbname_, log_number);
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
MaybeIgnoreError(&status);
return status;
}
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
// With paranoid_checks, corruption is recorded into `status` (and ends the
// loop below); otherwise it is only logged.
reporter.status = (options_.paranoid_checks ? &status : NULL);
// Read the records with log::Reader, verifying checksums.
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(options_.info_log, "Recovering log #%llu",
(unsigned long long) log_number);
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = NULL;
// Iterate over the records of the log file while `status` is OK.
while (reader.ReadRecord(&record, &scratch) &&
status.ok()) {
// Records shorter than 12 bytes cannot hold a WriteBatch header
// (presumably 8-byte sequence + 4-byte count -- confirm in write_batch.cc).
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
if (mem == NULL) {
// Create a new MemTable to hold the recovered data.
mem = new MemTable(internal_comparator_);
mem->Ref();
}
// Insert the batch into the memtable.
status = WriteBatchInternal::InsertInto(&batch, mem);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
}
// Track the highest sequence number seen so far.
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
WriteBatchInternal::Count(&batch) - 1;
if (last_seq > *max_sequence) {
*max_sequence = last_seq;
}
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
// Memtable is full: write it to a level-0 SST file.
status = WriteLevel0Table(mem, edit, NULL);
if (!status.ok()) {
break;
}
// Drop our reference; a new memtable is created for the next record.
mem->Unref();
mem = NULL;
}
}
if (status.ok() && mem != NULL) {
// Write whatever remains in the last memtable.
status = WriteLevel0Table(mem, edit, NULL);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
}
if (mem != NULL) mem->Unref();
delete file;
return status;
}
至此完成SST file的写入。
接下来看看manifest文件的重建
manifest的重建有两步,第一步是调用VersionSet::Recover函数恢复到上次的manifest,然后使用VersionSet::LogAndApply把新增的SST文件记录也写入manifest文件中。
<db/version_set.cc>
// Rebuilds the in-memory VersionSet state from the MANIFEST.
// Steps:
//  1. Read CURRENT to find the active MANIFEST.
//  2. Replay every VersionEdit record in it through a Builder.
//  3. Verify that the next-file number, log number and last sequence were all
//     recorded.
//  4. Install the resulting Version as the current one.
Status VersionSet::Recover() {
  // Keeps the first corruption reported by log::Reader in *status.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read CURRENT, which names the latest MANIFEST file.
  // (`&current` was garbled into "¤t" by HTML-entity decoding.)
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  // Strip the trailing newline to get the MANIFEST file name.
  current.resize(current.size() - 1);

  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  // Open the current MANIFEST file.
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    // Each MANIFEST record is an encoded VersionEdit, read via log::Reader.
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          // Leading space keeps the comparator name and the message apart.
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        // Apply the edit to the version being built.
        builder.Apply(&edit);
      }

      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {
    // Build the recovered Version and make it the current version.
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    Finalize(v);
    AppendVersion(v);
    // next_file becomes the number of the MANIFEST that LogAndApply creates
    // on its first call; ordinary file numbers start after it.
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;
  }

  return s;
}
// Applies `edit` on top of current_, persists the edit to the MANIFEST
// (descriptor log), and on success installs the resulting Version via
// AppendVersion().
//
// REQUIRES: *mu is held on entry. It is released while the MANIFEST record
// is written and synced, and re-acquired before returning.
//
// Before encoding, the edit is completed from this VersionSet: the log number
// and prev log number are filled in when the edit does not set them, and the
// next file number and last sequence are always set. If no MANIFEST is open
// yet (descriptor_log_ == NULL), a new MANIFEST named after
// manifest_file_number_ is created, seeded with a snapshot (WriteSnapshot),
// and CURRENT is pointed at it. On failure the new Version is deleted, and a
// MANIFEST created by this call is closed and removed.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  // Fill in bookkeeping fields the caller did not set explicitly.
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  } else {
    edit->SetLogNumber(log_number_);
  }
  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }
  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);
  // Build the new Version: current_ with the edit applied.
  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  Finalize(v);
  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  // No MANIFEST is open yet: create a new one.
  if (descriptor_log_ == NULL) {
    // No reason to unlock *mu here since we only hit this path in the
    // first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);
    }
  }
  // Unlock during expensive MANIFEST log write
  {
    mu->Unlock();
    // Append the encoded edit to the MANIFEST and sync it.
    if (s.ok()) {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);
      if (s.ok()) {
        s = descriptor_file_->Sync();
      }
    }
    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty()) {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }
    mu->Lock();
  }
  // Install the new version on success; otherwise roll back.
  if (s.ok()) {
    AppendVersion(v);
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
  } else {
    delete v;
    if (!new_manifest_file.empty()) {
      // Discard the possibly incomplete MANIFEST created above.
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ = NULL;
      descriptor_file_ = NULL;
      env_->DeleteFile(new_manifest_file);
    }
  }
  return s;
}
leveldb研究10- 流程分析:写数据
Posted on 2012-03-21 14:41 小明 阅读(1019) 评论(0) 编辑 收藏 所属分类: 分布式计算
总体来说,leveldb的写操作分为两个步骤:首先是对log的append操作,然后是对memtable的插入操作。
影响写性能的因素有:
1. write_buffer_size
2. kL0_SlowdownWritesTrigger 和 kL0_StopWritesTrigger:提高这两个值能够提升写性能,但会降低读性能(level 0 的文件之间key可能重叠,level 0 文件越多,一次读操作需要查找的文件就越多)。
看看WriteOptions有哪些参数可以指定
// Options that control write operations.
struct WriteOptions {
  // When true, the write is synced to disk (leveldb calls fsync()) before it
  // is reported as done. This makes inserts slower but the data safer.
  // Default: false
  bool sync;

  WriteOptions() : sync(false) {}
};
首先把Key,value转成WriteBatch
// Convenience wrapper: stores one key/value pair by packing it into a
// single-entry WriteBatch and handing that batch to Write().
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch single_entry;
  single_entry.Put(key, value);
  Status result = Write(opt, &single_entry);
  return result;
}
接下来就是真正的插入了
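这里用到了两层同步,目的是提高并发能力、减少持有mutex_的时间:一是mutex_互斥锁;二是通过AcquireLoggingResponsibility获得的"写log的权利"(logger_标志)。在写log和插入memtable期间会释放mutex_,此时由logger_保证不会有其他线程同时写log或写入mem_。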
首先是检查是否可写,然后append log,最后是插入memtable
<db/dbimpl.cc>
// Applies the batch `updates` to the database.
//
// Steps:
//   1. Under mutex_, acquire the logging responsibility (logger_ == &self).
//   2. MakeRoomForWrite(false): make sure the memtable has room. This may
//      throttle the write and may temporarily release mutex_ and wait.
//   3. Stamp the batch with sequence number LastSequence() + 1. The batch
//      consumes one sequence number per entry (Count(updates)).
//   4. With mutex_ released: append the batch to the log, Sync() the log
//      file when options.sync is set, then insert the batch into mem_.
//   5. Re-acquire mutex_, publish the new last sequence number, and release
//      the logging responsibility.
//
// Returns the first error from MakeRoomForWrite, the log append/sync, or the
// memtable insert.
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Status status;
  // Lock mutex_ for the scope of the call (explicitly dropped below).
  MutexLock l(&mutex_);
  LoggerId self;
  // Obtain the right to write to the log.
  AcquireLoggingResponsibility(&self);
  // Check that the write may proceed (flow control / memtable switch).
  status = MakeRoomForWrite(false);  // May temporarily release lock and wait
  uint64_t last_sequence = versions_->LastSequence();
  if (status.ok()) {
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);
    // Add to log and apply to memtable.  We can release the lock during
    // this phase since the "logger_" flag protects against concurrent
    // loggers and concurrent writes into mem_.
    {
      assert(logger_ == &self);
      mutex_.Unlock();
      // I/O: append the batch to the log.
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      // Insert into the memtable only if the log write succeeded.
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      assert(logger_ == &self);
    }
    // Publish the new last sequence number.
    // NOTE(review): this runs even when the log write or memtable insert
    // above failed, so a failed batch still consumes its sequence numbers.
    versions_->SetLastSequence(last_sequence);
  }
  // Give up the right to write to the log.
  ReleaseLoggingResponsibility(&self);
  return status;
}
写流量控制:
<db/dbimpl.cc>
// Ensures the current memtable has room for another write. Applies write
// flow control when level-0 compaction falls behind.
//
// REQUIRES: mutex_ is held and the caller owns the logging responsibility
// (logger_ != NULL). mutex_ may be released temporarily while sleeping or
// waiting on bg_cv_.
//
// force: if true, switch to a new memtable/log file even if the current
//        memtable still has room. The 1ms slowdown is never applied.
// Returns bg_error_ if a background error is pending, the error from creating
// a new log file, or OK once there is room for the write.
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(logger_ != NULL);
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // Level 0 has reached the slowdown threshold: sleep 1 ms (1000 us)
      // without holding the lock so compaction gets more CPU. A single write
      // is delayed at most once.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in the current memtable.
      break;
    } else if (imm_ != NULL) {
      // The previous memtable (imm_) has not been compacted yet; wait.
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // Too many level-0 files (>= kL0_StopWritesTrigger): stop and wait.
      Log(options_.info_log, "waiting...\n");
      bg_cv_.Wait();
    } else {
      // Switch to a new log file and memtable: the current memtable becomes
      // imm_ and a compaction is scheduled to dump it.
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      // Schedule a compaction that dumps imm_.
      MaybeScheduleCompaction();
    }
  }
  return s;
}
leveldb研究11- 流程分析:读数据
Posted on 2012-03-21 17:30 小明 阅读(1099) 评论(0) 编辑 收藏 所属分类: 分布式计算
leveldb读数据
先看看ReadOptions有哪些参数可以指定:
// Options that control read operations.
struct ReadOptions {
  // When true, checksums of the data read are verified.
  // Default: false
  bool verify_checksums;

  // Whether the data read by this operation should be put into the cache.
  // Default: true
  bool fill_cache;

  // Snapshot to read from. When NULL, the read uses the current state.
  // Default: NULL
  const Snapshot* snapshot;

  ReadOptions() : verify_checksums(false), fill_cache(true), snapshot(NULL) {}
};
下面看看读取的详细过程:
查询memtable=>查询previous memtable(imm_)=>查询文件(缓冲)
// Looks up `key` and, if found, stores its value in *value.
//
// Reads as of options.snapshot when given, otherwise as of the latest
// sequence number. Search order: mem_, then imm_ (if any), then the SST files
// of the current Version.
//
// mutex_ is held only while choosing the snapshot, capturing mem_/imm_/current
// (each Ref()'d so it stays alive), and afterwards while updating seek stats
// and Unref()'ing. The lookup itself runs with mutex_ released.
// Returns the status from the memtable lookup or from Version::Get.
Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  // Choose the sequence number to read at.
  if (options.snapshot != NULL) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }
  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != NULL) imm->Ref();
  current->Ref();
  bool have_stat_update = false;
  Version::GetStats stats;
  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    LookupKey lkey(key, snapshot);
    // 1. Look in the current memtable.
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
      // 2. Found in the previous (immutable) memtable imm_.
      // Done
    } else {
      // 3. Read from the SST files; this also collects seek statistics.
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }
  // UpdateStats reports whether some file now needs compaction
  // (cf. allowed_seeks); if so, schedule one.
  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != NULL) imm->Unref();
  current->Unref();
  return s;
}
重点来看看从version中读取:
// Looks up key k in the SST files of this Version.
//
// Levels are searched from 0 upward. The search stops at the first file for
// which GetValue reports the lookup is done, so a result in a lower level
// prevents querying higher levels. Level-0 files may overlap, so every
// level-0 file whose key range contains the user key is searched. In higher
// levels a key can belong to at most one file, which FindFile locates
// (binary search).
//
// stats: on return, seek_file/seek_file_level identify the first file read
//        when more than one file had to be read (NULL / -1 otherwise).
// Returns the status GetValue produced for the deciding file, an iterator
//        error status, or NotFound.
Status Version::Get(const ReadOptions& options,
                    const LookupKey& k,
                    std::string* value,
                    GetStats* stats) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();
  const Comparator* ucmp = vset_->icmp_.user_comparator();
  Status s;
  stats->seek_file = NULL;
  stats->seek_file_level = -1;
  FileMetaData* last_file_read = NULL;
  int last_file_read_level = -1;
  // Search from level 0 upward; stop as soon as a level settles the lookup.
  std::vector<FileMetaData*> tmp;
  FileMetaData* tmp2;
  for (int level = 0; level < config::kNumLevels; level++) {
    size_t num_files = files_[level].size();
    // No files in this level: skip to the next level.
    if (num_files == 0) continue;
    // Get the list of files to search in this level
    FileMetaData* const* files = &files_[level][0];
    if (level == 0) {
      // Level-0 files may overlap, so every file whose [smallest, largest]
      // user-key range contains user_key must be searched.
      tmp.reserve(num_files);
      for (uint32_t i = 0; i < num_files; i++) {
        FileMetaData* f = files[i];
        if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
            ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
          tmp.push_back(f);
        }
      }
      if (tmp.empty()) continue;
      // Presumably orders candidates newest first so the most recent write
      // is seen first -- confirm against NewestFirst.
      std::sort(tmp.begin(), tmp.end(), NewestFirst);
      files = &tmp[0];
      num_files = tmp.size();
    } else {
      // Binary search: in this level a key can belong to at most one file.
      uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
      // FindFile found no candidate file in this level.
      if (index >= num_files) {
        files = NULL;
        num_files = 0;
      } else {
        tmp2 = files[index];
        if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
          // All of "tmp2" is past any data for user_key
          files = NULL;
          num_files = 0;
        } else {
          files = &tmp2;
          num_files = 1;
        }
      }
    }
    for (uint32_t i = 0; i < num_files; ++i) {  // candidate files of this level
      if (last_file_read != NULL && stats->seek_file == NULL) {
        // About to read another file: record the first file read
        // (only the first one is recorded).
        stats->seek_file = last_file_read;
        stats->seek_file_level = last_file_read_level;
      }
      FileMetaData* f = files[i];
      last_file_read = f;
      last_file_read_level = level;
      // Get an iterator over the file through the table cache.
      Iterator* iter = vset_->table_cache_->NewIterator(
          options,
          f->number,
          f->file_size);
      iter->Seek(ikey);
      const bool done = GetValue(ucmp, iter, user_key, value, &s);
      if (!iter->status().ok()) {  // the iterator hit an error: report it
        s = iter->status();
        delete iter;
        return s;
      } else {
        delete iter;
        // GetValue reported that this file settled the lookup.
        if (done) {
          return s;
        }
      }
    }
  }
  return Status::NotFound(Slice());  // Use an empty error message for speed
}
继续跟踪:TableCache
// Returns an iterator over the contents of SST file `file_number`.
//
// The opened table is looked up in the LRU cache_, keyed by the file number
// encoded as a fixed 64-bit value. On a miss, the file is opened with
// Table::Open and inserted into the cache. Open errors are not cached and are
// returned as an error iterator. The returned iterator holds the cache handle,
// which the UnrefEntry cleanup releases when the iterator is deleted.
//
// file_size: size of the file, passed to Table::Open.
// tableptr:  if non-NULL, receives the underlying Table, or NULL on error.
//            Presumably only valid while the returned iterator is alive,
//            since the iterator holds the cache reference.
Iterator* TableCache::NewIterator(const ReadOptions& options,
                                  uint64_t file_number,
                                  uint64_t file_size,
                                  Table** tableptr) {
  if (tableptr != NULL) {
    *tableptr = NULL;
  }

  char buf[sizeof(file_number)];
  EncodeFixed64(buf, file_number);
  Slice key(buf, sizeof(buf));

  // Look the table up in the LRU cache.
  Cache::Handle* handle = cache_->Lookup(key);
  if (handle == NULL) {
    // Cache miss: open the file and parse it as a table.
    std::string fname = TableFileName(dbname_, file_number);
    RandomAccessFile* file = NULL;
    Table* table = NULL;
    Status s = env_->NewRandomAccessFile(fname, &file);
    if (s.ok()) {
      s = Table::Open(*options_, file, file_size, &table);
    }

    if (!s.ok()) {
      assert(table == NULL);
      delete file;
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
      return NewErrorIterator(s);
    }

    // Insert into the cache. The entry keeps file and table together and is
    // presumably freed by DeleteEntry on eviction.
    TableAndFile* tf = new TableAndFile;
    tf->file = file;
    tf->table = table;
    handle = cache_->Insert(key, tf, 1, &DeleteEntry);
  }

  Table* table = reinterpret_cast<TableAndFile*>(cache_->Value(handle))->table;
  // Build the iterator from the Table; its cleanup releases the cache handle.
  Iterator* result = table->NewIterator(options);
  result->RegisterCleanup(&UnrefEntry, cache_, handle);
  if (tableptr != NULL) {
    *tableptr = table;
  }
  return result;
}
leveldb性能分析 - 随机写
Posted on 2012-03-22 17:32 小明 阅读(1423) 评论(0) 编辑 收藏 所属分类: 分布式计算
准备工作:
1. 下载Snappy库
Download source code from: http://code.google.com/p/snappy (Google Code 已关闭,snappy 现在托管于 https://github.com/google/snappy)
编译并安装
./configure && make && sudo make install
2. 编译leveldb自带的db_bench
make db_bench
注意:在Ubuntu 11.04上编译会出现链接错误。这是库在链接命令中的顺序问题:链接器按从左到右的顺序解析符号,放在目标文件之前的库($(LDFLAGS))可能不会被用来解析目标文件中的引用,因此需要把库移到目标文件之后。修改makefile,把:
$(CXX) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@
为
$(CXX) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@ $(LDFLAGS)
为了获取更多的信息,我写了一个简单的测试程序来测试写性能。
g++ src/ldbbench.cpp libleveldb.a -I../leveldb/include -o ldb_test -pthread -lsnappy
我每隔10万条记录统计一次运行时间和各level的文件个数。
#include <sys/time.h>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include "leveldb/db.h"
#include "leveldb/env.h"
using namespace std;
using namespace leveldb;
// Returns the current wall-clock time as seconds since the Unix epoch, with
// microsecond resolution (obtained from gettimeofday()).
static inline double micro_time(){
  struct timeval now;
  gettimeofday(&now, NULL);
  const double whole_seconds = now.tv_sec;
  const double fraction = now.tv_usec / 1000000.0;
  return whole_seconds + fraction;
}
int main() {
srand ( time(NULL) );
DB *db ;
Options op;
op.create_if_missing = true;
Status s = DB::Open(op,"/tmp/testdb",&db);
Env * env = Env::Default();
WritableFile *file;
env->NewWritableFile("/tmp/bench.csv",&file);
if(s.ok()){
cout << "create successfully" << endl;
WriteOptions wop;
for(int j=0;j<100;++j){
double start = micro_time();
double cost;
for(int i=0;i<100000;++i){
char key[100];
char value[100];
sprintf(key,"%d_%d",i,rand());
sprintf(value,"%d",rand());
db->Put(wop,key,value);
}
cost = micro_time()-start;
cout << "write successfully:" << j << ",costs "<<cost<<endl;
// report the status
{
//output stats information
string value;
char buffer[40];
for(int i=0;i<7;++i){
sprintf(buffer,"leveldb.num-files-at-level%d",i);
db->GetProperty(buffer,&value);
file->Append(value+",");
}
sprintf(buffer,"%f",cost);
file->Append(buffer);
file->Append("\n");
file->Sync();
}
}
cout << "write completed" << endl;
}
delete db;
file->Close();
delete file;
return 0;
}
得到结果如下:
可以看出,插入时间并不稳定:每当level 0的文件个数达到8(即kL0_SlowdownWritesTrigger,leveldb在level 0的SST文件达到这个数量时开始做写流量控制),插入速度就会受到严重影响。
数据如下:前7栏为各level的文件个数,最后一栏为插入10万条记录所用的时间(单位:秒)。
0,0,0,0,0,0,0,0.312044
0,0,1,0,0,0,0,0.339661
0,1,1,0,0,0,0,0.336554
1,1,1,0,0,0,0,0.338470
2,1,1,0,0,0,0,0.319139
4,1,1,0,0,0,0,0.322158
5,1,1,0,0,0,0,0.411267
6,1,1,0,0,0,0,0.452211
7,1,1,0,0,0,0,0.392227
4,6,1,0,0,0,0,0.599982
5,6,1,0,0,0,0,0.392222
6,6,1,0,0,0,0,0.426607
7,6,1,0,0,0,0,0.450604
0,9,7,0,0,0,0,1.884518
1,9,7,0,0,0,0,0.420226
2,8,8,0,0,0,0,0.395083
3,8,8,0,0,0,0,0.418100
4,7,9,0,0,0,0,0.421611
6,7,9,0,0,0,0,0.415739
7,7,9,0,0,0,0,0.407361
1,14,10,0,0,0,0,2.226791
2,14,10,0,0,0,0,0.401517
3,14,10,0,0,0,0,0.373305
4,13,11,0,0,0,0,0.419741
5,13,11,0,0,0,0,0.409911
6,12,12,0,0,0,0,0.410904
7,12,12,0,0,0,0,0.429305
0,19,15,0,0,0,0,3.586968
2,19,15,0,0,0,0,0.443083
3,18,16,0,0,0,0,0.403899
4,18,16,0,0,0,0,0.427664
5,17,17,0,0,0,0,0.398022
6,16,19,0,0,0,0,0.373106
7,16,19,0,0,0,0,0.381070
0,16,27,0,0,0,0,3.997287
1,16,27,0,0,0,0,0.415576
2,15,29,0,0,0,0,0.395088
3,15,29,0,0,0,0,0.421756
4,15,29,0,0,0,0,0.423345
5,14,30,0,0,0,0,0.443051
6,13,32,0,0,0,0,0.409214
0,21,35,0,0,0,0,3.724305
1,21,35,0,0,0,0,0.394496
2,20,36,0,0,0,0,0.400312
3,20,36,0,0,0,0,0.440494
4,19,36,0,0,0,0,0.401116
5,19,36,0,0,0,0,0.368698
6,19,36,0,0,0,0,0.392624
7,18,37,0,0,0,0,0.421263
0,20,45,0,0,0,0,5.280940
1,20,45,0,0,0,0,0.445995
2,19,46,0,0,0,0,0.427433
3,19,46,0,0,0,0,0.396355
4,19,46,0,0,0,0,0.412447
6,18,47,0,0,0,0,0.425992
7,18,47,0,0,0,0,0.409269
0,22,54,0,0,0,0,4.659271
1,22,54,0,0,0,0,0.353135
2,22,54,0,0,0,0,0.412604
3,22,54,0,0,0,0,0.387365
4,21,55,0,0,0,0,0.447579
5,20,56,0,0,0,0,0.423402
6,20,56,0,0,0,0,0.392983
7,19,58,0,0,0,0,0.372202
0,22,66,0,0,0,0,5.072227
1,22,66,0,0,0,0,0.389874
2,22,66,0,0,0,0,0.375599
4,22,66,0,0,0,0,0.405292
5,22,66,0,0,0,0,0.404367
6,22,66,0,0,0,0,0.394260
7,22,66,0,0,0,0,0.401855
0,24,77,0,0,0,0,5.980508
1,24,77,0,0,0,0,0.388424
2,24,77,0,0,0,0,0.429406
3,23,78,0,0,0,0,0.412908
4,23,78,0,0,0,0,0.428574
5,23,78,0,0,0,0,0.403336
6,22,79,0,0,0,0,0.394216
8,13,89,0,0,0,0,5.377096
1,23,89,0,0,0,0,0.816229
2,23,89,0,0,0,0,0.437396
3,23,89,0,0,0,0,0.399540
4,22,90,0,0,0,0,0.437927
5,22,90,0,0,0,0,0.424814
6,22,90,0,0,0,0,0.411747
7,21,92,0,0,0,0,0.384908
0,25,100,0,0,0,0,6.236974
1,25,100,0,0,0,0,0.403147
3,25,100,0,0,0,0,0.412086
4,25,100,0,0,0,0,0.403978
5,24,102,0,0,0,0,0.398120
6,24,102,0,0,0,0,0.374137
7,24,102,0,0,0,0,0.370625
0,22,102,10,0,0,0,6.692459
1,22,102,10,0,0,0,0.389345
2,22,102,10,0,0,0,0.411086
3,22,102,10,0,0,0,0.404387
4,21,103,10,0,0,0,0.443593
5,21,103,10,0,0,0,0.400221
6,21,103,10,0,0,0,0.414371
相关文章推荐
- leveldb代码阅读(16)——流程分析:打开数据库(详细版本)
- leveldb研究2- 存储分析,数据库日志文件格式,数据文件的格式和生成
- leveldb代码阅读(17)——流程分析:写数据(详细版本)
- MyBatis主流程分析之(二)-打开会话和数据库操作
- ASP.NET网络爬虫小研究 HtmlAgilityPack基础,爬取数据保存在数据库中再显示再自己的网页中
- netlink监听网络变化代码(转载)+流程分析(原创+转载)+数据结构以及相关宏的解析(原创)
- sql 2005 创建动态数据报表的整个流程分析.
- android中通过拨号键打开contacts相关界面的流程分析
- 怎么去分析数据库表的模型(数据模型)
- OSISoft实时/历史数据库PI的数据存储机制分析
- 电商用户行为分析大数据平台相关系列8-数据分析流程
- QPBOC交易流程详解--POS与卡片的数据交互进行分析
- SSM框架Jsp页面POST提交的中文数据保存到数据库变成乱码问题的分析
- 将Python和R整合进一个数据分析流程
- 数据分析系列剧第四集:行业研究报告与生产计划
- 如何把股票软件的数据导入到数据库(access,sqlserver,oracle)然后自行统计分析?
- 大数据时代:基于微软案例数据库数据挖掘知识点总结(Microsoft 顺序分析和聚类分析算法)
- SPSS数据分析流程经验总结
- 【转】Visual Studio 2010在数据库生成随机测数据