您的位置:首页 > 编程语言

leveldb代码阅读(17)——流程分析:写数据(详细版本)

2016-01-15 15:25 501 查看
原文地址:http://www.blogjava.net/sandy/archive/2012/03/21/leveldb10.html

总体来说,leveldb的写操作有两个步骤,首先是针对log的append操作,然后是对memtable的插入操作。

影响写性能的因素有:

1. write_buffer_size

2. kL0_SlowdownWritesTrigger and kL0_StopWritesTrigger.提高这两个值,能够增加写的性能,但是降低读的性能

看看WriteOptions有哪些参数可以指定

struct WriteOptions {

//设置sync=true,leveldb会调用fsync(),这会降低插入性能

//同时会增加数据的安全性

//Default: false

bool sync;

WriteOptions()

: sync(false) {

}

};
首先把Key,value转成WriteBatch

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {

WriteBatch batch;

batch.Put(key, value);

return Write(opt, &batch);

}
接下来就是真正的插入了

这里使用了两把锁,主要是想提高并发能力,减少上锁的时间。

首先是检查是否可写,然后append log,最后是插入memtable

<db/dbimpl.cc>

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {

Status status;

//加锁

MutexLock l(&mutex_);

LoggerId self;

//拿到写log的权利

AcquireLoggingResponsibility(&self);

//检查是否可写

status = MakeRoomForWrite(false); // May temporarily release lock and wait

uint64_t last_sequence = versions_->LastSequence();

if (status.ok()) {

WriteBatchInternal::SetSequence(updates, last_sequence + 1);

last_sequence += WriteBatchInternal::Count(updates);

// Add to log and apply to memtable. We can release the lock during

// this phase since the "logger_" flag protects against concurrent

// loggers and concurrent writes into mem_.

{

assert(logger_ == &self);

mutex_.Unlock();

//IO操作:写入LOG

status = log_->AddRecord(WriteBatchInternal::Contents(updates));

if (status.ok() && options.sync) {

status = logfile_->Sync();

}

//插入memtable

if (status.ok()) {

status = WriteBatchInternal::InsertInto(updates, mem_);

}

mutex_.Lock();

assert(logger_ == &self);

}

//设置新的seqence number

versions_->SetLastSequence(last_sequence);

}

//释放写LOG锁

ReleaseLoggingResponsibility(&self);

return status;

}
写流量控制:

<db/dbimpl.cc>

Status DBImpl::MakeRoomForWrite(bool force) {

mutex_.AssertHeld();

assert(logger_ != NULL);

bool allow_delay = !force;

Status s;

while (true) {

if (!bg_error_.ok()) {

// Yield previous error

s = bg_error_;

break;

} else if (

allow_delay &&

versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {

mutex_.Unlock();

//如果level0的文件大于kL0_SlowdownWritesTrigger阈值,则sleep 1s,这样给compaction更多的CPU

env_->SleepForMicroseconds(1000);

allow_delay = false; // Do not delay a single write more than once

mutex_.Lock();

} else if (!force &&

(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {

//可写

break;

} else if (imm_ != NULL) {

// imm_:之前的memtable 没有被compaction,需要等待

bg_cv_.Wait();

} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {

// level0文件个数大于kL0_StopWritesTrigger,需要等待

Log(options_.info_log, "waiting

\n");

bg_cv_.Wait();

} else {

//生成新的额memtable和logfile,把当前memtable传给imm_

assert(versions_->PrevLogNumber() == 0);

uint64_t new_log_number = versions_->NewFileNumber();

WritableFile* lfile = NULL;

s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);

if (!s.ok()) {

break;

}

delete log_;

delete logfile_;

logfile_ = lfile;

logfile_number_ = new_log_number;

log_ = new log::Writer(lfile);

imm_ = mem_;

has_imm_.Release_Store(imm_);

mem_ = new MemTable(internal_comparator_);

mem_->Ref();

force = false; // Do not force another compaction if have room

// 发起compaction,dump imm_

MaybeScheduleCompaction();

}

}

return s;

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: