python ethereum 代码分析《2》
2017-06-22 11:22
127 查看
python ethereum 代码分析 《2》
python 版本以太坊
pyethapp 模块
本章主要介绍pyethapp 模块中的ChainService 和 PoWService一些关键概念
totalDifficulty 总难度:total difficulty总难度是当前某条链所有区块难度的总和,total difficulty被用来指示最长的那一条链,某个节点如果想从其他节点同步数据的话,他会选择total difficulty最大的那一条链来同步数据。值得注意的是uncle block的difficulty也被计算到totalDifficulty中去,以太坊白皮书中的幽灵协议阐述了为什么这么设计
uncle block 叔叔区块:也是矿工挖出来的区块,它也是合法的,但是发现的稍晚,或者是网络传输稍慢,而没有能成为最长的链上的区块
以太坊十几秒的出块间隔,大大增加了孤块的产生,并且降低了安全性。通过鼓励引用叔块,使引用主链获得更多的安全保证(因为孤块本身也是合法的)
区块可以不引用,或者最多引用两个叔块
叔块必须是区块的前2层~前7层的祖先的直接的子块
被引用过的叔块不能重复引用
引用叔块的区块,可以获得挖矿报酬的1/32,也就是5*1/32=0.15625 Ether。最多获得2*0.15625=0.3125 Ether
参考资料http://blog.csdn.net/superswords/article/details/76445278
https://zhuanlan.zhihu.com/p/28928827
head_candidate 候选头区块: 矿工本地的候选头区块,相当于一个临时区块,head_candidate一直存在并且矿工会一直更新。矿工将交易打包进这个区块,计算出随机数后将次区块作为新区块广播出去。
contract :合约,本身也是一个账户。每当一个交易指向一个合约账户时,该交易transaction的data属性作为该合约的输入执行该合约。
以太坊基本的一些概念:http://ethdocs.org/en/latest/contracts-and-transactions/index.html
ChainService 和 PowService的关系
chain service负责区块链的同步和更新,处理连接的各个节点的eth_protocol数据包,以及交易和区块的广播; pow service是矿工挖矿的服务,计算出’幸运值’后告知chain service,chain service 将区块写入区块链并广播出去。矿工将交易打包,更新head_candidate
@property def head_candidate(self): if self._head_candidate_needs_updating: self._head_candidate_needs_updating = False # Make a copy of self.transaction_queue because # make_head_candidate modifies it. txqueue = copy.deepcopy(self.transaction_queue) #将交易打包,引用uncle区块,执行交易中的合约,更新区块状态 self._head_candidate, self._head_candidate_state = make_head_candidate( self.chain, txqueue, timestamp=int(time.time())) return self._head_candidate
chain 实例的_on_new_head回调
def _on_new_head(self, block): log.debug('new head cbs', num=len(self.on_new_head_cbs)) self.transaction_queue = self.transaction_queue.diff( block.transactions) self._head_candidate_needs_updating = True #cb是pow service的回调函数mine_head_candidate,更新head_candidate并开始挖矿 for cb in self.on_new_head_cbs: cb(block)
powservice的回调
def mine_head_candidate(self, _=None): #打包当前交易队里中的交易并更新head_candidate hc = self.chain.head_candidate if not self.active or self.chain.is_syncing: return elif (hc.transaction_count == 0 and not self.app.config['pow']['mine_empty_blocks']): return log.debug('mining', difficulty=hc.difficulty) #开始挖矿 self.ppipe.put(('mine', dict(mining_hash=hc.mining_hash, block_number=hc.number, difficulty=hc.difficulty)))
计算出幸运值后,调用chain.add_mined_block写入区块链并广播
#成功找到幸运值 def recv_found_nonce(self, bin_nonce, mixhash, mining_hash): log.info('nonce found', mining_hash=mining_hash.encode('hex')) #再次打包交易,更新head_candidate block = self.chain.head_candidate if block.mining_hash != mining_hash: log.warn('mining_hash does not match') return False block.header.mixhash = mixhash block.header.nonce = bin_nonce #添加新块并广播 if self.chain.add_mined_block(block): log.debug('mined block %d (%s) added to chain' % ( block.number, encode_hex(block.hash[:8]))) return True else: log.debug('failed to add mined block %d (%s) to chain' % ( block.number, encode_hex(block.hash[:8]))) return False
def add_mined_block(self, block): log.debug('adding mined block', block=block) assert isinstance(block, Block) #添加新块 if self.chain.add_block(block): log.debug('added', block=block, ts=time.time()) assert block == self.chain.head self.transaction_queue = self.transaction_queue.diff(block.transactions) self._head_candidate_needs_updating = True #广播新块 self.broadcast_newblock(block, chain_difficulty=self.chain.get_score(block)) return True log.debug('failed to add', block=block, ts=time.time()) return False
在写入新块完成后,调用new_head_cb回调,再次开始挖矿
至此,整个循环的过程就是矿工不断打包交易挖矿到广播新块的过程
ChainService
service start 服务启动服务启动时将eth_protocol(以太坊协议)中command对应的回调函数添加到eth_protocol实例中
先看看eth_protocol协议
以太坊协议定义eth_protocol.py
每个peer对象都有一个eth_protocol实例。在p2p协议中,当节点收到hello数据包的时候,便初始化所有协议实例,其中包括eth_protocol
peer对象receive hello
初始化eth_protocol
def connect_service(self, service): assert isinstance(service, WiredService) protocol_class = service.wire_protocol assert issubclass(protocol_class, BaseProtocol) # create protcol instance which connects peer with serivce protocol = protocol_class(self, service) # register protocol assert protocol_class not in self.protocols log.debug('registering protocol', protocol=protocol.name, peer=self) self.protocols[protocol_class] = protocol self.mux.add_protocol(protocol.protocol_id) protocol.start()#调用chain_service.on_wire_protocol_start(self)
chain service为该实例添加回调函数,并向对方节点发送status数据包
def on_wire_protocol_start(self, proto): log.debug('----------------------------------') log.debug('on_wire_protocol_start', proto=proto) assert isinstance(proto, self.wire_protocol) # register callbacks proto.receive_status_callbacks.append(self.on_receive_status)#处理status数据包 proto.receive_newblockhashes_callbacks.append(self.on_newblockhashes) proto.receive_transactions_callbacks.append(self.on_receive_transactions) proto.receive_getblockheaders_callbacks.append(self.on_receive_getblockheaders) proto.receive_blockheaders_callbacks.append(self.on_receive_blockheaders) proto.receive_getblockbodies_callbacks.append(self.on_receive_getblockbodies) proto.receive_blockbodies_callbacks.append(self.on_receive_blockbodies) proto.receive_newblock_callbacks.append(self.on_receive_newblock) # send status 一旦连接就向对方发送自己的区块链状态status head = self.chain.head proto.send_status(chain_difficulty=self.chain.get_score(head), chain_head_hash=head.hash, genesis_hash=self.chain.genesis.hash)
eth_protocol协议中包括
Status 与新节点建立连接后,互相发送自己的区块链状态
NewBlockHashes 向网络中广播一批新区块的hash
Transactions 包含一批交易的数据包
GetBlockHashes 从指定hash开始,请求一批BlockHashes
BlockHashes 返回GetBlockHashes请求
GetBlocks 从指定hash开始,请求一批Block
Blocks 返回GetBlocks请求
NewBlock 矿工挖矿后广播新区块,节点接到该区块后验证后添加到本地
1.收到status:
某个已经完成握手的peer节点发送他当前的区块链网络状态ethereum state,status数据包是节点之间建立连接后收到的第一个数据包。
通过status数据包来获取网络中最新的区块并更新本地的区块链
class status(BaseProtocol.command): """ protocolVersion: The version of the Ethereum protocol this peer implements. 30 at present. networkID: The network version of Ethereum for this peer. 0 for the official testnet. totalDifficulty: Total Difficulty of the best chain. Integer, as found in block header. latestHash: The hash of the block with the highest validated total difficulty. GenesisHash: The hash of the Genesis block. """ cmd_id = 0 sent = False structure = [ ('eth_version', rlp.sedes.big_endian_int), ('network_id', rlp.sedes.big_endian_int), ('chain_difficulty', rlp.sedes.big_endian_int),#totalDifficulty,该链的总难度 ('chain_head_hash', rlp.sedes.binary),#latestHash,头区块hash ('genesis_hash', rlp.sedes.binary)]#初始区块hash def create(self, proto, chain_difficulty, chain_head_hash, genesis_hash): self.sent = True network_id = proto.service.app.config['eth'].get('network_id', proto.network_id) return [proto.version, network_id, chain_difficulty, chain_head_hash, genesis_hash]
receive packet收到数据包
receive status解析数据包定位到status处理函数
之前已注册回调eth_protocol实例初始化时已注册
on receive status receive status处理函数
这里注意一点,这里看的是DAO事件之前的版本,不是分叉过后的版本(还是先按正常逻辑来。。)
def on_receive_status(self, proto, eth_version, network_id, chain_difficulty, chain_head_hash, genesis_hash): log.debug('----------------------------------') log.debug('status received', proto=proto, eth_version=eth_version) assert eth_version == proto.version, (eth_version, proto.version) #必须是同一个networkid if network_id != self.config['eth'].get('network_id', proto.network_id): log.warn("invalid network id", remote_network_id=network_id, expected_network_id=self.config['eth'].get('network_id', proto.network_id)) raise eth_protocol.ETHProtocolError('wrong network_id') # check genesis #初始区块hash必须一致 if genesis_hash != self.chain.genesis.hash: log.warn("invalid genesis hash", remote_id=proto, genesis=genesis_hash.encode('hex')) raise eth_protocol.ETHProtocolError('wrong genesis block') # request chain #调用同步器同步数据 self.synchronizer.receive_status(proto, chain_head_hash, chain_difficulty) # send transactions #获取网络中已知的但尚未被计入区块的交易transactions transactions = self.chain.get_transactions() if transactions: log.debug("sending transactions", remote_id=proto) #将这些交易告知对方 proto.send_transactions(*transactions)
synchronizer同步器同步数据receive_status:
def receive_status(self, proto, blockhash, chain_difficulty): "called if a new peer is connected" log.debug('status received', proto=proto, chain_difficulty=chain_difficulty) # memorize proto with difficulty #将改节点区块链总难度记下 self._protocols[proto] = chain_difficulty #对方头区块hash已经在本地存在,则忽略 if self.chainservice.knows_block(blockhash) or self.synctask: log.debug('existing task or known hash, discarding') return if self.force_sync: blockhash, chain_difficulty = self.force_sync log.debug('starting forced syctask', blockhash=blockhash.encode('hex')) self.synctask = SyncTask(self, proto, blockhash, chain_difficulty) #如果这条链总难度比自己的大,则认为这条链状态更新一点,并从他给出的头区块hash开始同步区块,同步的时候从已连接节点中总难度chain_difficulty最大的那个开始同步(但是这并不能保证同步的链就一定是主链,后面会分析如果同步的链不是主链的情况) elif chain_difficulty > self.chain.head.chain_difficulty(): log.debug('sufficient difficulty') self.synctask = SyncTask(self, proto, blockhash, chain_difficulty)
fetch_hashchain 向网络请求以指定hash作为头区块的一批区块hash
def fetch_hashchain(self): log_st.debug('fetching hashchain') blockhashes_chain = [self.blockhash] # youngest to oldest # For testing purposes: skip the hash downoading stage # import ast # blockhashes_chain = ast.literal_eval(open('/home/vub/blockhashes.pyast').read())[:299000] blockhash = self.blockhash = blockhashes_chain[-1] assert blockhash not in self.chain # get block hashes until we found a known one max_blockhashes_per_request = self.initial_blockhashes_per_request while blockhash not in self.chain: # proto with highest_difficulty should be the proto we got the newblock from blockhashes_batch = [] # try with protos protocols = self.protocols if not protocols: log_st.warn('no protocols available') return self.exit(success=False) #这里的protocols是各个已连接节点peer对象的eth_protocol实例,按链总难度大小排序,难度最大在前面 for proto in protocols: log.debug('syncing with', proto=proto) if proto.is_stopped: continue # request assert proto not in self.requests deferred = AsyncResult() self.requests[proto] = deferred #从指定hash开始,请求一批BlockHashes proto.send_getblockhashes(blockhash, max_blockhashes_per_request) try: #获取到这批BlockHashes blockhashes_batch = deferred.get(block=True, timeout=self.blockhashes_request_timeout) except gevent.Timeout: log_st.warn('syncing hashchain timed out') continue finally: # is also executed 'on the way out' when any other clause of the try statement # is left via a break, continue or return statement. del self.requests[proto] if not blockhashes_batch: log_st.warn('empty getblockhashes result') continue if not all(isinstance(bh, bytes) for bh in blockhashes_batch): log_st.warn('got wrong data type', expected='bytes', received=type(blockhashes_batch[0])) continue break if not blockhashes_batch: log_st.warn('syncing failed with all peers', num_protos=len(protocols)) return self.exit(success=False) #在获取到的这批blockhashes中直到找到一个blockhash是自己数据库中有的,注意这里只要找到一个自己有的blockhash就可以了,而且这个blockhash不一定是自己本地chain的头区块,因为本地的头区块有可能不是在主链上的区块,上面已经提到过这点。节点总是会去同步最长的那条链,而且可能是从自己本地chain的头区块的父辈区块的某个区块开始同步,这样就提供了一定的纠错机制,让节点可以纠正到主链上去。 for blockhash in blockhashes_batch: # youngest to oldest assert utils.is_string(blockhash) if blockhash not in self.chain: blockhashes_chain.append(blockhash) else: log_st.debug('found known blockhash', blockhash=utils.encode_hex(blockhash), is_genesis=bool(blockhash == self.chain.genesis.hash)) break log_st.debug('downloaded ' + str(len(blockhashes_chain)) + ' block hashes, ending with %s' % utils.encode_hex(blockhashes_chain[-1])) max_blockhashes_per_request = self.max_blockhashes_per_request #取到最长链的这批blockhash后,开始同步区块 self.fetch_blocks(blockhashes_chain)
fetch blocks 向网络请求同步这批区块
def fetch_blocks(self, blockhashes_chain): # fetch blocks (no parallelism here) log_st.debug('fetching blocks', num=len(blockhashes_chain)) assert blockhashes_chain blockhashes_chain.reverse() # oldest to youngest num_blocks = len(blockhashes_chain) num_fetched = 0 while blockhashes_chain: blockhashes_batch = blockhashes_chain[:self.max_blocks_per_request] t_blocks = [] # try with protos protocols = self.protocols if not protocols: log_st.warn('no protocols available') return self.exit(success=False) #向每个节点请求 for proto in protocols: if proto.is_stopped: continue assert proto not in self.requests # request log_st.debug('requesting blocks', num=len(blockhashes_batch)) deferred = AsyncResult() self.requests[proto] = deferred #向网络请求这批hash值对应的区块 proto.send_getblocks(*blockhashes_batch) try: t_blocks = deferred.get(block=True, timeout=self.blocks_request_timeout) except gevent.Timeout: log_st.warn('getblocks timed out, trying next proto') continue finally: del self.requests[proto] if not t_blocks: log_st.warn('empty getblocks reply, trying next proto') continue elif not isinstance(t_blocks[0], TransientBlock): log_st.warn('received unexpected data', data=repr(t_blocks)) t_blocks = [] continue # we have results if not [b.header.hash for b in t_blocks] == blockhashes_batch[:len(t_blocks)]: log_st.warn('received wrong blocks, should ban peer') t_blocks = [] continue break # add received t_blocks num_fetched += len(t_blocks) log_st.debug('received blocks', num=len(t_blocks), num_fetched=num_fetched, total=num_blocks, missing=num_blocks - num_fetched) if not t_blocks: log_st.warn('failed to fetch blocks', missing=len(blockhashes_chain)) return self.exit(success=False) ts = time.time() log_st.debug('adding blocks', qsize=self.chainservice.block_queue.qsize()) for t_block in t_blocks: b = blockhashes_chain.pop(0) assert t_block.header.hash == b assert t_block.header.hash not in blockhashes_chain #将获取的block添加到队列,最后添加到本地数据库 self.chainservice.add_block(t_block, proto) # this blocks if the queue is full log_st.debug('adding blocks done', took=time.time() - ts) # done last_block = t_block assert not len(blockhashes_chain) assert last_block.header.hash == self.blockhash log_st.debug('syncing finished') # at this point blocks are not in the chain yet, but in the add_block queue if self.chain_difficulty >= self.chain.head.chain_difficulty(): #广播这批区块最新的那个出去。 self.chainservice.broadcast_newblock(last_block, self.chain_difficulty, origin=proto) self.exit(success=True)
至此,on receive status 函数处理完成,已同步已连接节点的最新区块
2.transactions
on_receive_transactions 节点收到transactions数据包后
矿工收到transactions数据包后,校验每笔交易的合法性,将交易打包进head candidate临时区块并广播交易
add_transaction
def add_transaction(self, tx, origin=None): if self.is_syncing: #如果正在同步中的话,本地链的状态是过时的 return # we can not evaluate the tx based on outdated state log.debug('add_transaction', locked=self.add_transaction_lock.locked(), tx=tx) assert isinstance(tx, Transaction) assert origin is None or isinstance(origin, BaseProtocol) if tx.hash in self.broadcast_filter: log.debug('discarding known tx') # discard early return # validate transaction #交易合法性校验 try: #tx签名要正确;交易数nonce要与本地账户nonce一致;gas要足够;本地账户余额足够 validate_transaction(self.chain.head_candidate, tx) log.debug('valid tx, broadcasting') self.broadcast_transaction(tx, origin=origin) # asap except InvalidTransaction as e: log.debug('invalid tx', error=e) return if origin is not None: # not locally added via jsonrpc if not self.is_mining or self.is_syncing: log.debug('discarding tx', syncing=self.is_syncing, mining=self.is_mining) return self.add_transaction_lock.acquire() #向head_candidate临时区块写入交易 success = self.chain.add_transaction(tx) self.add_transaction_lock.release() if success: self._on_new_head_candidate()
3.GetBlockHashes BlockHashes
在向网络同步区块链的时候被调用fetch_hashchain中调用send_getblockhashes来发送GetBlockHashes数据包
其他节点接受到GetBlockHashes数据包后,返回BlockHashes数据包,及指定blockash开始的一批blockhash。
4.GetBlocks Blocks
在向网络同步区块链的时候被调用fetch_blocks中调用send_getblocks来发送GetBlocks数据包
其他节点接受到GetBlocks数据包后,返回Blocks数据包。
5.NewBlock
矿工挖到新块后广播该区块
节点接收到广播的新区块
def receive_newblock(self, proto, t_block, chain_difficulty): "called if there's a newblock announced on the network" log.debug('newblock', proto=proto, block=t_block, chain_difficulty=chain_difficulty, client=proto.peer.remote_client_version) if t_block.header.hash in self.chain: assert chain_difficulty == self.chain.get(t_block.header.hash).chain_difficulty() # memorize proto with difficulty #记住该节点区块链的总难度 self._protocols[proto] = chain_difficulty #如果该区块存在则忽略 if self.chainservice.knows_block(block_hash=t_block.header.hash): log.debug('known block') return # check pow if not t_block.header.check_pow(): log.warn('check pow failed, should ban!') return #预计的总难度 expected_difficulty = self.chain.head.chain_difficulty() + t_block.header.difficulty #总难度至少比本地链的总难度大 if chain_difficulty >= self.chain.head.chain_difficulty(): # broadcast duplicates filtering is done in eth_service log.debug('sufficient difficulty, broadcasting', client=proto.peer.remote_client_version) self.chainservice.broadcast_newblock(t_block, chain_difficulty, origin=proto) else: # any criteria for which blocks/chains not to add? age = self.chain.head.number - t_block.header.number log.debug('low difficulty', client=proto.peer.remote_client_version, chain_difficulty=chain_difficulty, expected_difficulty=expected_difficulty, block_age=age) if age > self.MAX_NEWBLOCK_AGE: log.debug('newblock is too old, not adding', block_age=age, max_age=self.MAX_NEWBLOCK_AGE) return # unknown and pow check and highest difficulty # check if we have parent #如果有祖先,直接添加 if self.chainservice.knows_block(block_hash=t_block.header.prevhash): log.debug('adding block') self.chainservice.add_block(t_block, proto) #没有祖先说明还差一个以上区块,向网络同步区块 else: log.debug('missing parent') if not self.synctask: self.synctask = SyncTask(self, proto, t_block.header.hash, chain_difficulty) else: log.debug('existing task, discarding')
以上是以太坊节点与节点交互的协议部分
相关文章推荐
- python ethereum 代码分析 《3》
- python ethereum 代码分析
- appium python-client代码分析(一)
- 线性回归分析——含python代码
- Python自动化之rabbitmq rpc client端代码分析(原创)
- python 随机数生成的代码的详细分析
- 深入分析在Python模块顶层运行的代码引起的一个Bug
- Holt-Winters模型原理分析及代码实现(python)
- Python中使用插入排序算法的简单分析与代码示例
- Python数据分析与挖掘实战代码纠错 代码3-1
- Python代码中的捕捉性能-CPU分析(解释器)
- python几个实际代码的性能分析
- 【Python3-API】情感倾向分析示例代码
- Logistic Regression(基本原理分析+python代码实现)
- Python代码分析工具:PyChecker、Pylint
- 分析dnspython TLSA 代码
- 代写CS|留学生|金融编程|代码代做|C++语言|JAVA|R语言|Python|经济统计|数值分析|建模|作业加急|天才写手网
- 建模分析之机器学习算法(附python&R代码)
- appium python-client代码分析(二)