Michael.W谈hyperledger Fabric第27期-详细带读Fabric的源码12-历史状态数据库在代码级别的行为实现
Michael.W谈hyperledger Fabric第27期-详细带读Fabric的源码12-历史状态数据库在代码级别的行为实现
1 什么是历史状态数据库?
我前面有的帖子《Michael.W谈hyperledger Fabric第23期-详细带读Fabric的源码8-peer节点账本读写的底层机制与RWSet读写集合》中提到了历史状态索引这个模块。该模块的的底层依赖就是历史状态数据库,也称为历史数据库。
那历史状态数据库究竟是干嘛的呢?
如果想得到一个用户账户中的交易历史,状态数据库是无法满足的,那么就要从创世区块开始遍历,遇到与该账户相关的交易就记录一下。
这个过程是耗时的。历史状态数据库就是对这一过程进行精简。
它并不记录具体的状态变动结果,而只记录某个账户的数据在哪里被更新了(记录交易ID)。
这样大大节省了存储空间。
综上所述,我们可以从历史状态服务器中拿到交易ID,然后在区块索引模块中根据交易ID在区块链中找到对应源数据。
历史状态数据库相关源码文件位置:core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go
2 如何表示一笔交易被改变?
在不存储具体改变值的情况下,如何标识某一个key在某一笔交易中被改变?
答:依靠组合键。
先看源码:
var savePointKey = []byte{0x00} var emptyValue = []byte{} func (historyDB *historyDB) Commit(block *common.Block) error { // 获得该区块的区块编号 blockNo := block.Header.Number // 初始化tranNo var tranNo uint64 // 获取一个事务对象,一个状态容器。 dbBatch := leveldbhelper.NewUpdateBatch() logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions", historyDB.dbName, blockNo, len(block.Data.Data)) // 从该区块的元数据中获取交易的有效码序列 txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) // 如果有效码序列为空,表名该区块为创世区块。为其按照正确格式重新初始化有效码序列 if len(txsFilter) == 0 { txsFilter = util.NewTxValidationFlags(len(block.Data.Data)) block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter } // 遍历该区块中的Data,即遍历该区块中的所有交易 for _, envBytes := range block.Data.Data { // 如果该交易为无效交易,跳过 if txsFilter.IsInvalid(int(tranNo)) { logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d", historyDB.dbName, tranNo) tranNo++ continue } // 将交易反序列化成*common.Envelope格式对象 env, err := putils.GetEnvelopeFromBlock(envBytes) if err != nil { return err } // 从*common.Envelope格式对象中获取payload payload, err := putils.GetPayload(env) if err != nil { return err } // 从payload中将ChannelHeader反序列化 chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { return err } if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION { // 如果该交易为模拟交易 // 从common.Envelope对象中获取ChaincodeAction对象。ChaincodeAction对象包含了调用智能合约时所发出的事件 respPayload, err := putils.GetActionFromEnvelope(envBytes) if err != nil { return err } // 为该交易初始化一个新的读写集合序列 txRWSet := &rwsetutil.TxRwSet{} // 获取到该交易中的读写集合序列,并反序列化 if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil { return err } // 开始遍历该交易的读写集合序列 for _, nsRWSet := range txRWSet.NsRwSets { // 获取该读写集合的namespace ns := nsRWSet.NameSpace // 遍历该读写集合中的写集合[1] for _, kvWrite := range nsRWSet.KvRwSet.Writes { // 获取键 writeKey := kvWrite.Key // 利用该交易的namespace、写集的键、区块编号和在该区块中的交易编号,组合成一个组合键[2] compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo) // 将该组合键加入到前面创建的状态容器中,其所对应的值为空值。可见历史数据库,只使用了组合键来保存交易的改变状态,并且只记录哪个key在哪笔交易中状态被改变,不记录具体是怎么改变的。 dbBatch.Put(compositeHistoryKey, emptyValue) } } } else { // 如果该交易不是模拟交易,直接log logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo) } // tranNo自加1 tranNo++ } // 在levelDB中标识该区块结束 height := version.NewHeight(blockNo, tranNo) dbBatch.Put(savePointKey, height.ToBytes()) // 对上面的事务进行提交,即写入数据库。 if err := historyDB.db.WriteBatch(dbBatch, true); err != nil { return err } logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", historyDB.dbName, blockNo) return nil }
- [1] 写集合的结构为:
type KVWrite struct { // 对应的键值 Key string `protobuf:"bytes,1,opt,name=key" json:"key,omitempty"` // 操作类型,即这个写操作是更新数据还是删除数据 IsDelete bool `protobuf:"varint,2,opt,name=is_delete,json=isDelete" json:"is_delete,omitempty"` // 对应的值 Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` }
- [2] 关于历史数据库的前缀组合键的构建过程位于文件:core/ledger/kvledger/history/historydb/histmgr_helper.go
func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannum uint64) []byte { // 定义一个空的byte切片 var compositeKey []byte // 加入namespace compositeKey = append(compositeKey, []byte(ns)...) compositeKey = append(compositeKey, compositeKeySep...) // 加入对应的键值 compositeKey = append(compositeKey, []byte(key)...) compositeKey = append(compositeKey, compositeKeySep...) // 加入砌块编号 compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(blocknum)...) // 加入交易编号 compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(trannum)...) return compositeKey }
3 如何查询某key的变动历史
历史数据库的查询实现源码位于:core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb_query_executer.go
// levelDB查询类 type LevelHistoryDBQueryExecutor struct { // 一个HistoryDB接口实例化对象 historyDB *historyDB // 用于持久化区块和检索区块的对象。实现了BlockStore接口 blockStore blkstorage.BlockStore } func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) { // 判断是否启用了历史数据库 if ledgerconfig.IsHistoryDBEnabled() == false { return nil, errors.New("History tracking not enabled - historyDatabase is false") } var compositeStartKey []byte var compositeEndKey []byte // 生成namespace~key~形式的前缀组合键(startkey) compositeStartKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, false) // 生成namespace~key~0xff形式的前缀组合键(endkey) compositeEndKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, true) // 利用levelDB的前缀匹配功能找到对应的迭代器。范围内的数据可以通过该迭代器获得。 dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey) // 返回一个用于迭代的扫描器对象 return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil }
关于levelDB前缀组合键的生成源码:
func ConstructPartialCompositeHistoryKey(ns string, key string, endkey bool) []byte { var compositeKey []byte // 加入namespace compositeKey = append(compositeKey, []byte(ns)...) compositeKey = append(compositeKey, compositeKeySep...) // 加入对应键值 compositeKey = append(compositeKey, []byte(key)...) compositeKey = append(compositeKey, compositeKeySep...) if endkey { // 如果endkey为true,在compositeKey的结尾加入0xff compositeKey = append(compositeKey, []byte{0xff}...) } // 返回组合键compositeKey return compositeKey }
下面来看一下用于迭代的扫描器定义:
// 实现了对历史数据的迭代 type historyScanner struct { // 前缀组合键 compositePartialKey []byte //compositePartialKey includes namespace~key namespace string key string // 利用levelDB的前缀匹配功能返回的数据迭代器 dbItr iterator.Iterator // 区块索引的接口 blockStore blkstorage.BlockStore } // 创建一个扫描器对象 func newHistoryScanner(compositePartialKey []byte, namespace string, key string, dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner { return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore} } // 利用扫描器从迭代器中获取数据 func (scanner *historyScanner) Next() (commonledger.QueryResult, error) { // 判断迭代器是否迭代完毕 if !scanner.dbItr.Next() { return nil, nil } // 获取迭代器中当前KV键值对的键。键的形式为:namespace~key~blocknum~trannum historyKey := scanner.dbItr.Key() // 以前缀组合键为前缀为分割符,返回后半部分的组合键:blocknum~trannum _, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey) // 获取区块编号 blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:]) // 获取交易编号 tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:]) logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n", scanner.namespace, scanner.key, blockNum, tranNum) // 利用区块索引的接口实现对象。根据区块编号和交易编号从区块文件中找到对应的交易数据 tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum) if err != nil { return nil, err } // 从返回的交易数据中获得该笔交易的ID、写集的value值、时间戳和删除标志 queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key) if err != nil { return nil, err } logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n", scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId) // 返回 return queryResult, nil }
ps:
本人热爱图灵,热爱中本聪,热爱V神,热爱一切被梨花照过的姑娘。
以下是我个人的公众号,如果有技术问题可以关注我的公众号来跟我交流。
同时我也会在这个公众号上每周更新我的原创文章,喜欢的小伙伴或者老伙计可以支持一下!
如果需要转发,麻烦注明作者。十分感谢!
公众号名称:后现代泼痞浪漫主义奠基人
- Michael.W谈hyperledger Fabric第28期-详细带读Fabric的源码13-区块的存储持久化
- 区块链教程Fabric1.0源代码分析Ledger historydb历史数据库
- Hyperledger Fabric的PBFT源码分析(一)
- Hyperledger Fabric 排序节点处理 Broadcast 请求的实现
- BootStrap Table复选框默认选中功能的实现代码(从数据库获取到对应的状态进行判断是否为选中状态)
- Hyperledger Fabric 链码启动过程实现
- Hyperledger Fabric 1.0 从零开始(四)——Fabric源码及镜像文件处理
- 区块链开源实现hyperledger fabric架构详解
- OpenStack建立实例完整过程源码详细分析(12)----依据AMQP通信架构实现消息发送机制解析之一
- 区块链之Hyperledger(超级账本)Fabric v1.0 的环境搭建(超详细教程)
- 如何向 Hyperledger Fabric 项目贡献代码
- 区块链之Hyperledger(超级账本)Fabric v1.0 的环境搭建(超详细教程)
- Java为Hyperledger Fabric(超级账本)开发区块链链代码智能合约之环境部署
- 利用JavaScript实现省市的二级联动(代码简洁+详细注释+无数据库)
- BootStrap Table复选框默认选中功能的实现代码(从数据库获取到对应的状态进行判断是否为选中状态)
- 新浪微博自动转发评论 源码 按键精灵实现 详细注释 几十行代码实现 涉及图像识别模拟键盘鼠标
- Hyperledger Fabric处理Peer与Peer之间通信的源码解析
- Hyperledger fabric 源码分析之 peer 服务启动过程
- Hyperledger Fabric密码模块系列之BCCSP(五) - 国密算法实现
- 在HyperLedger Fabric中启用CouchDB作为State Database(区块链数据库)