leveldb研究2- 存储分析,数据库日志文件格式,数据文件的格式和生成
2013-01-09 21:22
756 查看
leveldb研究2- 存储分析
Posted on 2012-03-09 11:44 小明 阅读(1128) 评论(1) 编辑 收藏 所属分类: 分布式计算对于一个db来说,存储是至关重要的问题。
运行上一篇的测试程序后,会发现leveldb会生成以下文件:
SST文件:数据文件 -- sstable格式
*.log: 数据库日志文件 -- 顺序记录所有数据库操作,用来恢复数据
CURRENT: 文本文件,表明当前的manifest文件
LOCK:空文件,数据库锁,防止多进程访问
LOG: 日志文件,文本格式
LOG.old:上一次的日志文件
MANIFEST: 数据库状态文件
leveldb研究3-数据库日志文件格式
Posted on 2012-03-09 16:00 小明 阅读(1173) 评论(1) 编辑 收藏 所属分类: 分布式计算leveldb在每次数据库操作之前都会把操作记录下来。
主要实现在db\log_format.h,db\log_reader.h,db\log_reader.cc,db\log_writer.h,db\log_writer.cc中。我们来具体看看实现。
日志格式
db\log_format.h
log是分块的,每块为32K,每条记录的记录头为7个字节,前四个为CRC,然后是长度(2个字节),最后是记录类型(1个字节)
---------------------------------------
BLOCK1|BLOCK2|BLOCK3|...|BLOCKN
---------------------------------------
// Type tag stored in the 1-byte "type" field of each physical record.
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
// The whole logical record fits in one physical record
kFullType = 1,
// For fragments of a record that spans block boundaries
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
static const int kMaxRecordType = kLastType;
// Log files are written and read in fixed 32KB blocks
static const int kBlockSize = 32768;
// Header is checksum (4 bytes), length (2 bytes), type (1 byte) --
// see Writer::EmitPhysicalRecord, which writes length at buf[4..5]
// and type at buf[6].
static const int kHeaderSize = 4 + 1 + 2;
} // namespace log
} // namespace leveldb
写日志操作
db\log_writer.cc
请注意这里的处理,由于1条记录可能超过一个BLOCK的大小,所以需要分成多个片段写入。
// Append one logical record to the log. The record is split into one or
// more physical fragments whenever it does not fit in the space left in
// the current 32KB block.
Status Writer::AddRecord(const Slice& slice) {
  const char* data = slice.data();
  size_t remaining = slice.size();

  // Even an empty slice is emitted once, as a zero-length kFullType
  // record, so the loop is a do/while.
  Status s;
  bool first_fragment = true;
  do {
    const int space_in_block = kBlockSize - block_offset_;
    assert(space_in_block >= 0);
    if (space_in_block < kHeaderSize) {
      // Too little room even for a 7-byte header: zero-pad the tail of
      // this block and start a fresh one.
      if (space_in_block > 0) {
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", space_in_block));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    // Payload capacity of this block after the header.
    const size_t capacity = kBlockSize - block_offset_ - kHeaderSize;
    const size_t chunk = (remaining < capacity) ? remaining : capacity;
    const bool last_fragment = (remaining == chunk);

    // Pick the fragment type from the (first, last) flags.
    RecordType type;
    if (first_fragment) {
      type = last_fragment ? kFullType : kFirstType;
    } else {
      type = last_fragment ? kLastType : kMiddleType;
    }

    s = EmitPhysicalRecord(type, data, chunk);
    data += chunk;
    remaining -= chunk;
    first_fragment = false;
  } while (s.ok() && remaining > 0);
  return s;
}
// Write one physical record (header + payload fragment) to the log file.
// t: fragment type; ptr/n: payload bytes. The caller has already ensured
// the record fits in the current block.
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
// Record header: crc (4 bytes) | length (2 bytes, little-endian) | type (1 byte)
char buf[kHeaderSize];
buf[4] = static_cast<char>(n & 0xff);
buf[5] = static_cast<char>(n >> 8);
buf[6] = static_cast<char>(t);
// Compute the CRC: type_crc_[t] is the precomputed crc of the type byte,
// extended here over the payload, then masked for storage
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// Write the header
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
// Write the payload fragment
s = dest_->Append(Slice(ptr, n));
if (s.ok()) {
s = dest_->Flush();
}
}
// Note: offset advances even on failure, matching the bytes attempted
block_offset_ += kHeaderSize + n;
return s;
}
读日志操作
这里可以看出使用BLOCK的好处,能够减少文件IO次数,读日志基本上就是写日志反向过程。
// Read the next logical record into *record. *scratch is working storage
// used to reassemble records written as multiple fragments; *record may
// point into *scratch after the call. Returns true if a record was read,
// false at end of file.
// (Fix: the original transcription garbled the in_fragmented_record
// declaration -- its ';' had been pushed onto the following comment line.)
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    // Skip forward to the first block at or after initial_offset_.
    if (!SkipToInitialBlock()) {
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;  // currently assembling a multi-fragment record?
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
    // Fetch the next physical record (the file is read one BLOCK at a time)
    const unsigned int record_type = ReadPhysicalRecord(&fragment);
    switch (record_type) {
      case kFullType:  // complete record in a single fragment
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:  // first fragment of a record
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (scratch->empty()) {
            in_fragmented_record = false;
          } else {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:  // middle fragment of a record
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:  // final fragment of a record
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:  // end of file
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "partial record without end(3)");
          scratch->clear();
        }
        return false;

      case kBadRecord:  // corrupt record
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {  // unrecognized record type
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}
// Read the next physical record out of the buffered block, refilling the
// buffer from the file (one kBlockSize chunk at a time) as needed.
// Returns the record type, or kEof / kBadRecord.
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < kHeaderSize) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
// Read one full BLOCK from the file
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
// Short read: this was the final block of the file
eof_ = true;
}
continue;
} else if (buffer_.size() == 0) {
// End of file
return kEof;
} else {
// Fewer than kHeaderSize bytes left at EOF: truncated record
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "truncated record at end of file");
return kEof;
}
}
// Parse the record header: crc(4) | length(2, little-endian) | type(1)
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) {
// Declared payload overruns the buffered data: bad length field
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions.
buffer_.clear();
return kBadRecord;
}
// Verify the checksum, which covers the type byte plus the payload
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
buffer_.remove_prefix(kHeaderSize + length);
// Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
*result = Slice(header + kHeaderSize, length);
return type;
}
}
leveldb研究4- 数据文件的格式和生成
Posted on 2012-03-12 18:21 小明 阅读(1222) 评论(1) 编辑 收藏 所属分类: 分布式计算leveldb使用SSTable格式来保存数据。
格式为:(当前没有META BLOCK)
SSTABLE = |DATA BLOCK1|DATA BLOCK2|...|DATA BLOCK N|META BLOCK1|...|META BLOCK N|META INDEX BLOCK|DATA INDEX BLOCK|Footer|
DATA BLOCK = |KeyValues|Restart arrays|array size|Compress Type|CRC
Footer(定长) = META INDEX BLOCK offset | DATA Index Block offset| Magic Numbers
比较细节的地方是数据块的压缩,针对key使用了前缀压缩法。
下面看看具体的实现。
//builder.cc
// Build an sstable file from the contents of *iter (a MemTable iterator).
//dbname: database name/directory
//env: OS abstraction interface
//iter: iterator positioned over the MemTable contents
//meta: filled in with the file size and smallest/largest keys
Status BuildTable(const std::string& dbname,
Env* env,
const Options& options,
TableCache* table_cache,
Iterator* iter,
FileMetaData* meta) {
Status s;
meta->file_size = 0;
iter->SeekToFirst();
// Generate the file name, format "0000x.sst"
std::string fname = TableFileName(dbname, meta->number);
if (iter->Valid()) {
WritableFile* file;
// Create a writable file
s = env->NewWritableFile(fname, &file);
if (!s.ok()) {
return s;
}
// TableBuilder handles table serialization and writing
TableBuilder* builder = new TableBuilder(options, file);
// META: smallest key (the iterator is sorted, so the first key is smallest)
meta->smallest.DecodeFrom(iter->key());
for (; iter->Valid(); iter->Next()) {
Slice key = iter->key();
// META: largest key (overwritten each iteration; the last key wins)
meta->largest.DecodeFrom(key);
// Append the entry to the builder
builder->Add(key, iter->value());
}
// Finish and check for builder errors
if (s.ok()) {
// Finalize the table (writes index blocks and footer)
s = builder->Finish();
if (s.ok()) {
meta->file_size = builder->FileSize();
assert(meta->file_size > 0);
}
} else {
builder->Abandon();
}
delete builder;
// Finish and check for file errors
if (s.ok()) {
s = file->Sync();
}
if (s.ok()) {
// sync & close: persist to disk
s = file->Close();
}
delete file;
file = NULL;
if (s.ok()) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(ReadOptions(),
meta->number,
meta->file_size);
s = it->status();
delete it;
}
}
// Check for input iterator errors
if (!iter->status().ok()) {
s = iter->status();
}
if (s.ok() && meta->file_size > 0) {
// Keep it
} else {
// On any failure (or empty table), remove the partially written file
env->DeleteFile(fname);
}
return s;
}
} // namespace leveldb
我们来看看TableBuilder类,主要的细节都在这个类中实现了
TableBuilder中含有一个Rep的数据结构的指针,主要是用于保存builder的一些状态和数据。为什么不在TableBuilder头文件中直接定义这些变量?主要是不想暴露过多的细节给使用者,真是一个很好的做法。
// Internal state of TableBuilder (pimpl idiom: keeps these details out
// of the public header).
struct TableBuilder::Rep {
Options options;
Options index_block_options;
WritableFile* file; // output sstable file
uint64_t offset; // current write offset within the file
Status status;
BlockBuilder data_block; // data block under construction
BlockBuilder index_block; // index block (one entry per data block)
std::string last_key;// last key added; used for ordering checks and index keys
int64_t num_entries; // number of entries added so far
bool closed; // Finish() or Abandon() has been called
bool pending_index_entry; // an index entry for the just-flushed data block is still owed
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0),
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
pending_index_entry(false) {
// Index entries are never prefix-compressed: restart on every entry
index_block_options.block_restart_interval = 1;
}
};
新加一条记录:
// Add one key/value entry to the table under construction.
// Keys must arrive in strictly increasing order.
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
// Verify keys are added in strictly ascending order
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) { // owe an index entry for the previous data block?
// Only true right after Flush(), so the new data block must be empty
assert(r->data_block.empty());
// Shorten last_key to the shortest separator between the previous
// block's last key and the current key; it becomes the index key
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
// Buffer the index entry; it is written out in Finish()
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
// Flush once the data block reaches block_size (default 4K)
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
//写一个DATA BLOCK
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
}
}
// Serialize, optionally compress, and append one block to the file,
// recording its position and size in *handle.
// On-disk layout: block contents + type (1 byte) + CRC (4 bytes)
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
assert(ok());
Rep* r = rep_;
// Serialize the block contents
Slice raw = block->Finish();
// Compress if configured and worthwhile
Slice block_contents;
CompressionType type = r->options.compression;
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
// Record where this block lives so the index can point at it
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
// Trailer: compression type byte + masked CRC of contents and type
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer+1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
r->compressed_output.clear();
block->Reset();
}
完成文件的写入:
// Finish building the table: flush the last data block, then write the
// meta-index block, the index block, and finally the footer.
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle metaindex_block_handle;
BlockHandle index_block_handle;
if (ok()) {
// Write the META INDEX BLOCK (currently always empty)
BlockBuilder meta_index_block(&r->options);
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
if (ok()) {
if (r->pending_index_entry) {
// Emit the index entry for the final data block; with no successor
// key, use a short key >= last_key
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
// Write the index block
WriteBlock(&r->index_block, &index_block_handle);
}
if (ok()) {
// Write the footer, holding the handles (offset/size) of the META
// INDEX BLOCK and the INDEX BLOCK
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}
这里面有两个类BlockBuilder和BlockHandle,BlockBuilder负责把数据按照一定格式进行序列化,而BlockHandle负责记录offset,size等,可以理解为BLOCK的文件中指针。
我们看看BlockBuilder的实现,这里leveldb实现了前缀压缩法,因为一个BLOCK的key很接近,所以前后两个key相差不会很大,所以采取了<shared_size><non_shared_size><value_size><non_shared_data><value_data>的格式,节省了空间。
其中size采用了变长格式,很有意思的格式,主要是针对小整型做的一个优化,用最多5个字节来表示4个字节的整型,每个byte的最高一个bit用来指示还有没有后续数据,如果最高位为0,则表示没有后续的bytes.这样小于7F的数据只需要一个字节来表示。
可以参考这篇文章具体看实现variant32格式。
// Complete the block: append the restart-point array followed by its
// element count, then expose the whole buffer as a Slice.
Slice BlockBuilder::Finish() {
  // One restart offset was recorded every block_restart_interval
  // (default: 16) entries; append them all as fixed32 values.
  for (const auto restart_offset : restarts_) {
    PutFixed32(&buffer_, restart_offset);
  }
  // Trailing count of restart points.
  PutFixed32(&buffer_, restarts_.size());
  finished_ = true;
  return Slice(buffer_);
}
// Add one key/value pair to the block, prefix-compressing the key against
// the previous key. Entry format:
//   <shared><non_shared><value_size><non-shared key bytes><value bytes>
// where the three sizes are varint32-encoded.
void BlockBuilder::Add(const Slice& key, const Slice& value) {
Slice last_key_piece(last_key_);
assert(!finished_);
assert(counter_ <= options_->block_restart_interval);
assert(buffer_.empty() // No values yet?
|| options_->comparator->Compare(key, last_key_piece) > 0);
size_t shared = 0;
// counter_ counts entries added since the last restart point
if (counter_ < options_->block_restart_interval) {
// Count how many leading bytes this key shares with the previous key
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {
// Restart compression
restarts_.push_back(buffer_.size());
counter_ = 0;
}
const size_t non_shared = key.size() - shared;
// Append "<shared><non_shared><value_size>" to the buffer
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());
// Append the non-shared key bytes and the value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());
// Rebuild last_key_ to equal the current key (shared prefix + suffix)
last_key_.resize(shared);
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;
}
相关文章推荐
- python数据分析 第7天 数据的加载,存储,与文件格式
- 利用Python进行数据分析--数据加载、存储与文件格式
- 关于Java Web 使用 iText 将数据库中的 表 的数据 生成 PDF 格式文件(升級版本)
- 利用python进入数据分析之数据加载、存储、文件格式
- 利用python进行数据分析-数据加载、存储与文件格式1
- 使用JDBC获取数据库数据,并生成json格式文件(省市区三级联动)
- leveldb代码阅读(13)——数据文件的格式和生成
- Python 数据分析(一) 本实验将学习 pandas 基础,数据加载、存储与文件格式,数据规整化,绘图和可视化的知识
- Python 数据分析(一) 本实验将学习 pandas 基础,数据加载、存储与文件格式,数据规整化,绘图和可视化的知识
- 利用python进行数据分析-数据加载、存储与文件格式2
- 【Python数据分析与展示】(七)数据加载存储和文件格式
- 利用python进行数据分析之数据加载存储与文件格式
- innodb存储引擎之二进制日志文件ROW和STATEMENT格式以及重做日志文件分析与系统恢复详解(未完待续)
- leveldb代码阅读(12)——数据库日志文件格式
- leveldb研究9- 流程分析:打开数据库,写数据,读数据,随机写
- 利用存储过程数据库中的数据生成txt文件
- python数据分析之:数据加载,存储与文件格式
- 数据加载、存储与文件格式 利用Python进行数据分析 第6章
- oracle之物理数据库结构概述(数据文件、重做日志文件,控制文件等各种数据库文件)
- 大智慧日线数据文件格式分析