
leveldb Study 2: Storage Analysis

Posted on 2012-03-09 11:44 by 小明 | Category: Distributed Computing


For any database, storage is a critical concern.

After running the test program from the previous post, you will find that leveldb has generated the following files:

SST files: data files -- SSTable format

*.log: write-ahead log files -- every database operation is appended here sequentially, used to recover data after a crash

CURRENT: a text file naming the current MANIFEST file

LOCK: an empty file serving as the database lock, preventing access by multiple processes

LOG: the informational log, plain text

LOG.old: the previous informational log

MANIFEST: the database state file
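For example, after a few operations and one reopen, the database directory might contain something like this (the file numbers are illustrative and will vary):

000005.sst  000007.log  CURRENT  LOCK  LOG  LOG.old  MANIFEST-000006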

leveldb Study 3: The Log File Format

Posted on 2012-03-09 16:00 by 小明 | Category: Distributed Computing


Before applying any database operation, leveldb first records the operation in its log.

The implementation lives mainly in db\log_format.h, db\log_reader.h, db\log_reader.cc, db\log_writer.h, and db\log_writer.cc. Let's look at it in detail.

Log format

db\log_format.h

The log is divided into blocks of 32KB each. Every record has a 7-byte header: the first four bytes are a CRC, followed by the length (2 bytes) and the record type (1 byte).

---------------------------------------

BLOCK1|BLOCK2|BLOCK3|...|BLOCKN

---------------------------------------
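Within a block, each physical record is laid out as follows (the length field is little-endian, as EmitPhysicalRecord below shows):

RECORD = |CRC (4 bytes)|length (2 bytes)|type (1 byte)|payload (length bytes)|

As a worked example, a 5-byte payload "hello" written as a single complete record gets the header bytes <4 CRC bytes> 0x05 0x00 0x01 (length = 5, type = kFullType = 1).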

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType = 1,

  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

}  // namespace log
}  // namespace leveldb

Writing the log

db\log_writer.cc

Note how this is handled: a single record may exceed the size of one block, so it must be split into multiple fragments.
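A quick worked example: with kBlockSize = 32768 and kHeaderSize = 7, a fresh block holds at most 32768 - 7 = 32761 payload bytes. A 40,000-byte record therefore becomes a kFirstType fragment of 32,761 bytes that fills the first block, followed by a kLastType fragment carrying the remaining 7,239 bytes (plus its own 7-byte header) at the start of the next block.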

// Append one record
Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it. Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;  // bytes left in the current block
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {  // not enough room for the 7-byte header
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer with zeros
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    // avail: payload bytes available after the header
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    // Size of the fragment actually written
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);  // does this fragment end the record?
    if (begin && end) {
      type = kFullType;    // complete record
    } else if (begin) {
      type = kFirstType;   // first fragment
    } else if (end) {
      type = kLastType;    // last fragment
    } else {
      type = kMiddleType;  // middle fragment
    }

    // Emit the fragment
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

// Actually write a physical record to the log file
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // Format the record header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(n & 0xff);
  buf[5] = static_cast<char>(n >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the CRC over the record type and the payload
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    // Write the record fragment
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + n;
  return s;
}
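As a usage sketch (the file name "test.log" is made up, error handling is omitted, and since log::Writer is an internal class, this only compiles inside the leveldb source tree):

#include "db/log_writer.h"
#include "leveldb/env.h"

void WriteTestLog() {
  leveldb::WritableFile* file;
  leveldb::Env* env = leveldb::Env::Default();
  if (!env->NewWritableFile("test.log", &file).ok()) return;
  leveldb::log::Writer writer(file);
  writer.AddRecord(leveldb::Slice("operation 1"));  // one logical record
  writer.AddRecord(leveldb::Slice("operation 2"));  // another record
  file->Close();
  delete file;
}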

Reading the log

Here you can see the benefit of blocks: they reduce the number of file I/O operations. Reading the log is essentially the write path in reverse.

// Read the next record; scratch is a working buffer, record holds the result
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    // Skip ahead to the block containing initial_offset_
    if (!SkipToInitialBlock()) {
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;  // are we inside a fragmented record?
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    // Read the next physical record (pulls a new block from the file when needed)
    const unsigned int record_type = ReadPhysicalRecord(&fragment);
    switch (record_type) {
      case kFullType:  // complete record
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:  // start of a record
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:  // middle of a record
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:  // end of a record
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:  // end of file
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "partial record without end(3)");
          scratch->clear();
        }
        return false;

      case kBadRecord:  // corrupted record
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {  // unrecognized record type
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

// Read a physical record from the file
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        // Read one full block from the file
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else if (buffer_.size() == 0) {
        // End of file
        return kEof;
      } else {
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "truncated record at end of file");
        return kEof;
      }
    }

    // Parse the record header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      ReportCorruption(drop_size, "bad record length");
      return kBadRecord;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check the CRC
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}
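And the matching read-side sketch (same caveats as the writer sketch above: illustrative file name, internal headers, no error handling):

#include "db/log_reader.h"
#include "leveldb/env.h"

void ReadTestLog() {
  leveldb::SequentialFile* file;
  leveldb::Env* env = leveldb::Env::Default();
  if (!env->NewSequentialFile("test.log", &file).ok()) return;
  // no corruption reporter; verify checksums; start at offset 0
  leveldb::log::Reader reader(file, NULL, true, 0);
  std::string scratch;
  leveldb::Slice record;
  while (reader.ReadRecord(&record, &scratch)) {
    // each record comes back exactly as it was passed to AddRecord
  }
  delete file;
}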

leveldb Study 4: Data File Format and Generation

Posted on 2012-03-12 18:21 by 小明 | Category: Distributed Computing


leveldb saves its data in the SSTable format.

The layout is as follows (no meta blocks are currently written):

SSTABLE = |DATA BLOCK 1|DATA BLOCK 2|...|DATA BLOCK N|META BLOCK 1|...|META BLOCK N|META INDEX BLOCK|DATA INDEX BLOCK|FOOTER|

DATA BLOCK = |key/value entries|restart array|restart array size|compression type (1 byte)|CRC (4 bytes)|

FOOTER (fixed length) = |META INDEX BLOCK handle|DATA INDEX BLOCK handle|magic number|

A finer detail is the compression of data blocks: keys are stored using prefix compression.

Let's look at the concrete implementation.

// builder.cc
// dbname: database name
// env: OS abstraction layer
// iter: an iterator over the MemTable
Status BuildTable(const std::string& dbname,
                  Env* env,
                  const Options& options,
                  TableCache* table_cache,
                  Iterator* iter,
                  FileMetaData* meta) {
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

  // Build the table file name (of the form "00000x.sst")
  std::string fname = TableFileName(dbname, meta->number);
  if (iter->Valid()) {
    WritableFile* file;
    // Create a writable file
    s = env->NewWritableFile(fname, &file);
    if (!s.ok()) {
      return s;
    }

    // TableBuilder handles building and writing the table
    TableBuilder* builder = new TableBuilder(options, file);
    // Metadata: smallest key
    meta->smallest.DecodeFrom(iter->key());
    for (; iter->Valid(); iter->Next()) {
      Slice key = iter->key();
      // Metadata: largest key
      meta->largest.DecodeFrom(key);
      // Add the entry to the builder
      builder->Add(key, iter->value());
    }

    // Finish and check for builder errors
    if (s.ok()) {
      // Complete the table
      s = builder->Finish();
      if (s.ok()) {
        meta->file_size = builder->FileSize();
        assert(meta->file_size > 0);
      }
    } else {
      builder->Abandon();
    }
    delete builder;

    // Finish and check for file errors
    if (s.ok()) {
      s = file->Sync();
    }
    if (s.ok()) {
      // Close the file; the data has been synced to disk above
      s = file->Close();
    }
    delete file;
    file = NULL;

    if (s.ok()) {
      // Verify that the table is usable
      Iterator* it = table_cache->NewIterator(ReadOptions(),
                                              meta->number,
                                              meta->file_size);
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
    // Keep it
  } else {
    env->DeleteFile(fname);
  }
  return s;
}

} // namespace leveldb

Now let's look at the TableBuilder class, where most of the important details are implemented.

TableBuilder holds a pointer to a Rep struct, which stores the builder's state and data. Why not define these members directly in the TableBuilder header? Mainly to avoid exposing implementation details to users (the pimpl idiom) -- a good practice.

struct TableBuilder::Rep {
  Options options;
  Options index_block_options;
  WritableFile* file;        // the sstable file
  uint64_t offset;           // current write offset in the file
  Status status;
  BlockBuilder data_block;   // current data block
  BlockBuilder index_block;  // index block
  std::string last_key;      // last key added; used for ordering checks and index entries
  int64_t num_entries;       // number of entries added so far
  bool closed;               // has the builder been finished or abandoned?
  bool pending_index_entry;  // is an index entry for the last data block still pending?
  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;

  Rep(const Options& opt, WritableFile* f)
      : options(opt),
        index_block_options(opt),
        file(f),
        offset(0),
        data_block(&options),
        index_block(&index_block_options),
        num_entries(0),
        closed(false),
        pending_index_entry(false) {
    index_block_options.block_restart_interval = 1;
  }
};

Adding a record:

// Add one key/value entry
void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->num_entries > 0) {
    // Keys must be added in strictly increasing order
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  if (r->pending_index_entry) {  // need to emit an index entry for the block just flushed?
    // This only happens when a new data block is starting
    assert(r->data_block.empty());
    // Shorten last_key to the shortest separator between the last key
    // of the previous data block and the current key
    r->options.comparator->FindShortestSeparator(&r->last_key, key);
    std::string handle_encoding;
    r->pending_handle.EncodeTo(&handle_encoding);
    // Add the index entry (buffered; written out in Finish())
    r->index_block.Add(r->last_key, Slice(handle_encoding));
    r->pending_index_entry = false;
  }

  r->last_key.assign(key.data(), key.size());
  r->num_entries++;
  r->data_block.Add(key, value);

  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  // Flush once the block reaches block_size (default: 4KB)
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}
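To make FindShortestSeparator concrete, here is a small sketch using the default BytewiseComparator (the keys are made up for illustration):

#include <string>
#include "leveldb/comparator.h"
#include "leveldb/slice.h"

void SeparatorDemo() {
  std::string start = "helloworld";  // last key of the finished data block
  leveldb::BytewiseComparator()->FindShortestSeparator(
      &start, leveldb::Slice("hellozoo"));  // first key of the next block
  // start is now "hellox": still >= "helloworld" and < "hellozoo",
  // but shorter, which keeps the index block small.
}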

// Flush the current data block to the file
void TableBuilder::Flush() {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  assert(!r->pending_index_entry);
  WriteBlock(&r->data_block, &r->pending_handle);
  if (ok()) {
    r->pending_index_entry = true;
    r->status = r->file->Flush();
  }
}

// Write a block
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  // On-disk format: block contents + type (1 byte) + CRC (4 bytes)
  assert(ok());
  Rep* r = rep_;
  // Serialize the block
  Slice raw = block->Finish();

  // Optionally compress the contents
  Slice block_contents;
  CompressionType type = r->options.compression;
  switch (type) {
    case kNoCompression:
      block_contents = raw;
      break;

    case kSnappyCompression: {
      std::string* compressed = &r->compressed_output;
      if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
          compressed->size() < raw.size() - (raw.size() / 8u)) {
        block_contents = *compressed;
      } else {
        // Snappy not supported, or compressed less than 12.5%, so just
        // store uncompressed form
        block_contents = raw;
        type = kNoCompression;
      }
      break;
    }
  }
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  r->status = r->file->Append(block_contents);
  if (r->status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
    crc = crc32c::Extend(crc, trailer, 1);  // Extend crc to cover block type
    EncodeFixed32(trailer+1, crc32c::Mask(crc));
    r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->status.ok()) {
      r->offset += block_contents.size() + kBlockTrailerSize;
    }
  }
  r->compressed_output.clear();
  block->Reset();
}

Completing the file:

Status TableBuilder::Finish() {
  Rep* r = rep_;
  Flush();
  assert(!r->closed);
  r->closed = true;
  BlockHandle metaindex_block_handle;
  BlockHandle index_block_handle;
  if (ok()) {
    // Write the (currently empty) META INDEX BLOCK
    BlockBuilder meta_index_block(&r->options);
    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }
  if (ok()) {
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    // Write the index block
    WriteBlock(&r->index_block, &index_block_handle);
  }
  if (ok()) {
    // Write the footer, which contains the handles (offset and size)
    // of the META INDEX BLOCK and the INDEX BLOCK
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }
  return r->status;
}
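For reference, the footer is fixed-length (48 bytes in the version discussed here): each BlockHandle is encoded as two varint64s (offset, size), the pair of handles is padded out to 40 bytes, and an 8-byte magic number (0xdb4775248b80fb57) closes the file:

FOOTER = |metaindex handle|index handle|padding to 40 bytes|magic number (8 bytes)|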

Two classes appear here: BlockBuilder, which serializes data into the block format, and BlockHandle, which records a block's offset and size -- essentially a pointer to a block within the file.

Let's look at BlockBuilder's implementation. Here leveldb applies prefix compression: keys within one block are close to each other, so consecutive keys usually share a long prefix. Each entry is therefore stored as <shared_size><non_shared_size><value_size><non_shared_key_data><value_data>, which saves space.
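A worked example with made-up keys and 2-byte values: adding the entries ("smart", "v1") and then ("smartphone", "v2") produces

|shared=0|non_shared=5|value_size=2|"smart"|"v1"|
|shared=5|non_shared=5|value_size=2|"phone"|"v2"|

The second entry stores only "phone", since its first 5 bytes are shared with "smart".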

The sizes use a variable-length (varint) encoding -- an interesting optimization for small integers. A 32-bit integer takes at most 5 bytes: the highest bit of each byte indicates whether more bytes follow, and a high bit of 0 marks the last byte. Values no larger than 0x7F thus need only a single byte.

The varint32 implementation can be found in util/coding.cc.
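As a worked example, consider encoding 300 (binary 10 0101100): the low 7 bits 0101100 (0x2C) form the first byte with the continuation bit set, giving 0xAC; the remaining bits 10 (0x02) form the second byte with the continuation bit clear. So 300 is stored as the two bytes AC 02.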

// Finish building the block
Slice BlockBuilder::Finish() {
  // Append the restart array; a restart offset was recorded every
  // options_->block_restart_interval entries (default: 16)
  for (size_t i = 0; i < restarts_.size(); i++) {
    PutFixed32(&buffer_, restarts_[i]);
  }
  // Append the number of restarts
  PutFixed32(&buffer_, restarts_.size());
  finished_ = true;
  return Slice(buffer_);
}
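Putting it together, a finished block (before WriteBlock appends the compression-type byte and the CRC) looks like:

BLOCK = |entry 1|entry 2|...|entry N|restart[0] (4 bytes)|...|restart[K-1] (4 bytes)|num_restarts (4 bytes)|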

void BlockBuilder::Add(const Slice& key, const Slice& value) {
  Slice last_key_piece(last_key_);
  assert(!finished_);
  assert(counter_ <= options_->block_restart_interval);
  assert(buffer_.empty() // No values yet?
         || options_->comparator->Compare(key, last_key_piece) > 0);
  size_t shared = 0;
  // counter_ tracks how many entries have been added since the last restart point
  if (counter_ < options_->block_restart_interval) {
    // Count how many leading bytes the current key shares with the previous one
    const size_t min_length = std::min(last_key_piece.size(), key.size());
    while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
      shared++;
    }
  } else {
    // Restart compression
    restarts_.push_back(buffer_.size());
    counter_ = 0;
  }
  const size_t non_shared = key.size() - shared;

  // Append "<shared><non_shared><value_size>" to the buffer
  PutVarint32(&buffer_, shared);
  PutVarint32(&buffer_, non_shared);
  PutVarint32(&buffer_, value.size());

  // Append the non-shared key bytes and the value
  buffer_.append(key.data() + shared, non_shared);
  buffer_.append(value.data(), value.size());

  // Update last_key_ to the current key
  last_key_.resize(shared);
  last_key_.append(key.data() + shared, non_shared);
  assert(Slice(last_key_) == key);
  counter_++;
}