您的位置:首页 > 数据库

Michael.W谈hyperledger Fabric第27期-详细带读Fabric的源码12-历史状态数据库在代码级别的行为实现

2019-03-22 22:56 591 查看
版权声明:原创内容,转载请注明作者。 https://blog.csdn.net/michael_wgy_/article/details/88752305

Michael.W谈hyperledger Fabric第27期-详细带读Fabric的源码12-历史状态数据库在代码级别的行为实现

1 什么是历史状态数据库?

我前面有的帖子《Michael.W谈hyperledger Fabric第23期-详细带读Fabric的源码8-peer节点账本读写的底层机制与RWSet读写集合》中提到了历史状态索引这个模块。该模块的的底层依赖就是历史状态数据库,也称为历史数据库。

那历史状态数据库究竟是干嘛的呢?
如果想得到一个用户账户中的交易历史,状态数据库是无法满足的,那么就要从创世区块开始遍历,遇到与该账户相关的交易就记录一下。
这个过程是耗时的。历史状态数据库就是对这一过程进行精简
它并不记录具体的状态变动结果,而只记录某个账户的数据在哪里被更新了(记录交易ID)。
这样大大节省了存储空间。
综上所述,我们可以从历史状态服务器中拿到交易ID,然后在区块索引模块中根据交易ID在区块链中找到对应源数据。

历史状态数据库相关源码文件位置:core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go

2 如何表示一笔交易被改变?

在不存储具体改变值的情况下,如何标识某一个key在某一笔交易中被改变?
答:依靠组合键
先看源码:

var savePointKey = []byte{0x00}
var emptyValue = []byte{}

func (historyDB *historyDB) Commit(block *common.Block) error {
// 获得该区块的区块编号
blockNo := block.Header.Number
// 初始化tranNo
var tranNo uint64
// 获取一个事务对象,一个状态容器。
dbBatch := leveldbhelper.NewUpdateBatch()

logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
historyDB.dbName, blockNo, len(block.Data.Data))

// 从该区块的元数据中获取交易的有效码序列
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])

// 如果有效码序列为空,表名该区块为创世区块。为其按照正确格式重新初始化有效码序列
if len(txsFilter) == 0 {
txsFilter = util.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}

// 遍历该区块中的Data,即遍历该区块中的所有交易
for _, envBytes := range block.Data.Data {
// 如果该交易为无效交易,跳过
if txsFilter.IsInvalid(int(tranNo)) {
logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
historyDB.dbName, tranNo)
tranNo++
continue
}
// 将交易反序列化成*common.Envelope格式对象
env, err := putils.GetEnvelopeFromBlock(envBytes)
if err != nil {
return err
}
// 从*common.Envelope格式对象中获取payload
payload, err := putils.GetPayload(env)
if err != nil {
return err
}
// 从payload中将ChannelHeader反序列化
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return err
}

if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
// 如果该交易为模拟交易
// 从common.Envelope对象中获取ChaincodeAction对象。ChaincodeAction对象包含了调用智能合约时所发出的事件
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return err
}

// 为该交易初始化一个新的读写集合序列
txRWSet := &rwsetutil.TxRwSet{}

// 获取到该交易中的读写集合序列,并反序列化
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return err
}

// 开始遍历该交易的读写集合序列
for _, nsRWSet := range txRWSet.NsRwSets {
// 获取该读写集合的namespace
ns := nsRWSet.NameSpace
// 遍历该读写集合中的写集合[1]
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
// 获取键
writeKey := kvWrite.Key

// 利用该交易的namespace、写集的键、区块编号和在该区块中的交易编号,组合成一个组合键[2]
compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)

// 将该组合键加入到前面创建的状态容器中,其所对应的值为空值。可见历史数据库,只使用了组合键来保存交易的改变状态,并且只记录哪个key在哪笔交易中状态被改变,不记录具体是怎么改变的。
dbBatch.Put(compositeHistoryKey, emptyValue)
}
}

} else {
// 如果该交易不是模拟交易,直接log
logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
}
// tranNo自加1
tranNo++
}

// 在levelDB中标识该区块结束
height := version.NewHeight(blockNo, tranNo)
dbBatch.Put(savePointKey, height.ToBytes())

// 对上面的事务进行提交,即写入数据库。
if err := historyDB.db.WriteBatch(dbBatch, true); err != nil {
return err
}

logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", historyDB.dbName, blockNo)
return nil
}
  • [1] 写集合的结构为:
    type KVWrite struct {
    // 对应的键值
    Key      string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"`
    // 操作类型,即这个写操作是更新数据还是删除数据
    IsDelete bool   `protobuf:"varint,2,opt,name=is_delete,json=isDelete" json:"is_delete,omitempty"`
    // 对应的值
    Value    []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
    }
  • [2] 关于历史数据库的前缀组合键的构建过程位于文件:core/ledger/kvledger/history/historydb/histmgr_helper.go
    func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannum uint64) []byte {
    // 定义一个空的byte切片
    var compositeKey []byte
    // 加入namespace
    compositeKey = append(compositeKey, []byte(ns)...)
    compositeKey = append(compositeKey, compositeKeySep...)
    // 加入对应的键值
    compositeKey = append(compositeKey, []byte(key)...)
    compositeKey = append(compositeKey, compositeKeySep...)
    // 加入砌块编号
    compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(blocknum)...)
    // 加入交易编号
    compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(trannum)...)
    return compositeKey
    }

3 如何查询某key的变动历史

历史数据库的查询实现源码位于:core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb_query_executer.go

// levelDB查询类
type LevelHistoryDBQueryExecutor struct {
// 一个HistoryDB接口实例化对象
historyDB  *historyDB
// 用于持久化区块和检索区块的对象。实现了BlockStore接口
blockStore blkstorage.BlockStore
}

func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) {
// 判断是否启用了历史数据库
if ledgerconfig.IsHistoryDBEnabled() == false {
return nil, errors.New("History tracking not enabled - historyDatabase is false")
}

var compositeStartKey []byte
var compositeEndKey []byte
// 生成namespace~key~形式的前缀组合键(startkey)
compositeStartKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, false)
// 生成namespace~key~0xff形式的前缀组合键(endkey)
compositeEndKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, true)
// 利用levelDB的前缀匹配功能找到对应的迭代器。范围内的数据可以通过该迭代器获得。
dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
// 返回一个用于迭代的扫描器对象
return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil
}

关于levelDB前缀组合键的生成源码:

func ConstructPartialCompositeHistoryKey(ns string, key string, endkey bool) []byte {
var compositeKey []byte
// 加入namespace
compositeKey = append(compositeKey, []byte(ns)...)
compositeKey = append(compositeKey, compositeKeySep...)
// 加入对应键值
compositeKey = append(compositeKey, []byte(key)...)
compositeKey = append(compositeKey, compositeKeySep...)
if endkey {
// 如果endkey为true,在compositeKey的结尾加入0xff
compositeKey = append(compositeKey, []byte{0xff}...)
}
// 返回组合键compositeKey
return compositeKey
}

下面来看一下用于迭代的扫描器定义:

// 实现了对历史数据的迭代
type historyScanner struct {
// 前缀组合键
compositePartialKey []byte //compositePartialKey includes namespace~key
namespace           string
key                 string
// 利用levelDB的前缀匹配功能返回的数据迭代器
dbItr               iterator.Iterator
// 区块索引的接口
blockStore          blkstorage.BlockStore
}
// 创建一个扫描器对象
func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner {
return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore}
}
// 利用扫描器从迭代器中获取数据
func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
// 判断迭代器是否迭代完毕
if !scanner.dbItr.Next() {
return nil, nil
}
// 获取迭代器中当前KV键值对的键。键的形式为:namespace~key~blocknum~trannum
historyKey := scanner.dbItr.Key()

// 以前缀组合键为前缀为分割符,返回后半部分的组合键:blocknum~trannum
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
// 获取区块编号
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
// 获取交易编号
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)
// 利用区块索引的接口实现对象。根据区块编号和交易编号从区块文件中找到对应的交易数据
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}

// 从返回的交易数据中获得该笔交易的ID、写集的value值、时间戳和删除标志
queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId)
// 返回
return queryResult, nil
}

ps:
本人热爱图灵,热爱中本聪,热爱V神,热爱一切被梨花照过的姑娘。
以下是我个人的公众号,如果有技术问题可以关注我的公众号来跟我交流。
同时我也会在这个公众号上每周更新我的原创文章,喜欢的小伙伴或者老伙计可以支持一下!
如果需要转发,麻烦注明作者。十分感谢!

公众号名称:后现代泼痞浪漫主义奠基人

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐