您的位置:首页 > 数据库

leveldb代码阅读(16)——流程分析:打开数据库(详细版本)

2016-01-15 15:23 411 查看
原文地址:http://www.blogjava.net/sandy/archive/2012/03/20/leveldb9.html

leveldb 是通过Open函数来打开/新建数据库。

// Opens the database stored at `name`, configured by `options`.
// On success, *dbptr is set to the opened database and an OK status is
// returned; on failure, *dbptr is left NULL and a non-OK status is returned.
// Whether a missing database is created, or an existing one rejected, is
// controlled by options.create_if_missing and options.error_if_exists.
static Status Open(const Options& options,
                   const std::string& name,
                   DB** dbptr);
其中 options 指定打开数据库时使用的各项选项,name 为数据库所在的目录路径,dbptr 用于在打开成功时返回数据库对象(失败时为 NULL)。

struct Options {
  // -------------------
  // Parameters that affect behavior

  // Comparator that defines the order of keys.
  // Default: orders keys by their raw bytes.
  const Comparator* comparator;

  // If true, the database is created if it does not exist.
  // Default: false
  bool create_if_missing;

  // If true, opening fails if the database already exists.
  // Default: false
  bool error_if_exists;

  // If true, perform stricter checking (e.g. during log recovery, corrupted
  // records become an error instead of being logged and skipped).
  // Default: false
  bool paranoid_checks;

  // Env used to interact with the OS (files, etc.).
  // Default: Env::Default()
  Env* env;

  // Logger for informational messages. When NULL, a log file in the same
  // directory as the database is used.
  // Default: NULL
  Logger* info_log;

  // -------------------
  // Parameters that affect performance

  // Size of the write buffer. Larger values improve write performance but
  // increase startup time, because more data has to be recovered.
  //
  // Default: 4MB
  size_t write_buffer_size;

  // Maximum number of open files, used by the TableCache.
  //
  // Default: 1000
  int max_open_files;

  // Control over blocks (user data is stored in a set of blocks, and
  // a block is the unit of reading from disk).

  // Block cache to use. If NULL, leveldb automatically creates an 8MB
  // internal cache.
  // Default: NULL
  Cache* block_cache;

  // Size of a block inside an SST file, measured before compression.
  //
  // Default: 4K
  size_t block_size;

  // Interval between restart points in an SST file (see the SST file format).
  //
  // Default: 16
  int block_restart_interval;

  // Compression type. Default: Google's snappy compression.
  CompressionType compression;

  // Create an Options object with default values for all fields.
  Options();
};
具体看看Open的实现:

<db/db_impl.cc>

// Opens (or creates) the database `dbname` with `options`.
//
// Recovers existing state from disk, starts a fresh log file, persists the
// resulting VersionEdit to the MANIFEST, then deletes obsolete files and
// gives the DB a chance to schedule a compaction. On success *dbptr points
// to the new DBImpl and OK is returned; on failure the DBImpl is destroyed,
// *dbptr stays NULL and the error is returned.
Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  // Instantiate the implementation object.
  DBImpl* impl = new DBImpl(options, dbname);
  // Recovery and the MANIFEST update below run with the DB mutex held.
  impl->mutex_.Lock();
  VersionEdit edit;
  // Replay log files into new level-0 SST files, recorded in `edit`.
  Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
  if (s.ok()) {
    // Create a new log file for subsequent writes.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      // Persist the edit (recovered tables + new log number) to the
      // MANIFEST; the first LogAndApply call also creates a new MANIFEST.
      s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
    }
    if (s.ok()) {
      // Delete obsolete files.
      impl->DeleteObsoleteFiles();
      // Give the DB a chance to schedule a compaction.
      impl->MaybeScheduleCompaction();
    }
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

因为上次关闭数据库时,内存(memtable)中的数据可能还没有写入SST文件,所以需要从*.log文件中读取记录,重新写入新的SST文件。

<db/db_impl.cc>

// Recovers on-disk state into this DBImpl. Must be called with mutex_ held
// (DB::Open does this).
//
//   1. Creates the database directory and locks the LOCK file.
//   2. If CURRENT does not exist, creates a new database when
//      options_.create_if_missing is set and fails otherwise; if CURRENT
//      exists and options_.error_if_exists is set, fails.
//   3. Rebuilds the version state from the MANIFEST (VersionSet::Recover).
//   4. Replays, in increasing order, every log file whose number is
//      >= the recorded log number or equal to the previous log number.
//      Level-0 tables produced by the replay are recorded in *edit.
//   5. Raises the last sequence number to the largest one replayed.
//
// Returns the first error encountered.
Status DBImpl::Recover(VersionEdit* edit) {
  mutex_.AssertHeld();

  // Create the database directory. The result is intentionally not checked
  // (the directory may already exist); an unusable directory is expected to
  // make the LockFile call below fail.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  // Create the LOCK file and lock it.
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      // Create a brand-new database.
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  // Rebuild the version state from the MANIFEST.
  s = versions_->Recover();
  if (s.ok()) {
    SequenceNumber max_sequence(0);

    // Log files numbered >= min_log, plus prev_log, may contain data that
    // was never written to an SST file and therefore must be replayed.
    const uint64_t min_log = versions_->LogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
    s = env_->GetChildren(dbname_, &filenames);
    if (!s.ok()) {
      return s;
    }
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
      uint64_t number;
      FileType type;
      if (ParseFileName(filenames[i], &number, &type)
          && type == kLogFile
          && ((number >= min_log) || (number == prev_log))) {
        logs.push_back(number);
      }
    }

    // Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
    for (size_t i = 0; i < logs.size(); i++) {
      // Replay this *.log file into level-0 tables.
      s = RecoverLogFile(logs[i], edit, &max_sequence);

      // The previous incarnation may not have written any MANIFEST
      // records after allocating this log number. So we manually
      // update the file number allocation counter in VersionSet.
      versions_->MarkFileNumberUsed(logs[i]);

      // Stop at the first failed log. Otherwise a later successful replay
      // would overwrite `s` and hide the error, letting DB::Open succeed
      // with the failed log's data silently missing.
      if (!s.ok()) {
        break;
      }
    }

    if (s.ok()) {
      if (versions_->LastSequence() < max_sequence) {
        versions_->SetLastSequence(max_sequence);
      }
    }
  }

  return s;
}
继续看RecoverLogFile的实现:

<db/db_impl.cc>

// Replays the write batches stored in log file #log_number.
//
// Each record is decoded as a WriteBatch and inserted into a MemTable.
// Whenever the memtable's approximate memory usage exceeds
// options_.write_buffer_size, it is written out as a level-0 table (recorded
// in *edit) and released. Any memtable left after the last record is
// written out the same way. *max_sequence is raised to the highest sequence
// number seen.
//
// Requires mutex_ to be held. Corrupted records are reported through
// LogReporter; they only become the returned error when
// options_.paranoid_checks is set.
Status DBImpl::RecoverLogFile(uint64_t log_number,
                              VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  // Reporter invoked by log::Reader when it encounters bad data.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // NULL if options_.paranoid_checks==false
    // Logs the dropped bytes and, when status is non-NULL, records the first
    // corruption error there.
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == NULL ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != NULL && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file for sequential reading.
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    // NOTE(review): MaybeIgnoreError is not shown here; presumably it clears
    // the error unless paranoid_checks is set — verify.
    MaybeIgnoreError(&status);
    return status;
  }

  // The reporter writes into `status` only in paranoid mode. In that mode a
  // corruption ends the read loop below; otherwise it is only logged.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : NULL);
  // Read records with checksum verification, starting at offset 0.
  log::Reader reader(file, &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long) log_number);

  std::string scratch;
  Slice record;
  WriteBatch batch;
  MemTable* mem = NULL;
  // Walk the log records until the reader returns no more records or an
  // error has been recorded in `status`.
  while (reader.ReadRecord(&record, &scratch) &&
         status.ok()) {
    // NOTE(review): 12 is presumably the WriteBatch header size
    // (8-byte sequence + 4-byte count) — confirm against write_batch.cc.
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == NULL) {
      // Create a MemTable to hold the replayed data.
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    // Insert the batch into the memtable.
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    // Last sequence number used by this batch: first sequence + count - 1.
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      // Memtable is over budget: write it out as a level-0 SST file.
      status = WriteLevel0Table(mem, edit, NULL);
      if (!status.ok()) {
        break;
      }
      // Release the memtable; a new one is created for the next record.
      mem->Unref();
      mem = NULL;
    }
  }

  if (status.ok() && mem != NULL) {
    status = WriteLevel0Table(mem, edit, NULL);
    // Reflect errors immediately so that conditions like full
    // file-systems cause the DB::Open() to fail.
  }

  if (mem != NULL) mem->Unref();
  delete file;
  return status;
}
至此完成SST file的写入。

接下来看看manifest文件的重建。

manifest的重建分两步:第一步调用VersionSet::Recover,根据CURRENT文件找到上次的manifest,并从中恢复版本信息;第二步调用VersionSet::LogAndApply,把新增的SST文件记录(以及新的log文件编号)写入新的manifest文件中。

<db/version_set.cc>

// Restores the VersionSet from the MANIFEST named by the CURRENT file.
//
// Reads CURRENT (which must end with '\n') to find the MANIFEST, and replays
// every VersionEdit record in it on top of current_. If the MANIFEST
// contains the next-file number, the log number and the last sequence
// number, it installs the resulting version as current and restores those
// counters. Otherwise it returns Corruption. It returns InvalidArgument if
// the MANIFEST was written with a different user comparator, or an I/O
// error. On any error no version is installed.
Status VersionSet::Recover() {
  // Records the first corruption reported while reading the MANIFEST.
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read the CURRENT file to find the name of the latest MANIFEST.
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  // Open the MANIFEST named by CURRENT.
  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
    // Each MANIFEST record is an encoded VersionEdit.
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        // Reject a MANIFEST written with a different user comparator.
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + " does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        // Apply the edit to the builder.
        builder.Apply(&edit);
      }

      // For each counter, the last value found in the MANIFEST wins.
      if (edit.has_log_number_) {
        log_number = edit.log_number_;
        have_log_number = true;
      }

      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }

    // NOTE(review): next_file_number_ is overwritten below with
    // next_file + 1; this relies on next_file already exceeding these log
    // numbers (LogAndApply asserts log_number_ < next_file_number_).
    MarkFileNumberUsed(prev_log_number);
    MarkFileNumberUsed(log_number);
  }

  if (s.ok()) {
    // Build the recovered version and make it the current version.
    Version* v = new Version(this);
    builder.SaveTo(v);
    // Install recovered version
    Finalize(v);
    AppendVersion(v);
    // next_file becomes the number of the MANIFEST that the next
    // LogAndApply call will create.
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;
  }

  return s;
}

// Applies *edit to the current version, appends it to the MANIFEST and, on
// success, installs the resulting version as current.
//
// The log number and previous log number are filled in from the VersionSet
// when *edit lacks them. The next-file number and last sequence number are
// always overwritten with the VersionSet's current values. If no MANIFEST
// is open yet (the first call, made while opening the DB), a new MANIFEST
// is created, seeded with a snapshot of the current state, and CURRENT is
// pointed at it.
//
// *mu must be held on entry. It is released while the MANIFEST record is
// written and synced, and re-acquired before returning. On failure the new
// version is discarded, and a MANIFEST created by this call is closed and
// deleted.
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
  if (edit->has_log_number_) {
    // Log numbers must not go backwards and must already be allocated.
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  } else {
    edit->SetLogNumber(log_number_);
  }

  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }

  edit->SetNextFile(next_file_number_);
  edit->SetLastSequence(last_sequence_);

  // Build the new Version by applying the edit on top of current_.
  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
  Finalize(v);

  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
  std::string new_manifest_file;
  Status s;
  // Create a new MANIFEST file if none is open yet.
  if (descriptor_log_ == NULL) {
    // No reason to unlock *mu here since we only hit this path in the
    // first call to LogAndApply (when opening the database).
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    // NOTE(review): redundant with the SetNextFile call above (same value).
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);
    }
  }

  // Unlock during expensive MANIFEST log write
  {
    mu->Unlock();

    // Append the encoded edit to the MANIFEST and sync it.
    if (s.ok()) {
      std::string record;
      edit->EncodeTo(&record);
      s = descriptor_log_->AddRecord(record);
      if (s.ok()) {
        s = descriptor_file_->Sync();
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && !new_manifest_file.empty()) {
      s = SetCurrentFile(env_, dbname_, manifest_file_number_);
    }

    mu->Lock();
  }

  // Install the new version.
  if (s.ok()) {
    AppendVersion(v);
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
  } else {
    delete v;
    if (!new_manifest_file.empty()) {
      // Roll back the MANIFEST created above; with descriptor_log_ reset to
      // NULL, the next call takes the creation path again.
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ = NULL;
      descriptor_file_ = NULL;
      env_->DeleteFile(new_manifest_file);
    }
  }

  return s;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: