您的位置:首页 > 大数据 > 人工智能

Hyperledger Fabric 排序节点处理 Broadcast 请求的实现

2017-10-20 18:12 861 查看
Hyperledger 源码分析之 Fabric

Broadcast,意味着客户端将请求消息(例如完成背书后的交易)通过 gRPC 接口发送给 Ordering 服务。

这些请求消息,会交给 
orderer.common.server
 包中 server 结构体的 
Broadcast(srv ab.AtomicBroadcast_BroadcastServer)
error
 方法处理。该方法主要会调用到 
orderer.common.broadcast
 包中 handlerImpl 结构的 
Handle(srv
ab.AtomicBroadcast_BroadcastServer) error
 方法进行处理。

handlerImpl 结构体十分重要,完成对 Broadcast 请求的处理过程。
type handlerImpl struct {
sm ChannelSupportRegistrar}func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error


Handle(srv ab.AtomicBroadcast_BroadcastServer) error
 方法会开启一个循环来从 srv 中读取请求消息并进行处理,直到结束。核心代码如下所示。
for {
msg, error := srv.Recv() // 从请求中提取一个 Envelope 消息
chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg) // 解析消息:是否为配置消息,消息应由哪个链处理  

// 对应的链对消息进行处理
if !isConfig {
configSeq, err := processor.ProcessNormalMsg(msg) //消息检查
processor.Order(msg, configSeq) //入队列操作
} else {
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) // 合并配置更新消息
processor.Configure(config, configSeq) //入队列操作
}

srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) // 返回响应消息}


Broadcast 请求的整体处理过程如下图所示。




解析消息

首先,解析消息,获取消息头、是否为配置消息、获取对应处理器结构(链结构)。
chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)


实际上,会映射到 orderer.common.server 包中 broadcastSupport 结构体的 
BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, broadcast.ChannelSupport, error)
 方法,进一步调用到
orderer.common.multichannel 包中 Registrar 结构体的对应方法。
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
chdr, err := utils.ChannelHeader(msg)
if err != nil {
return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
}

cs, ok := r.chains[chdr.ChannelId] //应用通道、系统通道
if !ok {
cs = r.systemChannel
}

isConfig := false
switch cs.ClassifyMsg(chdr) {
case msgprocessor.ConfigUpdateMsg:
isConfig = true
default:
}

return chdr, isConfig, cs, nil}


channel 头部从消息信封结构中解析出来;是否为配置信息根据消息头中类型进行判断(是否为 cb.HeaderType_CONFIG_UPDATE);通过字典查到对应的 ChainSupport 结构(应用通道、系统通道)作为处理器。

之后,利用解析后的结果,分别对不同类型的消息(普通消息、配置消息)进行不同处理。下面以应用通道的 ChainSupport 结构作为处理器进行介绍。


处理非配置消息

对于非配置消息,主要执行如下两个操作:消息检查和入队列操作。
configSeq, err := processor.ProcessNormalMsg(msg) //消息检查processor.Order(msg, configSeq) //入队列操作


消息检查方法会映射到 orderer.common.msgprocessor 包中 StandardChannel 结构体的 
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
 方法,实现如下。
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = s.support.Sequence() // 获取配置的序列号,映射到 common.configtx 包中 configManager 结构体的对应方法
err = s.filters.Apply(env) // 进行过滤检查,实现为 orderer.common.msgprocessor 包中 RuleSet 结构体的对应方法。
return}


其中,过滤器会在创建 ChainSupport 结构时候初始化:

应用通道:orderer.common.mspprocessor 包中的 
CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet
 方法,包括 EmptyRejectRule、SizeFilter 和 SigFilter(ChannelWriters
角色)。

系统通道:orderer.common.mspprocessor 包中的 
CreateSystemChannelFilters(chainCreator ChainCreator, ledgerResources channelconfig.Resources) *RuleSet
 方法,包括 EmptyRejectRule、SizeFilter、SigFilter(ChannelWriters
角色)和 SystemChannelFilter。

入队列操作会根据 consensus 配置的不同映射到 orderer.consensus.solo 包或 orderer.consensus.kafka 包中的方法。

以 kafka 情况为例,会映射到 chainImpl 结构体的对应方法。该方法会将消息进一步封装为 
sarama.ProducerMessage
 类型消息,通过 enqueue 方法发给 Kafka 后端。
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
marshaledEnv, err := utils.Marshal(env)
if err != nil {
return fmt.Errorf("cannot enqueue, unable to marshal envelope because = %s", err)
}
if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq)) {
return fmt.Errorf("cannot enqueue")
}
return nil}


处理配置消息

对于配置消息,处理过程与正常消息略有不同,包括合并配置更新消息和入队列操作。
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) // 合并配置更新消息processor.Configure(config, configSeq) //入队列操作


合并配置更新消息方法会映射到 orderer.common.msgprocessor 包中 StandardChannel 结构体的 
ProcessConfigUpdateMsg(env *cb.Envelope) (configSeq uint64, err error)
 方法,计算合并后的配置和配置编号,实现如下。
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
logger.Debugf("Processing config update message for channel %s", s.support.ChainID())

seq := s.support.Sequence()
err = s.filters.Apply(env)
if err != nil {
return nil, 0, err
}

configEnvelope, err := s.support.ProposeConfigUpdate(env)
if err != nil {
return nil, 0, err
}

config, err = utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
if err != nil {
return nil, 0, err
}

err = s.filters.Apply(config)
if err != nil {
return nil, 0, err
}

return config, seq, nil}


入队列操作会根据 consensus 配置的不同映射到 orderer.consensus.solo 包或 orderer.consensus.kafka 包中的方法。

以 kafka 情况为例,会映射到 chainImpl 结构体的对应方法。该方法会将消息进一步封装为 
sarama.ProducerMessage
 类型消息,通过 enqueue 方法发给 Kafka 后端。
func (chain *chainImpl) Configure(config *cb.Envelope, configSeq uint64) error {
marshaledConfig, err := utils.Marshal(config)
if err != nil {
return fmt.Errorf("cannot enqueue, unable to marshal config because = %s", err)
}
if !chain.enqueue(newConfigMessage(marshaledConfig, configSeq)) {
return fmt.Errorf("cannot enqueue")
}
return nil}


返回响应

如果处理成功,则返回成功响应消息。
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})


===========================

《区块链原理、设计与应用》一书已经正式出版,以超级账本项目为例,介绍了区块链和分布式账本技术的底层原理、设计架构、应用实践的大量细节,欢迎大家阅读指正。



===== 关于 TechFirst 公众号 =====

专注金融科技、人工智能、云计算、大数据相关领域的热门技术与前瞻方向。

发送关键词(如区块链、云计算、大数据、容器),获取热门点评与技术干货。

欢迎投稿!

如果你喜欢公众号内容,欢迎鼓励一杯 coffee~

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息