Michael.W谈hyperledger Fabric第21期-详细带读Fabric的源码6-orderer节点接收交易数据与广播区块数据
2019-03-17 21:16
771 查看
版权声明:原创内容,转载请注明作者。 https://blog.csdn.net/michael_wgy_/article/details/88626888
这两个部分围绕着orderer/server.go文件里的server结构体展开:
Michael.W谈hyperledger Fabric第21期-详细带读Fabric的源码6-orderer节点接收交易数据与广播区块数据
这两个部分围绕着orderer/server.go文件里的server结构体展开:
type server struct { // 交易数据的接收 bh broadcast.Handler // 区块数据的广播 dh deliver.Handler }
1 接收交易数据
进入交易接收,查看broadcast.Handler定义:进入orderer/common/broadcast/broadcast.go中
type Handler interface { // 为gRPC连接启动一个服务线程并服务于广播的连接 Handle(srv ab.AtomicBroadcast_BroadcastServer) error }
这个文件里面有很多接口定义,这里就不一一介绍了。
来主要看一下Handle这个成员方法,即对Handler接口的实现:
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { logger.Debugf("Starting new broadcast loop") for { // 交易的接收(基于gRPC) msg, err := srv.Recv() if err == io.EOF { logger.Debugf("Received EOF, hangup") return nil } if err != nil { logger.Warningf("Error reading from stream: %s", err) return err } // 将接收到的消息(protofuf序列化),反序列化成Payload对象 payload, err := utils.UnmarshalPayload(msg.Payload) // 校验反序列化数据的正确 if err != nil { logger.Warningf("Received malformed message, dropping connection: %s", err) return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } if payload.Header == nil { logger.Warningf("Received malformed message, with missing header, dropping connection") return srv.Send(&a 1d25b mp;ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } // 反序列化payload.Header.ChannelHeader,并验证 chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err) return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) { // 如果该交易消息是配置交易(更新配置) logger.Debugf("Preprocessing CONFIG_UPDATE") // 对该*cb.Envelope类型的配置交易进行一次格式上的转换,但从外面看转换后依然是*cb.Envelope。 msg, err = bh.sm.Process(msg) if err != nil { logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err) return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } // 此处也可以理解为对如果是配置交易,就对Payload做一次二次校验 err = proto.Unmarshal(msg.Payload, payload) if err != nil || payload.Header == nil { logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing") return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR}) } // 此处也可以理解为对如果是配置交易,就对payload.Header.ChannelHeader做一次二次校验 chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err) return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR}) } if chdr.ChannelId == "" { logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)") return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR}) } } // 通过通道ID获得一个chainsupport对象 support, ok := bh.sm.GetChain(chdr.ChannelId) if !ok { logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId) return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND}) } logger.Debugf("[channel: %s] Broadcast is filtering message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type]) // 将该交易信息传到chainsupport的过滤器进行过滤。这个地方是第一次对交易消息的过滤,之前看区块分割的源代码的Ordered方法中对交易数据又进行了一次过滤 _, filterErr := support.Filters().Apply(msg) if filterErr != nil { logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr) return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } // 如果以上检查都没问题,将该交易信息入列。加入队列成功返回true,加入失败返回false。 // 如果加入队列成功,随后就被solo或kafka进行排序处理 if !support.Enqueue(msg) { return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) } if logger.IsEnabledFor(logging.DEBUG) { logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type]) } // 处理成功后,返回一个SUCCESS信息。 err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) if err != nil { logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err) return err } } }
const ( Status_UNKNOWN Status = 0 Status_SUCCESS Status = 200 Status_BAD_REQUEST Status = 400 Status_FORBIDDEN Status = 403 Status_NOT_FOUND Status = 404 Status_REQUEST_ENTITY_TOO_LARGE Status = 413 Status_INTERNAL_SERVER_ERROR Status = 500 Status_SERVICE_UNAVAILABLE Status = 503 ) // 可见处理成功后,客户端会收到orderer节点发过来的状态码200。
在之前我手动搭建Fabric网络时,也会看见在转账成功后,会有"status:200"的提示。
2 广播区块数据
区块是如何被扩散出去的?
进入orderer/common/deliver/deliver.go中,主要来看一下deliverServer对Handle方法的实现:
// Handler接口用来对发送请求进行管理 type Handler interface { Handle(srv ab.AtomicBroadcast_DeliverServer) error } // Handle方法的实现 func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error { logger.Debugf("Starting new deliver loop") for { logger.Debugf("Attempting to read seek info message") // 接收别处发来的请求 envelope, err := srv.Recv() if err == io.EOF { logger.Debugf("Received EOF, hangup") return nil } if err != nil { logger.Warningf("Error reading from stream: %s", err) return err } // 将接收到的消息(protofuf序列化),反序列化成Payload对象 payload, err := utils.UnmarshalPayload(envelope.Payload) if err != nil { logger.Warningf("Received an envelope with no payload: %s", err) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } if payload.Header == nil { logger.Warningf("Malformed envelope received with bad header") return sendStatusReply(srv, cb.Status_BAD_REQUEST) } // 反序列化payload.Header.ChannelHeader,并校验 chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) if err != nil { logger.Warningf("Failed to unmarshal channel header: %s", err) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } // 通过通道ID获取chainsupport对象 chain, ok := ds.sm.GetChain(chdr.ChannelId) if !ok { logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId) return sendStatusReply(srv, cb.Status_NOT_FOUND) } // 监听是否错误发生。erroredChan类型为 <-chan struct{} erroredChan := chain.Errored() select { // 如果监听到有错误发生 case <-erroredChan: logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId) return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE) default: } // 获取通道最新的配置序列号,以此来检测通道配置有没有被更改 lastConfigSequence := chain.Sequence() // 得到一个签名过滤器。每一次对签名进行评估前,都会去调用策略的名称。 // 因为策略的名称和策略本身都是可变的。这也是为什么每次在调用策略的时候不仅要通过策略本身来检索,还需要通过策略的名称来检索。 sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) // 验证请求发出者的身份是否有效 result, _ := sf.Apply(envelope) if result != filter.Forward { logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId) return sendStatusReply(srv, cb.Status_FORBIDDEN) } // 解析传递过来的请求消息的内容[1] seekInfo := &ab.SeekInfo{} if err = proto.Unmarshal(payload.Data, seekInfo); err != nil { logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } // 校验请求消息中的Start和Stop是否正确 if seekInfo.Start == nil || seekInfo.Stop == nil { logger.Warningf("[channel: %s] Received seekInfo message with missing start or stop %v, %v", chdr.ChannelId, seekInfo.Start, seekInfo.Stop) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } logger.Debugf("[channel: %s] Received seekInfo (%p) %v", chdr.ChannelId, seekInfo, seekInfo) cursor, number := chain.Reader().Iterator(seekInfo.Start) var stopNum uint64 switch stop := seekInfo.Stop.Type.(type) { // 如果要查询最原始的区块,那么Stop的值跟Start相等 case *ab.SeekPosition_Oldest: stopNum = number // 如果要查询最新的区块,那么Stop的值等于当前区块高度-1 case *ab.SeekPosition_Newest: stopNum = chain.Reader().Height() - 1 // 自定义一个区块编号来查询 case *ab.SeekPosition_Specified: stopNum = stop.Specified.Number //如果自定义编号大于账本中的区块数 if stopNum < number { logger.Warningf("[channel: %s] Received invalid seekInfo message: start number %d greater than stop number %d", chdr.ChannelId, number, stopNum) return sendStatusReply(srv, cb.Status_BAD_REQUEST) } } // 判断行为 for { // 如果请求的行为为等待orderer if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY { select { // 监听出现错误,直接退出 case <-erroredChan: logger.Warningf("[channel: %s] Aborting deliver request because of consenter error", chdr.ChannelId) return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE) case <-cursor.ReadyChan(): } } else { //如果请求的行为为不等待orderer select { case <-cursor.ReadyChan(): default: // 只监听一次,如果orderer不准备好,直接退出,状态码404 return sendStatusReply(srv, cb.Status_NOT_FOUND) } } // 校验配置信息是否被更改 currentConfigSequence := chain.Sequence() if currentConfigSequence > lastConfigSequence { // 如果当前的配置序列号大于最新的配置序列号(表名当前配置信息已经被更改) // 将最新的配置序列号改为当前的配置序列号 lastConfigSequence = currentConfigSequence // 生成签名过滤器,验证请求的有效性 sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager()) result, _ := sf.Apply(envelope) if result != filter.Forward { logger.Warningf("[channel: %s] Client authorization revoked for deliver request", chdr.ChannelId) return sendStatusReply(srv, cb.Status_FORBIDDEN) } } // 如果以上的验证都没问题,开始循环读取区块 block, status := cursor.Next() if status != cb.Status_SUCCESS { logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status) return sendStatusReply(srv, status) } logger.Debugf("[channel: %s] Delivering block for (%p)", chdr.ChannelId, seekInfo) // 返回查到的区块返给消息请求方(即peer组织的主节点) if err := sendBlockReply(srv, block); err != nil { logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err) return err } // 如果读到了事先在请求消息中的结束的位置,跳出循环 if stopNum == block.Header.Number { break } } // 返回一个成功的标识 if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil { logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err) return err } logger.Debugf("[channel: %s] Done delivering for (%p), waiting for new SeekInfo", chdr.ChannelId, seekInfo) } }
[1] 看一下请求内容对象ab.SeekInfo的结构:
type SeekInfo struct { // 从哪个区块开始查询 Start *SeekPosition `protobuf:"bytes,1,opt,name=start" json:"start,omitempty"` // 到哪个区块结束查询 Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop" json:"stop,omitempty"` Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"` // SeekInfo_SeekBehavior有两种行为,一种是一直等到orderer节点准备就绪,一种是如果orderer节点没有准备好直接返回 }
以上就是orderer节点将排完序的区块扩散到Fabric网络中的整个过程。
ps:
本人热爱图灵,热爱中本聪,热爱V神,热爱一切被梨花照过的姑娘。
以下是我个人的公众号,如果有技术问题可以关注我的公众号来跟我交流。
同时我也会在这个公众号上每周更新我的原创文章,喜欢的小伙伴或者老伙计可以支持一下!
如果需要转发,麻烦注明作者。十分感谢!
公众号名称:后现代泼痞浪漫主义奠基人
相关文章推荐
- Michael.W谈hyperledger Fabric第22期-详细带读Fabric的源码7-orderer节点相关源码梳理
- Michael.W谈hyperledger Fabric第25期-详细带读Fabric的源码10-peer节点交易的读写集合RWSet
- Michael.W谈hyperledger Fabric第28期-详细带读Fabric的源码13-区块的存储持久化
- Michael.W谈hyperledger Fabric第26期-详细带读Fabric的源码11-世界状态在代码级别的行为实现
- Michael.W谈hyperledger Fabric第27期-详细带读Fabric的源码12-历史状态数据库在代码级别的行为实现
- HyperLedger Fabric:自顶向下的方法--第1篇 编译fabric源码及手工搭建单个Peer节点网络
- Hyperledger Fabric Orderer节点启动
- Hyperledger Fabric 1.0 从零开始(四)——Fabric源码及镜像文件处理
- Hyperledger Fabric 1.0 从零开始(七)——启动Fabric多节点集群
- hyperLedger Fabric 系列二:peer节点的介绍
- Hyperledger Fabric源码解析
- 我对hyperledger fabric1.1.0的执着(四):部署单机多节点网络
- Hyperledger Fabric处理Peer与Peer之间通信的源码解析
- hyperledger fabric1.0动态添加peer节点之添加peer节点
- Hyperledger fabric 源码分析之 peer 服务启动过程
- 区块链之Hyperledger(超级账本)Fabric v1.0 的环境搭建(超详细教程)
- Hyperledger Fabric笔记--kafka共识的多orderer集群部署
- hyperledger fabric 1.0交易流程理解
- Hyperledger fabric0.6 peer启动过程源码分析
- 基于Hyperledger Fabric的多节点部署