您的位置:首页 > 大数据 > 人工智能

bigchaindb源码分析(九)——选举

2017-08-02 16:44 549 查看
本节我们来分析election进程,election进程也是一个pipeline

# bigchaindb/pipelines/election.py
def start(events_queue=None):
pipeline = create_pipeline(events_queue=events_queue)
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline

def create_pipeline(events_queue=None):
election = Election(events_queue=events_queue)

election_pipeline = Pipeline([
Node(election.check_for_quorum),
Node(election.requeue_transactions)
])

return election_pipeline


我们还是首先来分析indata,然后再对pipeline的Node一个个分析

indata

get_changefeed
的返回值将作为indata。election进程的
get_changefeed
与我们在源码分析(六)中分析的block进程一致,同样是调用了
backend.get_changefeed
函数。只不过相对于block进程而言,election进程针对的是
votes
表,而针对的操作也仅仅只有insert操作

根据之前对block进程的indata的分析,votes表出现新的数据的插入,即会写入election进程的indata的输出队列

# bigchaindb/pipelines/election.py
def get_changefeed():
connection = backend.connect(**bigchaindb.config['database'])
return backend.get_changefeed(connection, 'votes', ChangeFeed.INSERT)


pipeline

election进程pipeline的Nodes有两个:
check_for_quorum->requeue_transactions
。在Election实例化的时候还定义了一个EventHandler。EventHandler实际上就是一个队列,对event的put、get操作实际上也就是从队列中写入与读取数据

# bigchaindb/pipelines/election.py
def __init__(self, events_queue=None):
self.bigchain = Bigchain()
self.event_handler = None
if events_queue:
self.event_handler = EventHandler(events_queue)

# bigchaindb/events.py
class EventHandler:

def __init__(self, events_queue):
self.events_queue = events_queue

def put_event(self, event, timeout=None):
self.events_queue.put(event, timeout=None)

def get_event(self, timeout=None):
return self.events_queue.get(timeout=None)


同时,还有一个通过区块来构建Event的函数。Event类仅仅包含两个成员属性:1)type:根据
handle_block_events
函数,event的type为区块的投票结果(invalid、valid);2)data:区块记录。
handle_block_events
根据区块的投票结果来构建Event

# bigchaindb/pipelines/election.py
def handle_block_events(self, result, block_id):
if self.event_handler:
if result['status'] == self.bigchain.BLOCK_UNDECIDED:
return
elif result['status'] == self.bigchain.BLOCK_INVALID:
event_type = EventTypes.BLOCK_INVALID
elif result['status'] == self.bigchain.BLOCK_VALID:
event_type = EventTypes.BLOCK_VALID

event = Event(event_type, self.bigchain.get_block(block_id))
self.event_handler.put_event(event)


下面我们来对pipeline的Node一个个分析

check_for_quorum

Node
check_for_quorum
首先从投票记录中找到投票的区块的id,然后根据区块的id来获取区块记录。在获取区块记录时,也不仅仅是从
bigchain
表中查询记录,而是在查询后,还把区块的所有事务的asset项进行了更新,更新为assets表中的对应记录,原因是有些事务,如TRANSFER事务的asset项只包含有asset的id

def check_for_quorum(self, next_vote):
try:
block_id = next_vote['vote']['voting_for_block']
node = next_vote['node_pubkey']
except KeyError:
return

next_block = self.bigchain.get_block(block_id)

result = self.bigchain.block_election(next_block)
...


在获取到区块记录之后,将调用
block_election
来进行选举。
block_election
将会首先从votes表中查找到对该区块进行投票的所有投票记录,然后调用一致性模块
consensus
中的函数来进行选举

# bigchaindb/core.py
def block_election(self, block):
if type(block) != dict:
block = block.to_dict()
votes = list(backend.query.get_votes_by_block_id(self.connection,
block['id']))
return self.consensus.voting.block_election(block, votes,
self.federation)


在选举时,首先根据区块记录获取到区块应该有的投票者,然后根据节点配置文件中设置的联盟中所有节点,做交集得出符合条件的投票者

# bigchaindb/voting.py
def block_election(cls, block, votes, keyring):
eligible_voters = set(block['block']['voters']) & set(keyring)
n_voters = len(eligible_voters)
eligible_votes, ineligible_votes = \
cls.partition_eligible_votes(votes, eligible_voters)
by_voter = cls.dedupe_by_voter(eligible_votes)
results = cls.count_votes(by_voter)
results['block_id'] = block['id']
results['status'] = cls.decide_votes(n_voters, **results['counts'])
results['ineligible'] = ineligible_votes
return results


函数
partition_eligible_votes
在之前已经介绍过,其作用为对于从votes中查询得到的对本区块的投票记录,判断记录的投票者是否符合条件,判断投票记录的签名是否正确,验证成功的投票记录保存在
eligible_votes
中,验证失败的保存在
ineligible_votes


dedupe_by_voter
用来判断所有成功的投票记录中,是否有某个节点投票了不止一次,若出现这种情况则抛出异常

count_votes
对投票记录进行统计。这个统计还包含了认为该区块为valid的节点所投票的上一个节点的一致性,来看
count_votes
的代码。该代码是维护了一个计数器,对所有对本区块投票为valid的节点,用计数器记录了这些节点投票过的上一区块,然后利用Collections模块的
most_common
函数返回计数最多的上一个区块,将这个区块作为本区块的前一个区块。同时将其他投票为valid的节点所投票的上一节点记录在
other_previous_block


# bigchaindb/voting.py
def count_votes(cls, by_voter):
prev_blocks = collections.Counter()
malformed = []

for vote in by_voter.values():
if not cls.verify_vote_schema(vote):
malformed.append(vote)
continue

if vote['vote']['is_block_valid'] is True:
prev_blocks[vote['vote']['previous_block']] += 1

n_valid = 0
prev_block = None
# Valid votes must agree on previous block
if prev_blocks:
prev_block, n_valid = prev_blocks.most_common()[0]
del prev_blocks[prev_block]

return {
'counts': {
'n_valid': n_valid,
'n_invalid': len(by_voter) - n_valid,
},
'malformed': malformed,
'previous_block': prev_block,
'other_previous_block': dict(prev_blocks),
}


对投票进行统计之后,就可以决定投票结果了,这段逻辑位于
decide_votes
函数中,其三个参数分别代表:投票者数目、投票为valid的数目、投票为invalid的数目。而确定区块是否为valid的方案则是看哪种投票结果超过半数

# bigchaindb/voting.py
def decide_votes(cls, n_voters, n_valid, n_invalid):
if n_invalid * 2 >= n_voters:
return INVALID
if n_valid * 2 > n_voters:
return VALID
return UNDECIDED


至此,投票选举完毕,Node
check_for_quorum
再将区块重构成Event,当区块最后被确定为invalid时,该区块还会被传输到pipeline的下一个Node中,否则已经处理完毕

def check_for_quorum(self, next_vote):
...
result = self.bigchain.block_election(next_block)

self.handle_block_events(result, block_id)
if result['status'] == self.bigchain.BLOCK_INVALID:
return Block.from_dict(next_block)

# Log the result
if result['status'] != self.bigchain.BLOCK_UNDECIDED:
msg = 'node:%s block:%s status:%s' % \
(node, block_id, result['status'])
...


requeue_transactions

Node
requeue_transactions
实际上只处理了投票为invalid的区块,目的在于给invalid区块的事务再依次被投票的机会,做法则是将该区块中的所有事务重新写入到
backlog
表中,此后block进程的pipeline又将被触发来处理这些事务

def requeue_transactions(self, invalid_block):
"""
Liquidates transactions from invalid blocks so they can be processed again
"""
logger.info('Rewriting %s transactions from invalid block %s',
len(invalid_block.transactions),
invalid_block.id)
for tx in invalid_block.transactions:
self.bigchain.write_transaction(tx)
return invalid_block
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: