您的位置:首页 > 编程语言 > Python开发

python ethereum 代码分析《2》

2017-06-22 11:22 127 查看

python ethereum 代码分析 《2》

python 版本以太坊

pyethapp 模块

本章主要介绍pyethapp 模块中的ChainService 和 PoWService

一些关键概念

totalDifficulty 总难度:

total difficulty总难度是当前某条链所有区块难度的总和,total difficulty被用来指示最长的那一条链,某个节点如果想从其他节点同步数据的话,他会选择total difficulty最大的那一条链来同步数据。值得注意的是uncle block的difficulty也被计算到totalDifficulty中去,以太坊白皮书中的幽灵协议阐述了为什么这么设计

uncle block 叔叔区块:也是矿工挖出来的区块,它也是合法的,但是发现的稍晚,或者是网络传输稍慢,而没有能成为最长的链上的区块

以太坊十几秒的出块间隔,大大增加了孤块的产生,并且降低了安全性。通过鼓励引用叔块,使引用主链获得更多的安全保证(因为孤块本身也是合法的)

区块可以不引用,或者最多引用两个叔块

叔块必须是区块的前2层~前7层的祖先的直接的子块

被引用过的叔块不能重复引用

引用叔块的区块,可以获得挖矿报酬的1/32,也就是5*1/32=0.15625 Ether。最多获得2*0.15625=0.3125 Ether

参考资料http://blog.csdn.net/superswords/article/details/76445278

https://zhuanlan.zhihu.com/p/28928827

head_candidate 候选头区块: 矿工本地的候选头区块,相当于一个临时区块,head_candidate一直存在并且矿工会一直更新。矿工将交易打包进这个区块,计算出随机数后将次区块作为新区块广播出去。

contract :合约,本身也是一个账户。每当一个交易指向一个合约账户时,该交易transaction的data属性作为该合约的输入执行该合约。

以太坊基本的一些概念:http://ethdocs.org/en/latest/contracts-and-transactions/index.html

ChainService 和 PowService的关系

chain service负责区块链的同步和更新,处理连接的各个节点的eth_protocol数据包,以及交易和区块的广播; pow service是矿工挖矿的服务,计算出’幸运值’后告知chain service,chain service 将区块写入区块链并广播出去。

矿工将交易打包,更新head_candidate

@property
def head_candidate(self):
if self._head_candidate_needs_updating:
self._head_candidate_needs_updating = False
# Make a copy of self.transaction_queue because
# make_head_candidate modifies it.
txqueue = copy.deepcopy(self.transaction_queue)
#将交易打包,引用uncle区块,执行交易中的合约,更新区块状态
self._head_candidate, self._head_candidate_state = make_head_candidate(
self.chain, txqueue, timestamp=int(time.time()))
return self._head_candidate


chain 实例的_on_new_head回调

def _on_new_head(self, block):
log.debug('new head cbs', num=len(self.on_new_head_cbs))
self.transaction_queue = self.transaction_queue.diff(
block.transactions)
self._head_candidate_needs_updating = True
#cb是pow service的回调函数mine_head_candidate,更新head_candidate并开始挖矿
for cb in self.on_new_head_cbs:
cb(block)


powservice的回调

def mine_head_candidate(self, _=None):
#打包当前交易队里中的交易并更新head_candidate
hc = self.chain.head_candidate
if not self.active or self.chain.is_syncing:
return
elif (hc.transaction_count == 0 and
not self.app.config['pow']['mine_empty_blocks']):
return

log.debug('mining', difficulty=hc.difficulty)
#开始挖矿
self.ppipe.put(('mine', dict(mining_hash=hc.mining_hash,
block_number=hc.number,
difficulty=hc.difficulty)))


计算出幸运值后,调用chain.add_mined_block写入区块链并广播

#成功找到幸运值
def recv_found_nonce(self, bin_nonce, mixhash, mining_hash):
log.info('nonce found', mining_hash=mining_hash.encode('hex'))
#再次打包交易,更新head_candidate
block = self.chain.head_candidate
if block.mining_hash != mining_hash:
log.warn('mining_hash does not match')
return False
block.header.mixhash = mixhash
block.header.nonce = bin_nonce
#添加新块并广播
if self.chain.add_mined_block(block):
log.debug('mined block %d (%s) added to chain' % (
block.number, encode_hex(block.hash[:8])))
return True
else:
log.debug('failed to add mined block %d (%s) to chain' % (
block.number, encode_hex(block.hash[:8])))
return False


def add_mined_block(self, block):
log.debug('adding mined block', block=block)
assert isinstance(block, Block)
#添加新块
if self.chain.add_block(block):
log.debug('added', block=block, ts=time.time())
assert block == self.chain.head
self.transaction_queue = self.transaction_queue.diff(block.transactions)
self._head_candidate_needs_updating = True
#广播新块
self.broadcast_newblock(block, chain_difficulty=self.chain.get_score(block))
return True
log.debug('failed to add', block=block, ts=time.time())
return False


在写入新块完成后,调用new_head_cb回调,再次开始挖矿

至此,整个循环的过程就是矿工不断打包交易挖矿到广播新块的过程

ChainService

service start 服务启动

服务启动时将eth_protocol(以太坊协议)中command对应的回调函数添加到eth_protocol实例中

先看看eth_protocol协议

以太坊协议定义eth_protocol.py

每个peer对象都有一个eth_protocol实例。在p2p协议中,当节点收到hello数据包的时候,便初始化所有协议实例,其中包括eth_protocol

peer对象receive hello

初始化eth_protocol

def connect_service(self, service):
assert isinstance(service, WiredService)
protocol_class = service.wire_protocol
assert issubclass(protocol_class, BaseProtocol)
# create protcol instance which connects peer with serivce
protocol = protocol_class(self, service)
# register protocol
assert protocol_class not in self.protocols
log.debug('registering protocol', protocol=protocol.name, peer=self)
self.protocols[protocol_class] = protocol
self.mux.add_protocol(protocol.protocol_id)
protocol.start()#调用chain_service.on_wire_protocol_start(self)


chain service为该实例添加回调函数,并向对方节点发送status数据包

def on_wire_protocol_start(self, proto):
log.debug('----------------------------------')
log.debug('on_wire_protocol_start', proto=proto)
assert isinstance(proto, self.wire_protocol)
# register callbacks
proto.receive_status_callbacks.append(self.on_receive_status)#处理status数据包
proto.receive_newblockhashes_callbacks.append(self.on_newblockhashes)
proto.receive_transactions_callbacks.append(self.on_receive_transactions)
proto.receive_getblockheaders_callbacks.append(self.on_receive_getblockheaders)
proto.receive_blockheaders_callbacks.append(self.on_receive_blockheaders)
proto.receive_getblockbodies_callbacks.append(self.on_receive_getblockbodies)
proto.receive_blockbodies_callbacks.append(self.on_receive_blockbodies)
proto.receive_newblock_callbacks.append(self.on_receive_newblock)

# send status  一旦连接就向对方发送自己的区块链状态status
head = self.chain.head
proto.send_status(chain_difficulty=self.chain.get_score(head), chain_head_hash=head.hash,
genesis_hash=self.chain.genesis.hash)


eth_protocol协议中包括

Status 与新节点建立连接后,互相发送自己的区块链状态

NewBlockHashes 向网络中广播一批新区块的hash

Transactions 包含一批交易的数据包

GetBlockHashes 从指定hash开始,请求一批BlockHashes

BlockHashes 返回GetBlockHashes请求

GetBlocks 从指定hash开始,请求一批Block

Blocks 返回GetBlocks请求

NewBlock 矿工挖矿后广播新区块,节点接到该区块后验证后添加到本地

1.收到status:

某个已经完成握手的peer节点发送他当前的区块链网络状态ethereum state,status数据包是节点之间建立连接后收到的第一个数据包。

通过status数据包来获取网络中最新的区块并更新本地的区块链

class status(BaseProtocol.command):

"""
protocolVersion: The version of the Ethereum protocol this peer implements. 30 at present.
networkID: The network version of Ethereum for this peer. 0 for the official testnet.
totalDifficulty: Total Difficulty of the best chain. Integer, as found in block header.
latestHash: The hash of the block with the highest validated total difficulty.
GenesisHash: The hash of the Genesis block.
"""
cmd_id = 0
sent = False

structure = [
('eth_version', rlp.sedes.big_endian_int),
('network_id', rlp.sedes.big_endian_int),
('chain_difficulty', rlp.sedes.big_endian_int),#totalDifficulty,该链的总难度
('chain_head_hash', rlp.sedes.binary),#latestHash,头区块hash
('genesis_hash', rlp.sedes.binary)]#初始区块hash

def create(self, proto, chain_difficulty, chain_head_hash, genesis_hash):
self.sent = True
network_id = proto.service.app.config['eth'].get('network_id', proto.network_id)
return [proto.version, network_id, chain_difficulty, chain_head_hash, genesis_hash]


receive packet收到数据包

receive status解析数据包定位到status处理函数

之前已注册回调eth_protocol实例初始化时已注册

on receive status receive status处理函数

这里注意一点,这里看的是DAO事件之前的版本,不是分叉过后的版本(还是先按正常逻辑来。。)

def on_receive_status(self, proto, eth_version, network_id, chain_difficulty, chain_head_hash,
genesis_hash):
log.debug('----------------------------------')
log.debug('status received', proto=proto, eth_version=eth_version)
assert eth_version == proto.version, (eth_version, proto.version)
#必须是同一个networkid
if network_id != self.config['eth'].get('network_id', proto.network_id):
log.warn("invalid network id", remote_network_id=network_id,
expected_network_id=self.config['eth'].get('network_id', proto.network_id))
raise eth_protocol.ETHProtocolError('wrong network_id')

# check genesis
#初始区块hash必须一致
if genesis_hash != self.chain.genesis.hash:
log.warn("invalid genesis hash", remote_id=proto, genesis=genesis_hash.encode('hex'))
raise eth_protocol.ETHProtocolError('wrong genesis block')

# request chain
#调用同步器同步数据
self.synchronizer.receive_status(proto, chain_head_hash, chain_difficulty)

# send transactions
#获取网络中已知的但尚未被计入区块的交易transactions
transactions = self.chain.get_transactions()
if transactions:
log.debug("sending transactions", remote_id=proto)
#将这些交易告知对方
proto.send_transactions(*transactions)


synchronizer同步器同步数据receive_status

def receive_status(self, proto, blockhash, chain_difficulty):
"called if a new peer is connected"
log.debug('status received', proto=proto, chain_difficulty=chain_difficulty)

# memorize proto with difficulty
#将改节点区块链总难度记下
self._protocols[proto] = chain_difficulty

#对方头区块hash已经在本地存在,则忽略
if self.chainservice.knows_block(blockhash) or self.synctask:
log.debug('existing task or known hash, discarding')
return

if self.force_sync:
blockhash, chain_difficulty = self.force_sync
log.debug('starting forced syctask', blockhash=blockhash.encode('hex'))
self.synctask = SyncTask(self, proto, blockhash, chain_difficulty)

#如果这条链总难度比自己的大,则认为这条链状态更新一点,并从他给出的头区块hash开始同步区块,同步的时候从已连接节点中总难度chain_difficulty最大的那个开始同步(但是这并不能保证同步的链就一定是主链,后面会分析如果同步的链不是主链的情况)
elif chain_difficulty > self.chain.head.chain_difficulty():
log.debug('sufficient difficulty')
self.synctask = SyncTask(self, proto, blockhash, chain_difficulty)


fetch_hashchain 向网络请求以指定hash作为头区块的一批区块hash

def fetch_hashchain(self):
log_st.debug('fetching hashchain')
blockhashes_chain = [self.blockhash]  # youngest to oldest
# For testing purposes: skip the hash downoading stage
# import ast
# blockhashes_chain = ast.literal_eval(open('/home/vub/blockhashes.pyast').read())[:299000]

blockhash = self.blockhash = blockhashes_chain[-1]
assert blockhash not in self.chain

# get block hashes until we found a known one
max_blockhashes_per_request = self.initial_blockhashes_per_request
while blockhash not in self.chain:
# proto with highest_difficulty should be the proto we got the newblock from
blockhashes_batch = []

# try with protos
protocols = self.protocols
if not protocols:
log_st.warn('no protocols available')
return self.exit(success=False)
#这里的protocols是各个已连接节点peer对象的eth_protocol实例,按链总难度大小排序,难度最大在前面
for proto in protocols:
log.debug('syncing with', proto=proto)
if proto.is_stopped:
continue

# request
assert proto not in self.requests
deferred = AsyncResult()
self.requests[proto] = deferred
#从指定hash开始,请求一批BlockHashes
proto.send_getblockhashes(blockhash, max_blockhashes_per_request)
try:
#获取到这批BlockHashes
blockhashes_batch = deferred.get(block=True,
timeout=self.blockhashes_request_timeout)
except gevent.Timeout:
log_st.warn('syncing hashchain timed out')
continue
finally:
# is also executed 'on the way out' when any other clause of the try statement
# is left via a break, continue or return statement.
del self.requests[proto]

if not blockhashes_batch:
log_st.warn('empty getblockhashes result')
continue
if not all(isinstance(bh, bytes) for bh in blockhashes_batch):
log_st.warn('got wrong data type', expected='bytes',
received=type(blockhashes_batch[0]))
continue
break

if not blockhashes_batch:
log_st.warn('syncing failed with all peers', num_protos=len(protocols))
return self.exit(success=False)

#在获取到的这批blockhashes中直到找到一个blockhash是自己数据库中有的,注意这里只要找到一个自己有的blockhash就可以了,而且这个blockhash不一定是自己本地chain的头区块,因为本地的头区块有可能不是在主链上的区块,上面已经提到过这点。节点总是会去同步最长的那条链,而且可能是从自己本地chain的头区块的父辈区块的某个区块开始同步,这样就提供了一定的纠错机制,让节点可以纠正到主链上去。
for blockhash in blockhashes_batch:  # youngest to oldest
assert utils.is_string(blockhash)
if blockhash not in self.chain:
blockhashes_chain.append(blockhash)
else:
log_st.debug('found known blockhash', blockhash=utils.encode_hex(blockhash),
is_genesis=bool(blockhash == self.chain.genesis.hash))
break
log_st.debug('downloaded ' + str(len(blockhashes_chain)) + ' block hashes, ending with %s' % utils.encode_hex(blockhashes_chain[-1]))
max_blockhashes_per_request = self.max_blockhashes_per_request
#取到最长链的这批blockhash后,开始同步区块
self.fetch_blocks(blockhashes_chain)


fetch blocks 向网络请求同步这批区块

def fetch_blocks(self, blockhashes_chain):
# fetch blocks (no parallelism here)
log_st.debug('fetching blocks', num=len(blockhashes_chain))
assert blockhashes_chain
blockhashes_chain.reverse()  # oldest to youngest
num_blocks = len(blockhashes_chain)
num_fetched = 0

while blockhashes_chain:
blockhashes_batch = blockhashes_chain[:self.max_blocks_per_request]
t_blocks = []

# try with protos
protocols = self.protocols
if not protocols:
log_st.warn('no protocols available')
return self.exit(success=False)
#向每个节点请求
for proto in protocols:
if proto.is_stopped:
continue
assert proto not in self.requests
# request
log_st.debug('requesting blocks', num=len(blockhashes_batch))
deferred = AsyncResult()
self.requests[proto] = deferred
#向网络请求这批hash值对应的区块
proto.send_getblocks(*blockhashes_batch)
try:
t_blocks = deferred.get(block=True, timeout=self.blocks_request_timeout)
except gevent.Timeout:
log_st.warn('getblocks timed out, trying next proto')
continue
finally:
del self.requests[proto]
if not t_blocks:
log_st.warn('empty getblocks reply, trying next proto')
continue
elif not isinstance(t_blocks[0], TransientBlock):
log_st.warn('received unexpected data', data=repr(t_blocks))
t_blocks = []
continue
# we have results
if not [b.header.hash for b in t_blocks] == blockhashes_batch[:len(t_blocks)]:
log_st.warn('received wrong blocks, should ban peer')
t_blocks = []
continue
break

# add received t_blocks
num_fetched += len(t_blocks)
log_st.debug('received blocks', num=len(t_blocks), num_fetched=num_fetched,
total=num_blocks, missing=num_blocks - num_fetched)

if not t_blocks:
log_st.warn('failed to fetch blocks', missing=len(blockhashes_chain))
return self.exit(success=False)

ts = time.time()
log_st.debug('adding blocks', qsize=self.chainservice.block_queue.qsize())
for t_block in t_blocks:
b = blockhashes_chain.pop(0)
assert t_block.header.hash == b
assert t_block.header.hash not in blockhashes_chain
#将获取的block添加到队列,最后添加到本地数据库
self.chainservice.add_block(t_block, proto)  # this blocks if the queue is full
log_st.debug('adding blocks done', took=time.time() - ts)

# done
last_block = t_block
assert not len(blockhashes_chain)
assert last_block.header.hash == self.blockhash
log_st.debug('syncing finished')
# at this point blocks are not in the chain yet, but in the add_block queue
if self.chain_difficulty >= self.chain.head.chain_difficulty():
#广播这批区块最新的那个出去。
self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto)

self.exit(success=True)


至此,on receive status 函数处理完成,已同步已连接节点的最新区块

2.transactions

on_receive_transactions 节点收到transactions数据包后

矿工收到transactions数据包后,校验每笔交易的合法性,将交易打包进head candidate临时区块并广播交易

add_transaction

def add_transaction(self, tx, origin=None):
if self.is_syncing:
#如果正在同步中的话,本地链的状态是过时的
return  # we can not evaluate the tx based on outdated state
log.debug('add_transaction', locked=self.add_transaction_lock.locked(), tx=tx)
assert isinstance(tx, Transaction)
assert origin is None or isinstance(origin, BaseProtocol)

if tx.hash in self.broadcast_filter:
log.debug('discarding known tx')  # discard early
return

# validate transaction
#交易合法性校验
try:
#tx签名要正确;交易数nonce要与本地账户nonce一致;gas要足够;本地账户余额足够
validate_transaction(self.chain.head_candidate, tx)
log.debug('valid tx, broadcasting')
self.broadcast_transaction(tx, origin=origin)  # asap
except InvalidTransaction as e:
log.debug('invalid tx', error=e)
return

if origin is not None:  # not locally added via jsonrpc
if not self.is_mining or self.is_syncing:
log.debug('discarding tx', syncing=self.is_syncing, mining=self.is_mining)
return

self.add_transaction_lock.acquire()
#向head_candidate临时区块写入交易
success = self.chain.add_transaction(tx)
self.add_transaction_lock.release()
if success:
self._on_new_head_candidate()


3.GetBlockHashes BlockHashes

在向网络同步区块链的时候被调用fetch_hashchain中调用send_getblockhashes来发送GetBlockHashes数据包

其他节点接受到GetBlockHashes数据包后,返回BlockHashes数据包,及指定blockash开始的一批blockhash。

4.GetBlocks Blocks

在向网络同步区块链的时候被调用fetch_blocks中调用send_getblocks来发送GetBlocks数据包

其他节点接受到GetBlocks数据包后,返回Blocks数据包。

5.NewBlock

矿工挖到新块后广播该区块

节点接收到广播的新区块

def receive_newblock(self, proto, t_block, chain_difficulty):
"called if there's a newblock announced on the network"
log.debug('newblock', proto=proto, block=t_block, chain_difficulty=chain_difficulty,
client=proto.peer.remote_client_version)

if t_block.header.hash in self.chain:
assert chain_difficulty == self.chain.get(t_block.header.hash).chain_difficulty()

# memorize proto with difficulty
#记住该节点区块链的总难度
self._protocols[proto] = chain_difficulty
#如果该区块存在则忽略
if self.chainservice.knows_block(block_hash=t_block.header.hash):
log.debug('known block')
return

# check pow
if not t_block.header.check_pow():
log.warn('check pow failed, should ban!')
return
#预计的总难度
expected_difficulty = self.chain.head.chain_difficulty() + t_block.header.difficulty
#总难度至少比本地链的总难度大
if chain_difficulty >= self.chain.head.chain_difficulty():
# broadcast duplicates filtering is done in eth_service
log.debug('sufficient difficulty, broadcasting',
client=proto.peer.remote_client_version)
self.chainservice.broadcast_newblock(t_block, chain_difficulty, origin=proto)
else:
# any criteria for which blocks/chains not to add?
age = self.chain.head.number - t_block.header.number
log.debug('low difficulty', client=proto.peer.remote_client_version,
chain_difficulty=chain_difficulty, expected_difficulty=expected_difficulty,
block_age=age)
if age > self.MAX_NEWBLOCK_AGE:
log.debug('newblock is too old, not adding', block_age=age,
max_age=self.MAX_NEWBLOCK_AGE)
return

# unknown and pow check and highest difficulty

# check if we have parent
#如果有祖先,直接添加
if self.chainservice.knows_block(block_hash=t_block.header.prevhash):
log.debug('adding block')
self.chainservice.add_block(t_block, proto)
#没有祖先说明还差一个以上区块,向网络同步区块
else:
log.debug('missing parent')
if not self.synctask:
self.synctask = SyncTask(self, proto, t_block.header.hash, chain_difficulty)
else:
log.debug('existing task, discarding')


以上是以太坊节点与节点交互的协议部分
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息