您的位置:首页 > 其它

fabric源码解析25——ledger之VersionedDB和HistoryDB

2018-01-05 19:51 573 查看

fabric源码解析25——ledger之VersionedDB和HistoryDB

续接前文

续接《fabric源码解析24》,本文讲述另外两个账本VersionedDB,HistoryDB。为了比较清楚的说明问题,本文将以
peer invoke...
命令调用examples/chaincode/go/下的chaincode_example02/chaincode_example02.go和map/map.go两个chaincode为例(假设两个chaincode已正确部署)。

VersionedDB

VersionedDB账本可以叫做状态账本,这个状态(state),即是官方文档中所提及的“世界状态”,world state,具体即为由每个交易数据最新生成的一个关于交易读写集的有效的键值对。该数据库中有几个重要的对象,如交易模拟器、查询器、验证器、读写集等,其中这些对象所体现的一些概念和操作还是比较难理解的,笔者在这卡了将近两个星期,所以这里强烈建议先阅读一些官方文档的State Database和Read-Write set semantics两处的内容。

VersionedDB使用的state数据库有两个版本,即leveldb版本和couchdb版本,这里只讲leveldb版本,两者的主要区别在于:leveldb只支持基本的以key为基础的查询,而couchdb则支持更丰富的查询手段,即富查询(参看core/ledger/util/couchdb/couchdb_test.go中的TestRichQuery),对于调用者来说也更接近事物逻辑(如可以根据某账户的特征,如颜色,尺寸等特征进行查询,只需要预先定义好)。state数据库也不是直接由账本使用的,而是被一个可称作交易管理者的对象TxMgr持有并管理,账本通过这个交易管理者向state数据库读写交易数据。同时,TxMgr对象也提供其他功能,比如提供交易模拟器、交易查询器、交易验证器,这些“器”将在读写交易数据的时候发挥作用。TxMgr的基础目录在core/ledger/kvledger/txmgmt/下(该章节以此目录为基准),具体的实现为txmgr/lockbasedtxmgr/lockbased_txmgr.go中的
LockBasedTxMgr


在peer结点中使用VersionedDB的过程:用户使用peer结点发起一笔交易,如ACC的部署,是一个明显的会写入数据的交易,再如用户执行
peer chaincode query ...
,则是一个明显的会读出数据的交易。在交易过程中,这些读取和写入的交易数据会通过TxSimulator(交易模拟器)放入一个叫做读写集的容器,在交易返回时,再使用TxSimulator统一读取,然后将读取的数据放入ProposalResponse返回,返回后读集中的数据直接打印,写集中的数据被放入Envelope中再发送给orderer(请参看peer/chaincode/common.go中的
ChaincodeInvokeOrQuery
)。orderer结点依据Envelope生成block后,再返回给peer结点。peer结点接收后,经gossip模块接收并提交到账本对象后,账本对象添加block,会添加到VersionedDB账本中,VersionedDB又会将block中交易的数据最终添加到state数据库中。以执行chaincode_example02.go的
invoke(stub,args)
(该函数简单的查询了A、B两个账户的余额,并由A向B转了一笔钱,即形成了A、B两个账户转账调整后各自新的余额值)为例,粗线条的步骤如下:

交易发起端执行
peer chaincode invoke -n chaincode_example02 -c '{"Args":["invoke","a","b","10"]}'
,通过peer/chaincode/invoke.go中的
chaincodeInvoke(...)
函数向endorser结点发送请求。假设该交易为Tx_A。

core/endorser/endorser.go中,
ProcessProposal
->
txsim, err = e.getTxSimulator(chainID)
,创建了一个交易模拟器。

e.simulateProposal(...,txsim)
->
e.callChaincode(...,txsim)


经过一系列辗转,会在core/chaincode/shim/handler.go中,
handleTransaction
中调用
res := handler.cc.Invoke(stub)
(即chaincode_example02.go中的Invoke) -> 根据第1步的参数,在
Invoke()
中进入
if function == "invoke"
分支从而调用
invoke()
-> 再经过一系列辗转,会在core/chaincode/handler.go中,
enterBusyState
,通过交易模拟器的
txsimulator.GetState()
txsimulator.SetState(...)
,将A、B账户的数据放入读集,把更改的A、B账户的数据放入了写集(其他交易会用到了其他的
DeleteState()
GetStateRangeScanIterator()
ExecuteQuery()
),这一步相当于模拟执行了交易。

重回core/endorser/endorser.go中,
simResult, err = txsim.GetTxSimulationResults()
-> 返回至
ProcessProposal()
->
pResp, err = e.endorseProposal(...,simulationResult,...)
->
e.callChaincode(...)
,获取交易的写集,并进行了第二轮的
callChaincode
,一系列调用后会进入下一步。

core/scc/escc/endorser_onevalidsignature.go中,
Invoke(...)
->
presp, err := utils.CreateProposalResponse(...,results,...)
,results为交易Tx_A的结果集,被放入ProposalResponse返回至第5步的
endorseProposal
,在返回给交易发起端。

peer/chaincode/common.go中,
ChaincodeInvokeOrQuery()
->
env, err := putils.CreateSignedTx(...)
,把返回的ProposalResponse打包成Envelope ->
err = bc.Send(env)
,发送给orderer -> …,orderer处理block过程省略 -> orderer将形成的block发送给peer结点,再经gossip模块散播使用committer模块的过程,这里亦省略。

committer向账本提交block时,最终定位到core/ledger/kvledger/kv_ledger.go中的
Commit(block)
->
err = l.txtmgmt.ValidateAndPrepare(block, true)
l.txtmgmt.Commit()
即是使用交易管理者向state提交交易数据。
ValidateAndPrepare
做两件事,一是使用验证器验证交易的读写集,以确定交易的有效性,二是若交易有效,则将交易的写集中的数据放入数据升级包中,为下一步的
l.txtmgmt.Commit()
提交这批数据做准备。

交易模拟器/交易查询器

交易模拟器TxSimulator和交易查询器QueryExecutor的接口在core/ledger/ledger_interface.go中定义,具体实现为txmgr/lockbasedtxmgr/下lockbased_tx_simulator.go中的
lockBasedTxSimulator
和lockbased_query_executer.go中的
lockBasedQueryExecutor
。通过账本
kvLedger
NewTxSimulator()
NewQueryExecutor()
接口可获取TxSimulator和NewQueryExecutor。两者均直接使用helper.go中的
queryHelper
实现了自身的查询方面的功能,这点从queryHelper这个名字可以看出来。TxSimulator实际上包含QueryExecutor,而QueryExecutor算是TxSimulator在查询功能上的增强和拓展(其中交易查询器的
ExecuteQuery
是富查询接口,因此只支持couchDB版本的VersionedDB),因此直接使用的一般是交易模拟器。除了查询,TxSimulator还提供状态的写入功能,写入分为增加和删除。因此,TxSimulator和QueryExecutor涉及的操作就有查、增、删

参看txmgr/lockbasedtxmgr/lockbased_tx_simulator.go和helper.go,交易模拟器的读、写、删的操作步如下:

GetState(ns, key)
->
q.helper.getState(ns, key)
,依据名字空间和key,从state数据库中获取一个状态,返回并写入读集(返回的是,写入读集的是值的版本)。这里的名字空间,即是chaincodeID,也即对应一个chaincode的每一个交易使用一个读写集,下同。

SetState(ns, key, value)
->
s.rwsetBuilder.AddToWriteSet(ns, key, value)
,将一个值写入写集。

DeleteState(ns, key)
->
SetState(ns, key, nil)
,删除一个值,当给的key的value为nil时,即表示要将此key的值置为nil,也即要删除这个值。

SetStateMultipleKeys(ns, kvs)
->
for ... { SetState(...) }
,一次性设置多个键值对,写入交易的写集。

GetStateMultipleKeys(...)
->
q.helper.getStateMultipleKeys(...)
,依据一个名字空间和一批key,从state数据库中获取一批状态。

GetStateRangeScanIterator(...)
->
q.helper.getStateRangeScanIterator(...)
,依据一个命名空间,开始的key,结束的key,从state数据库中获取一个指定范围的交易查询迭代器。

ExecuteQuery(namespace,query)
->
q.helper.executeQuery(namespace,query)
,这个接口对于leveldb来说不支持。

Done()
->
q.helper.done()
,交易查询器执行完毕。

读写集

TxSimulator之所以叫交易模拟器,就是因为在使用它处理交易的时候,所形成的交易数据并未真正的直接写入state数据库中,而是将交易查、增、删得到的数据暂时放入了一个叫做读写集的地方备用,因此是模拟交易。

一个chaincode的每笔交易都对应一个读写集,读写集实现为rwsetutil/rwset_builder.go中的
RWSetBuilder
,成员只有一个
rwMap map[string]*nsRWs
映射,即以chaincodeID为key,每个chaincode单独有一个
nsRWs
。参看读写集结构图片read_write_set.png,读写集存储三类数据:
KVRead
读值、
KVWrite
写值、
RangeQueryInfo
范围读值(三者均定义在protos/ledger/rwset/kvrwset/下),多个值各自形成
readMap
读集、
writeMap
写集、
rangeQueriesMap
rangeQueriesKeys
组成的范围读集。

读值
KVRead
,每个读值只包含key和值的版本号,而不包含值本身。TxSimulator每次调用
GetState()
接口,都会将读取到的读值写入交易的读集。写值
KVWrite
,每个写值包含key和value,还包含一个此写值是否为删除的标识IsDelete。TxSimulator每次调用
SetState()
DeleteState()
等接口,都会将一个写值写入交易的写集。范围读值
RangeQueryInfo
,每个范围读值是一个范围内所有读值的集合,范围读值何时被写入将在下文提及。
rangeQueriesMap
rangeQueryKey
为map的key,
RangeQueryInfo
为map的value。
rangeQueryKey
中,startKey标识了范围从何开始,endKey标识了范围至何处结束(注意,范围读值中不包含endkey对应的值),itrExhausted标识了是否结束。
RangeQueryInfo
中,其余的成员同
rangeQueryKey
一样,而成员ReadsInfo存储两类数据:原始的范围内的多个读值或范围内的读值的哈希值。范围内的多个原始的读值很容易理解,而范围内读值的哈希值的生成则需要借助其他工具,且往下看。

当用户发起读取一定范围数据的交易时,不是直接将这个范围内所有读值集合打包返回给调用者,而是将一个包含了读值范围信息的的迭代器返回给调用者(这样可以避免数据量过大而导致的处理效率低下),然后调用者使用迭代器的
Next()
一个一个抽取读值。这里涉及到两个迭代器:(1)在交易模拟器的范围内,为txmgr/lockbasedtxmgr/helper.go中的resultsItr,由交易模拟器的
GetStateRangeScanIterator(...)
接口获取,每获取一个迭代器,都会把迭代器记录在queryHelper中的itrs,再次强调,TxSimulator包含QueryExecutor,而提供迭代器也是QueryExecutor对TxSimulator在查询功能上上主要的拓展之一。(2)在核心的chaincode处理范围内,为core/chaincode/shim/chaincode.go中的StateQueryIterator。该迭代器的使用依附于(1)中迭代器所查询出来的结果。

迭代器

resultsItr迭代器管理了一个工具,为rwsetutil/query_results_helper.go中的
RangeQueryResultsHelper
(亦是“人如其名”)。
RangeQueryResultsHelper
中的pendingResults用于暂时存储范围内的读值,merkleTree用于存储一棵默克尔树,该树存储pendingResults对应的哈希值,两者是随着数据的增加同步更新的(其实哈希值是否同步生成是由hashingEnabled决定的,而该值又由账本的配置决定的,该配置默认开启,由core/ledger/ledgerconfig/ledger_config.go中的
IsQueryReadsHashingEnabled()
接口决定)。每次resultsItr迭代器被调用一次
Next()
,读取的读值都会添加到
RangeQueryResultsHelper
中的pendingResults和merkleTree中暂存。

这里跳开粗略的讲一下默克尔树,也叫哈希树。实现为query_results_helper.go中的
merkleTree
,由工具
RangeQueryResultsHelper
持有并管理。在
merkleTree
中,一个map[MerkleTreeLevel][]Hash格式的映射中实现了该树,树的每个结点保存的是哈希值。默克尔树中有两个比较重要的值:Level和maxDegree。Level标识树的层级,默认为1,当树中的Level1有maxDegree+1个结点时,则会进行一个归并哈希值的操作:如两个结点存有hash1,hash2,该操作是将hash1和hash2的值前后连成一个整体,然后对这个整体进行哈希,得到一个新的哈希值hash1-2,然后将hash1-2放入上一层(也就是Level2)里面。下面也是举例子说明默克尔树的操作,对看
merkleTree
update(nextLeafLevelHash)
函数:

//设maxDegree==2,则连续插入1-9共9个哈希值,即调用9次merkleTree的update接口
//[x]-{hashN},x为Level值,hashN代表N的哈希值。则tree的变化为:

1.[1]-{hash1}
2.[1]-{hash1,hash2}
3.[2]-{hash1-3},插入第3个时,因为len(currentLevelHashes) <= m.maxDegree不成立,所有会向下一层,也就是第2层归集哈希值,同时删除第1层的值
4.[1]-{hash4},[2]-{hash1-3}
5.[1]-{hash4,hash5},[2]-{hash1-3}
6.[2]-{hash1-3,hash4-6},插入第hash6时,同第3步原因一样和操作一样
7.[1]-{hash7},[2]-{hash1-3,hash4-6}
8.[1]-{hash7,hash8},[2]-{hash1-3,hash4-6}
9.[3]-{hash1-9},这一步插入hash9时,hash7-9向第2层归集,同时删除第1层的值,此时第2层也够了3个,此时会继续向第3层归集,归集第2
层hash1-3,hash4-6,hash7-9三个哈希值,形成第3层的hash1-9,同时会删除第2层的值


当工具
RangeQueryResultsHelper
工具执行
Done()
时,会将存储的pendingResults和merkleTree一同返回,两者实际上只会返回其中一个,其中一个必为nil。当pendingResults中的存储读值的数量小于等于maxDegree时,是不会触发归并哈希值的操作的,因此此时返回的merkleTree为nil。其他情况下,因为hashingEnabled默认为true,所有只会返回一个归并后的唯一的哈希值。这个哈希值就代表了一个resultsItr迭代器被调用
Next()
而被读出的所有读值。返回一个归并后的哈希值而不返回原始的范围读集,这样在效率上更高。另外从这一点还可以看出,返回范围读值,只可能是被用于验证的目的,原因有二:一是读值中本身就没有key对应的值,而只有版本号。二是这里可以只返回范围读集的哈希值。

以map.go为例,当调用
Invoke(stub)
中的
case "keys":
分支时,
keysIter, err := stub.GetStateByRange(startKey, endKey)
即获取了一个迭代器,然后在
for keysIter.HasNext()
中依次抽取值。keysIter即为
StateQueryIterator
,该迭代器中存储了执行范围查询操作后所查出来的读值集合:(1)在core/chaincode/handler.go的
handleGetStateByRange(...)
中,
rangeIter, err := txContext.txsimulator.GetStateRangeScanIterator(...)
,使用交易模拟器获取一个resultsItr,该迭代器会被放入queryHelper中的itrs。(2)
payload, err = getQueryResponse(...)
,,使用resultsItr这个迭代器将指定范围内的结果查询出来,放入payload。这里就有一个maxResultLimit的限制,即查询范围的起止最多是100个数据。同时也就是说,传入的起止的key所得到的查询结果,不一定是该范围内的全部数据。而指定范围内是否还有更多的数据,会使用payload中的HasMore变量进行标识。(3)payload最终被返回到core/chaincode/shim/chaincode.go中的
handleGetStateByRange()
,被放入StateQueryIterator这个迭代器中。

当交易模拟器将交易模拟完毕,即读写集填充完毕,在交易返回给调用者之前,会一次性将交易读写集中的数据进行抽取,放入到应答信息ProposalResponse中,即上文关于peer结点使用VersionedDB的过程中的第6步,也即调用TxSimulator的
GetTxSimulationResults()
接口:(1)
s.Done()
,只能执行一次。TxSimulator每次执行范围查询时,不会像读取单个值一样直接就把查询的数据放入读集中,而是只将迭代器存储在queryHelper中的itrs,这些迭代器即代表了交易中读取了哪些范围内的数据。等到此时执行Done时,才会依据这些迭代器,一个迭代器对应一个范围读值,将每个范围读取的所有读值填写范围读集中。(2)
s.rwsetBuilder.GetTxReadWriteSet()...
,该函数主要起到一个排序,然后换个包装的作用。先使用
sort.Strings(keys)
对读集、写集、范围读集按照key进行排序,然后将读写集从
RWSetBuilder
中换装到rwsetutil/rwset_proto_util.go中定义的
TxRwSet
里面。

验证器

state数据库是保存的最实时最新的状态,peer结点进行查询操作时,也是从该账本读取的数据。所以在向state数据库提交交易数据前要使用验证器进行验证(对看上文peer结点使用VersionedDB的过程的第8点)。验证的时候,无效的交易将被屏蔽掉而不会提交到state账本中。因此需要一个验证器,这个验证器在validator/statebasedval/state_based_validator.go中实现为
Validator
,其
ValidateAndPrepareBatch(block,domvcc)
接口负责实现验证功能,其中第2个参数domvcc决定了是否进行mvcc验证,该值给的为true。关于mvcc(多版本并发控制),各位可以自行搜索参看一下与数据库相关的mvcc的作用。

存在验证就存在对比,因为验证只能通过对比。对比的内容:key对应的版本号。对比的双方:甲方是block中携带的读集和范围读集,乙方是验证时现从state数据库里查询出的甲方对应的读值和范围读集+由甲方自带的有效交易的写集形成的升级数据包updates(参看下文)。范围读集中虽说可能是哈希值,但是这些哈希值也是在原始读值(包含key和版本)的基础上形成的哈希值,在验证的时候,再从state数据库中读取同样范围内的读值集合,按照同样的方法生成一个新的哈希值,如果两个哈希值不同,则间接说明了当前state数据库中这个范围内的值已经有所变动,即比较哈希值其实也是在比较key对应的版本号。如现在验证的block中一个Envelope中的读集中有两个读值
<read key="K1", version="1">,<read key="K2", version="1">
,有一个范围读值
<queryinfo,startkey=10,endkey=20,true,hash10-20>
(注意乙方的updates在这个例子里面没有体现,更好的例子请参看官方文档Example simulation and validation

章节)。验证的方法只有一个:验证时,依次将K1,K2的version与现从state数据库中现读出K1,K2的版本号相比较,然后将
hash10-20
与现从state数据库中读出startkey-endkey间的读值并按照同样的归并哈希的方法生成的新哈希值
hash10-20'
进行比较。概括点说,就是拿之前模拟交易产生的读集中的版本号与state先有的相应的版本号进行比较。验证的标准只有一个:若以上所有的比较均相同,则判定此交易有效,任何一个比较不同,则判定交易无效。验证的目的只有一个:将有效交易的写集数据放入批量升级包中,准备更新state数据库。

关于理解为何如此判定交易的有效性,有几点说明:

state是存储当前账本所有最新有效交易的世界状态的数据库。这里的关键词是最新有效交易

所有block均为串行化后的数据。state数据库中值的版本号由blockID+TxID组成,blockID为序列递增,TxID则为交易在block中一批交易中的相对位置(即交易在block.Data.Data这个数组中的下标值,亦是相对序列递增)。因此当一个新的有效交易的数据被提交至state数据库中进行更新,该值的版本号与原值的版本号必然不一样,也即更新state数据库的值,值的版本号必然变化。

模拟交易读出的数据都会放入读集中。这里假定调用者在执行chaincode的模拟交易时,读取出来的数据会被使用。这个也很正常,既然读出来了,就有被使用的可能。比如chaincode_example02.go中,即是先查出来A,B账户的余额数据,并在此基础上进行的数据的加减。因此如果读集的数据失效,在此基础上得出的写集中的数据也必然无效。

fabric对交易是并发并行处理的,且自模拟交易产生读写集 -> 数据返回到peer结点 -> 送到orderer -> 再返回peer的gossip -> 再准备提交至state数据库,这个过程会耗费一段时间,而这段时间内,原有交易所使用的读出数据有可能已经被其他稍早点儿的交易改变。比如Tx_1查到的A的余额为100,B的余额为100,A把50元转给B,则形成的读集是A-v1,B-v1,写集是A-50,B-150。这个读写集在到达Validator验证之前,经历了一段时间。就在这一段时间内,稍早点的交易Tx_0已经把A的20元转给了B,A的余额为80(假定版本变为v2),B的余额为120(假定版本变为v2)。如果此时依然判定Tx_1有效,将Tx_1的写集提交至state数据库,则将会覆盖Tx_0的交易,此时A的余额仍然为50(实际应该是30),B的余额仍然为150(实际应该是170),这样肯定不行。

由第4点,可能有人会推出另一个疑问:既然从Tx_1在交易模拟器生成读写集数据到提交前验证之间会耗费一段时间,存在稍早的交易Tx_0已经把state数据更改而使得Tx_1无效的风险,那么Tx_1在通过验证到真正提交到state数据库中依然存在一段时间,这期间也可能存在被稍早的交易Tx_N将state数据更改的风险,这点如何保证呢?其实不存在Tx_N这种情况,因为如第2点所说,读写集是由block带入的,而block均为串行化依次处理的。这样的话只会存在两种情况:(1)Tx_0与Tx_1不在同一个block中,则Tx_0所在的block早于Tx_1所在的block。此时一定是Tx_0所在的block处理完毕之后,才会开始处理Tx_1所在的block,也因此当处理到Tx_1开始验证时,此时从state数据库中取出的版本号必定是Tx_0提交过后的新的版本号v2,也因此Tx_1(版本号为v1)会被判定为无效交易。(2)Tx_0与Tx_1在同一个block中,则Tx_0所在的位置(即下标值)小于Tx_1所在的位置。这种情况会先验证Tx_0交易,如果Tx_0有效,则会把Tx_0的写集放入到一个updates升级包中(但此时这个升级包并未提交到state数据库中),接着会继续验证Tx_1,会发现updates中存在与Tx_1读集相同的key,也即说明在Tx_1之前的Tx_0已经在试图改变Tx_1所使用到的数据,而Tx_0又是有效且早于Tx_1的,因此要判定Tx_1为无效。

验证过程

验证过程还是很值得看的,因为验证的过程不仅可以了解数据的包装结构,还能很清楚的知道什么情况下是对的,什么情况下是错的,都需要哪些对象参与,等等,这些都对于整个业务逻辑和流程的理解都很有帮助。其实committer将block提交至账本处时已经使用自身的验证器执行了一轮验证(参看core/committer/txvalidator/validator.go中的
Validate(block)
),并把非法的无效交易进行记录,不过该轮验证主要集中在验证block的身份、签名、权限等内容。而这里所讲的是账本中对数据的mvcc等验证,以确保有效交易能正确存储到state数据库中,所以验证过程以validator/statebasedval/state_based_validator.go中的
ValidateAndPrepareBatch(block,domvcc)
为起点(domvcc默认为true),从函数名即可理解这个函数分为上下两部分:Validate和PrepareBatch,即验证和准备批量升级包updates。

updates := statedb.NewUpdateBatch()
,准备updates用于存放批量升级包,这个updates就是上文提到的升级包,会以参数的形式传递到每个更具体的验证函数中去,作为乙方的一部分参与验证 ->
txsFilter := util.TxValidationFlags(...)
,从block中抽取出block元数据的BlockMetadataIndex_TRANSACTIONS_FILTER位的数据txsFilter,该数据标记了block中的每笔交易有效性(这里的有效性数据即是committer处验证的结果),若是无效交易,这里的验证将直接跳过。交易的有效值为在protos/peer/transaction.pb.go中定义的如TxValidationCode_VALID的系列常量值。

for txIndex, envBytes := range block.Data.Data { ... }
,依次抽取block中的每个Envelope,开始验证 -> 一系列解压抽取Envelope的工作后,
if txType != common.HeaderType_ENDORSER_TRANSACTION
,判定类型,若不是正常的交易数据,则直接跳过,state只存储交易数据 -> 最重要的一步,
txRWSet, txResult, err := v.validateEndorserTX(env...)
,验证交易,返回交易的读写集和验证结果 ->
txsFilter.SetFlag(txIndex, txResult)
,在txsFilter中存储当前交易的验证结果,
if txRWSet != nil { ... }
,如果读写集不为空,则说明当前交易有效,则
committingTxHeight := version.NewHeight(...)
生成提交版本号(写值自身是不带版本号的,而写值的向state数据提交的版本号就是在这生成的),
addWriteSetToBatch(...)
将有效交易的写集和对应的提交版本号放入批量升级包updates -> for循环结束,
block.Metadata.Metadata[..._TRANSACTIONS_FILTER] = txsFilter
替换block元数据的交易有效性值。

第2步中的
txRWSet, txResult, err := v.validateEndorserTX(env)
处,就是一个分发任务依次验证的过程。自己
txRWSet.FromProtoBytes(respPayload.Results)
把需要验证的读写集抽取出来后,就把任务交给了
v.validateTx(txRWSet, updates)
validateTx
又分拆把验证读集的任务交给
validateReadSet
,把验证范围读集的任务交给
validateRangeQueries
。这些函数均是“人如其名”的存在。

验证读集

validateReadSet
验证读集的方法就是循环的调用
validateKVRead
,一一验证每一个读值。在
validateKVRead
中,(1)
if updates.Exists(ns, kvRead.Key)
即是查看升级包updates中是否有与读值相同的key,若存在,则直接判定交易无效。(2)
versionedValue, err := v.db.GetState(ns, kvRead.Key)
->
committedVersion = versionedValue.Version
,当下现从state数据库中查出读值key在state中的版本号committedVersion ->
if !version.AreSame(...)
,比较两个版本号是否一致,若不一致,则判定当前交易无效,直接返回false,否则返回true。

验证范围读集

validateRangeQueries
验证范围读集的方法就是循环调用
validateRangeQuery
,一一验证每个范围读值。范围读值的验证由于是一系列的值,这一系列的值既要与state中现存的版本号比较,也要与updates中已存在的key比较,还牵扯到可能范围读值中的一个key在updates和state中同时存在(这是这个key就存在两个版本号了)而选哪一个版本号来和范围读值中这个key的版本比较的问题,所以就稍显麻烦。为了描述简洁,下文值提及的key,既代表key本身,也代表key对应的读值。

这里涉及到validator/statebasedval/下的两个工具:

range_query_validator.go中的
rangeQueryValidator
,“人如其名”,范围查询验证器,有
init
接口用于初始化,
validate
接口用于验证。实现为两个版本,一个是用来验证原始范围读值的rangeQueryResultsValidator,一个是用来验证范围读值的归并哈希值的rangeQueryHashValidator,由于多数情况下范围读值中存储的是读值集合的归并哈希值,因此这里只讲后者。

combined_iterator.go中的
combinedIterator
,联合迭代器。这个联合迭代器是供rangeQueryValidator管理使用的。针对上述验证的麻烦之处,联合迭代器联合的就是以updates为数据源生成的迭代器A(参看statedb/statedb.go中的
nsIterator
)和以state数据库为数据源生成的迭代器B(参看statedb/stateleveldb/stateleveldb.go中的
kvScanner
)。当前从A中获取的值存储在updatesItem中,当前从B中获取的值存储在dbItem(类型为VersionedKV,包含key,值,版本号)。当执行联合迭代器的
Next()
获取一个查询的版本号时,联合迭代器会调用
compareKeys()
比较updatesItem和dbItem中的key:updatesItem的key更小,返回updatesItem,A前进一步;dbItem的key更小,返回dbItem,B前进一步;当两个key相等时返回updatesItem,A,B都前进一步。这里的前进一步,指的是迭代器执行一下
Next()
接口迭代到下一个值。

回到
validateRangeQuery
中。(1)
includeEndKey := !rangeQueryInfo.ItrExhausted
,记录下此范围读值是否包含endkey对应的读值。因为范围读值默认是不包含endkey对应的读值的,但是当这个范围读值没有被模拟器读尽时,比如key2-key10只读了前3个就返回了,此时rangeQueryInfo.ItrExhausted值为false时,则说明此时实际的endkey是key4而非key10,且此时范围读值应该包含key4的读值。相反,若ItrExhausted值为true时,则说明模拟器读尽了key2-key10间的8个key,由于范围读值默认的不包含endkey,所以此时范围读值中不包含key10对应的读值。(2)
combinedItr, err := newCombinedIterator(...)
,根据updates,startkey,endkey,includeEndKey,创建一个联合迭代器。(3)
if rangeQueryInfo.GetReadsMerkleHashes() != nil
成立(默认返回的范围读值中是归并后的哈希值) ->
validator = &rangeQueryHashValidator{}
->
validator.init(rangeQueryInfo, combinedItr)
,根据范围读值rangeQueryInfo和生成的联合迭代器combinedItr,同时内建了一个上文提及的RangeQueryResultsHelper工具,创建一个用于验证范围读值的归并哈希值的验证器。(4)
validator.validate()
,使用验证器验证范围读值。粗略的验证过程就是使用RangeQueryResultsHelper工具重新一步步构建出一个与rangeQueryInfo范围相同的默克尔树并不断进行归并哈希的操作,将得到的归并哈希值与rangeQueryInfo携带的哈希值进行比较。

举个例子,当前交易Tx_1验证的一个范围读值(startkey为2,endkey为6)中有Tx_1_key2,Tx_1_key4,Tx_1_key5共3个key的读值(归并形成的哈希值为Tx_1_hash2-6),A里有A_key2的一个准备更新的值,B中在key2-key6这个范围内中B_key2的值被稍早点儿的交易Tx_0已改动过(A_key2的版本一定比B_key2更高,因为A是updates中的,又由于B_key2是被Tx_0改动的,因此B_key2的版本号一定比Tx_1_key2的更高),B_key3为Tx_0新加入的,其余与Tx_1相同,因此有B_key2,B_key3,B_key4,B_key5共4个key。则两种类型的范围验证器验证过程如下(参看validator/statebasedval/range_query_validator.go中两类验证器的
validate()
):

rangeQueryResultsValidator的验证过程:(1)验证Tx_1_key2时,通过比较,发现A和B中都存在key2,将会把A_key2的值返回,A、B都向前前进一步并更新updatesItem(变为nil)和dbItem(变为B_key3对应的值)。因为B_key2肯定是之前的交易提交的旧数据,而A_key2则是当前准备向state提交的更新数据,所以这里返回updatesItem。当比较Tx_1_key2与A_key2时,会发现两者版本不同,此时将判定交易Tx_1无效,对Tx_1的验证也将结束。(2)假如能继续,下一个验证的是Tx_1_key4时,此时updatesItem为nil,dbItem为B_key3对应的VersionedKV,则返回的是dbItem,此时比较Tx_1_key4与B_key3时发现一个是key4,一个是key3,一个是key4,此时也将判定交易Tx_1无效。因为无论这个key3是从updates冒出来的还是从state数据库中冒出来的,都说明key2-key5这个范围内出现了新的数据,而Tx_1中的这个范围的读值将变得无效。因为既然这个范围读值被读出来,就有被当作一个整体使用的可能,比如key2-key5之间全部都是某一公司的财产,此时要读出该公司所有的财产总和并计算出其总和的20%让渡给B公司,模拟交易时得到的是有100万,而到验证前又有一笔10万的入账。若此时还依然以100万的20%进行让渡显然是不合适的。另外其实像突然冒出的key3这种情况,比较专业的说法是数据库方面的幻读(phantom read)现象,读者可以自行搜索,更全的例子,也可参看state_based_validator_test.go中的
TestPhantomValidation
函数。

rangeQueryHashValidator的验证过程:设Tx_1的范围读值的maxDegree为2,则形成的归并哈希后的MaxLevel为2(参看上文归并默克尔树归并哈希的操作)。inMerkle是例子中Tx_1的一个范围读值中的归并数据(包含Tx_1_hash2-6,maxDegree,MaxLevel等信息)。在
for { ... }
循环中,(1)
result, err = itr.Next()
,使用联合迭代器获取一个读值result,此时result是A_key2的值,之后依次会是B_key3,B_key4 ->
v.resultsHelper.AddResult(...)
,使用RangeQueryResultsHelper工具将result依次传化为适当形式后添加到默克尔树中 ->
merkle := v.resultsHelper.GetMerkleSummary()
,获取新建的默克尔树中的归并数据merkle ->
if merkle.MaxLevel < inMerkle.MaxLevel
,当默克尔树添加入A_key2,B_key3时,因为没有触发归并哈希值操作,所以merkle.MaxLevel依然是默认的1,因此会在这里直接continue。(2)当加入B_key4时,此时叶节点已经有3个值,大于maxDegree,则会触发归并哈希值的操作生成一个哈希值hash2-4,merkle.MaxLevel变为2,因此程序会继续向下执行 ->
if lastMatchedIndex == len(merkle.MaxLevelHashes)-1
是-1==0的比较,不成立 ->
lastMatchedIndex++
后变为0 ->
!bytes.Equal(...)
,此时merkle与inMerkle的MaxLevelHashes[0]出应该都是空的,因此相等,程序继续循环。(3)
result, err = itr.Next()
,此次获取的result是B_key5 -> 过程如上,添加B_key5并不会触发merkle新的归并哈希值操作,因此之后的几个if分支均不会进入而返回,程序会进入下一次循环 ->
result, err = itr.Next()
,此次获取的result为nil,因此进入
if result == nil {...}
分支 ->
_, merkle, err = v.resultsHelper.Done()
执行最后一次归并哈希值的操作生成归并数据,此归并数据其实是包含了之前加入的A_key2,B_key3,B_key4,B_key5四个值的归并哈希值,而Tx_1_hash2-6是Tx_1_key2,Tx_1_key4,Tx_1_key5三个值的归并哈希值,如此
equals := inMerkle.Equal(merkle)
比较的话,自然equals为false,因此
return equals, nil
,判定Tx_1交易无效。

回归到state_based_validator.go中的
ValidateAndPrepareBatch(...)
,当
validateEndorserTX
调用的
v.validateTx(txRWSet, updates)
返回为peer.TxValidationCode_VALID,则说明经过上述验证读集、验证范围读集的过程,证明当前交易有效且可以写入state数据库,因此会把交易的读写集以不为nil的形式返回,至此
ValidateAndPrepareBatch
的Validate部分的工作算是完成。接着,在
ValidateAndPrepareBatch(...)
中会进入
if txRWSet != nil {...}
分支,完成PrepareBatch部分的工作。最后,将准备的升级包updates返回。该updates返回后会被存储给
LockBasedTxMgr
的batch成员,供下一步
l.txtmgmt.Commit()
向state数据提交这些升级数据的时候使用(参看peer结点中使用VersionedDB的过程第8点)。在
l.txtmgmt.Commit()
中(txmgr/lockbasedtxmgr/lockbased_txmgr.go),
txmgr.db.ApplyUpdates(...)
即是state数据库使用升级包数据来真正升级状态的。这里传入了两个参数,一个是txmgr.batch,即升级数据包,另一个是以blockID+block最后一个交易的下标序号组成的版本号s_version。

state数据库

state数据库就是一个正常的leveldb数据库,实现为statedb/stateleveldb/stateleveldb.go中的
versionedDB
。提供基本的打开,关闭,查询,写入功能,此外还提供支持范围查询的leveldb数据库的迭代器,多值查询的额外功能。leveldb版本的state数据不支持富查询,即
ExecuteQuery(...)
接口直接返回错误。



GetState
GetStateMultipleKeys
两个接口分别提供单值查询,多值查询。
GetStateRangeScanIterator
接口提供范围查询的迭代器。这些较为简单,不再赘述。



state数据库为
LockBasedTxMgr
配合处理block,因此只支持批量升级数据,即
ApplyUpdates(batch,height)
接口的功能。batch即为上文验证过程中准备的有效交易的升级数据包,height则为一个版本号(即上文的s_version),该版本号不用于具体的某个状态,而是作为state数据库的一个叫做保存点的键值对的值(idStore,BlockStore中都有类似的保存点,也可以叫检查点)。具体的写入操作也是常规的leveldb数据库的批量写入操作,保存点会在每一批升级数据的最后加入,key为savePointKey,value为height。也就是说,当能从state中获取的保存点的height值,且height中的BlockNum为N,则说明当前state数据库已经完整保存了序号为N的block中的有效交易。

重启恢复

同idStore和BlockStore存储一样,VersionedDB同样存在宕机重启后残缺数据的清理和恢复问题。不同于idStore和BlockStore的在自身对象建立起来后即进行“自我修复”,VersionedDB需要在BlockStore自我修复完毕之后,使用恢复后的BlockStore提供的数据
kvLedger
手动的恢复。为何使用BlockStore提供的数据来恢复呢?这点需要参看账本处理block的前后顺序:参看core/ledger/kvledger/kv_ledger.go的
Commit(block)
,是先执行
l.blockStore.AddBlock(block)
把block提交至BlockStore中后,然后执行
l.txtmgmt.Commit()
将有效交易数据提交至state数据库,最后向HistoryDB账本提交(这个提交顺序很重要,对恢复各个账本时的逻辑有很大影响)。BlockStore亦用于自恢复能力,在
newKVLedger(...)
之时,传入的BlockStore对象其实是已经完成了自我修复。因此当宕机发生时,BlockStore中的数据可能比VersionedDB更新的内容。在
newKVLedger(...)
中,随后执行了
l.recoverDBs()
,来恢复VersionedDB和HistoryDB,HistoryDB在下文详述。

recoverDBs()
中,
lastAvailableBlockNum
是从BlockStore中获取的最新的有效的blockID,
recoverables
存放预计需要恢复的账本对象,
recoverers
存放真正需要恢复的账本对象:

info, _ := l.blockStore.GetBlockchainInfo()
lastAvailableBlockNum := info.Height - 1
,使用已恢复的BlockStore获取当前已经写入BlockStore的有效的blockID。

recoverables := []recoverable{...}
,可能需要恢复的有VersionedDB和HistoryDB两个数据库。

for _, recoverable := range recoverables {...}
,依次调用
recoverable.ShouldRecover(lastAvailableBlockNum)
,使用最新有效的lastAvailableBlockNum,检测账本是否需要恢复。这里只看VersionedDB(HistoryDB的检测方式与之相同)。在txmgr/lockbasedtxmgr/lockbased_txmgr.go的
ShouldRecover
中:
savepoint, err := txmgr.GetLastSavepoint()
获取了state数据库中的保存点 -> 然后将
savepoint.BlockNum
与lastAvailableBlockNum进行对比 -> 如果相等,则state数据库不需要恢复,如果不同(savepoint.BlockNum一定比lastAvailableBlockNum小),则说明state中未完整存储序号为lastAvailableBlockNum的block的所有有效交易,需要进行恢复 -> 当需要恢复时,将返回ture,
savepoint.BlockNum + 1
。true标识state需要恢复操作,
savepoint.BlockNum + 1
则标识需要从哪一块block进行恢复,即恢复起点
ShouldRecover
结束后,返回的数据付给了
recoverFlag
firstBlockNum
,如果
recoverFlag
为true,则会将VersionedDB账本放入
recoverers
。这里假设VersionedDB确实需要恢复。

接着,也就是for循环之后,有一系列的
if
分支。
if len(recoverers) == 0
,则说明没有确实需要恢复的账本,直接返回。
if len(recoverers) == 1
,则说明只有一个需要恢复的账本,此时只可能是HistoryDB,因为若VersionedDB需要恢复,则HistoryDB必也需要恢复,且VersionedDB的恢复起点一定 >= HistoryDB的恢复起点。因此这里只看两个账本都需要恢复的情况,设VersionedDB需要从10处开始恢复,HistoryDB需要从6处开始恢复,lastAvailableBlockNum值为15,则:(1)
if [0]... > [1]...
将成立,因此执行
[0], [1] = [1], [0]
置换,将lagger放入
recoverers
的0的位置,lagger的意思就是懒散的、落伍的人的意思,这里指恢复起点值更小的账本,且肯定是HistoryDB,而把恢复起点值更大的VersionedDB放到后边。(2)
if [0]... != [1]...
,置换后,0处的6肯定 != 1处的10,因此执行
l.recommitLostBlocks(...)
,单独向lagger提交6-9之间的block数据进行恢复。(3)
l.recommitLostBlocks([1].., lastAvailableBlockNum,[0],[1])
,接着,也是最后,向lagger和VersionedDB一同提交10-15之间的block数据进行恢复。当两个账本的恢复起点相等时,比如都为8,则(1)和(2)处的if分支都不会进入而直接进行至此步,一起向两个账本提交8-15之间的block数据进行恢复。这里需要说明一下当两个账本的恢复起点不一致时为什么要进行(1)和(2)的操作,又是换位置又是分段恢复的:因为block是存储在磁盘文件blockfile中的,因此换位置和分段恢复的操作,可以使得恢复过程中BlockStore读取blockfile中的block数据时顺序读取一次,这样效率更高。

最后单独看一下
recommitLostBlocks(...)
,传入的参数分别为恢复起点firstBlockNum、恢复终点lastBlockNum、需要恢复的账本recoverables。for循环中,
block, err = l.GetBlockByNumber(blockNumber)
使用BlockStore从blockfile中获取指定序列号的block,然后调用账本的
r.CommitLostBlock(block)
,向账本提交block,进行恢复。这里只看VersionedDB。在txmgr/lockbasedtxmgr/lockbased_txmgr.go的
CommitLostBlock(block)
中,和正常的向VersionedDB账本提交交易数据的过程如出一辙:
txmgr.ValidateAndPrepare(block, false)
准备升级包数据,注意这里第二个参数给的是false,即不再做mvcc验证,因为这里是恢复,所提交的block是从BlockStore中获取的,也肯定是之前已经验证过才会放入BlockStore的,所以这里不需要再重复进行验证 ->
txmgr.Commit()
将升级包数据真正提交到state数据库,完成VersionedDB的恢复工作。

HistoryDB

HistoryDB也是一个标准的以leveldb数据库为依托的账本,实现在core/ledger/kvledger/history/historydb下(下文以此目录为基准)。比较特殊的地方是这个账本只存储block中有效交易相关的key,而不存储value(由于leveldb存储的键值对不允许nil,因此这里实际上value是有值的,只不过所有的值均为
[]byte{}
)。另外提供一个历史查询器HistoryDBQueryExecutor在交易的时候使用。其余的操作,如开闭,读写操作均与正常的leveldb类型账本并无二致,不再赘述。

HistoryDB是可配置是否使能的,在core/ledger/ledgerconfig/ledger_config.go的
IsHistoryDBEnabled()
接口可以获取当前是否开启使用了HistoryDB。原始的配置则在core.yaml中的enableHistoryDatabase项,默认为true,即开启使用HistoryDB。

既然HistoryDB实际只存储key,那么也就是说,key自身既携带了索引信息又携带了我们需要的值信息,key其实就是HistoryDB所要存储等价于value的对象并供外界检索。这个key是一个组合key,对看historyleveldb/historyleveldb.go中的
Commit(block)
接口,当向HistoryDB提交一个block时,会筛选出block中的有效交易,并把这些交易的写集中的每个写值读取出来,以命名空间ns + compositeKeySep + 写值key + compositeKeySep + block序列号 + 交易ID的组合形式形成一个组合键
compositeHistoryKey
,然后
dbBatch.Put(compositeHistoryKey, emptyValue)
,将
compositeHistoryKey
与空值
emptyValue
作为一个键值对写入批量升级包
dbBatch
中。当block中所有有效交易均遍历完毕后,
dbBatch.Put(savePointKey, height.ToBytes())
以保存点封底,最后
historyDB.db.WriteBatch(dbBatch, false)
向HistoryDB提交数据。

组合键
compositeHistoryKey
,其中的compositeKeySep当作分隔符理解即可。前半部分命名空间ns + compositeKeySep + 写值key + compositeKeySep即为用于索引的信息,后半部分block序列号 + 交易ID即为值信息。

历史查询器

对HistoryDB的检索主要通过HistoryDB提供的一个HistoryDBQueryExecutor对象来实现。HistoryQueryExecutor在historyleveldb/historyleveldb_query_executer.go中实现,只提供
GetHistoryForKey(...)
一个接口,该接口根据提供的命名空间和写值key,返回一个迭代器
historyScanner
(内部封装了levedb数据库的Iterator)。是迭代器必定涉及起点和止点,
historyScanner
的起点是命名空间ns + compositeKeySep + 写值key + compositeKeySep的组合键,止点是命名空间ns + compositeKeySep + 写值key + compositeKeySep + 0xff的组合键。对比起点和止点,止点多了一个0xff,相当于一个字符的极限值,也因此这个范围查询的是所有以命名空间ns + compositeKeySep + 写值key + compositeKeySep为开头的key值。比如起点是
[]byte("A")
,止点是
append([]byte("A"),[]byte{0xff}...)
,则这个范围查询的是所有以字符A开头的key。又因为有HistoryDB存储的格式为前提,假设这里的ns为“chaincode_example02”,写值key为“A账户”,则这个起止范围相当于在查询所有chaincode_example02链码上改动过A账户的“ blockID+有效交易ID ”的信息。而有了blockID+有效交易ID这两个信息,自然可以通过BlockStore定位查询出原交易的所有信息,如改动时间,改动值,是否是删除操作等。

参看historyleveldb_query_executer.go中
historyScanner
Next()
接口:

if !scanner.dbItr.Next()
,因为HistoryDB不存储value,因此这里leveldb的迭代器dbItr的
Next()
操作不为获取value,而只是让迭代器向前走一步,同时作是否还有下一个值的判断。

historyKey := scanner.dbItr.Key()
,获取leveldb迭代器当前值的key,这也是我们实质想获取的内容,这个key就是由命名空间ns + compositeKeySep + 写值key + compositeKeySep + block序列号 + 交易ID组成的组合键。

blockNum, bytesConsumed := util...
tranNum, _ := util...
,从组合键中分别分解出其中携带的blockID和交易ID。

tranEnvelope, err := scanner.blockStore...
,使用BlockStore,根据blockID和交易ID,获取原始的交易信息。

queryResult, err := getKeyModificationFromTran(...)
,根据获取的原始交易信息,进行整理,返回当次
Next()
的单个查询结果,该结果里面包含改动时间,改动值,是否是删除操作等信息。

使用

HistoryDB账本被
kvLedger
使用过程和VersionedDB在过程上就是一先一后的区别,不用过多赘述。被chaincode使用主要是通过出现在交易中的HistoryDBQueryExecutor进行查询,而查询的过程与VersionedDB的交易模拟器的范围查询过程颇为类似(至于HistoryDBQueryExecutor为何会出现在交易中,请参看core/endorser/endorser.go的
ProcessProposal(...)
中的
ctx = context.WithValue(...)
处),以map.go为例,:

peer chaincode invoke...
,在peer结点执行map的Invoke命令,中间过程省略,直接定位到下一步。

在map.go的
Invoke(stub)
case "history":
分支中,
keysIter, err := stub.GetHistoryForKey(key)
,调用ChaincodeStubInterface的此接口,最终获取一个迭代器,该迭代器以HistoryDBQueryExecutor获取的范围数据为数据源,返回查询结果。

第1步调用后,会在ShimHandler和ServerHandler间辗转,然后在core/chaincode/handler.go中调用
handleGetHistoryForKey(...)
,在这个函数中,
historyIter, err := txContext.historyQueryExecutor.GetHistoryForKey(chaincodeID, getHistoryForKey.Key)
即是使用HistoryDBQueryExecutor获取一个
historyScanner
,然后在
getQueryResponse(...)
中依次调用
historyScanner
Next()
获取查询结果(即上文第5步查询到的内容)。查询结果最终放入了
ChaincodeMessage
Payload
返回。

携带查询结果的
ChaincodeMessage
返回至core/chaincode/shim/chaincode.go中的
GetHistoryForKey(...)
中,也即第1步所调用处,然后把结果集放在了
CommonIterator
迭代器中,上面再套上
HistoryQueryIterator
返回给chaincode,亦也就是第1步获取的keysIter。

获取keysIter后,就可以在
for keysIter.HasNext() { response, iterErr := keysIter.Next()... }
进行使用,以完成chaincode在该功能上自身想完成的任务。

因此,从使用上讲,HistoryDB算是一个辅助性的数据库,辅助其他数据库,辅助交易的进行。另外,拓展一下的话,HistoryDB的组合键的组合形式可以针对自身业务的需求进行设计,这样也可实现类似couchDB数据库那样的富查询。

在此总结一下吧。账本本身算是一个存储系统,或基于文件,或基于数据库,或两者相互配合,最终是用于存储交易的数据的,相当于fabric的“硬盘”。关键在于理解每个账本存储了什么,如何存储的,如何恢复的,向外界提供哪些功能。若读者读至此处,建议在回头看一遍《fabric源码解析24》的开篇之处。文中若有纰漏之处,还请留言指正。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息