您的位置:首页 > 大数据 > 人工智能

Hyperledger Fabric 排序节点启动过程

2017-10-20 19:24 831 查看
Hyperledger 源码分析之 Fabric

Orderer 节点启动通过 
orderer
 包下的 main() 方法实现,会进一步调用到 
orderer/common/server
 包中的 
Main()
 方法。

核心代码如下所示。
// Main is the entry point of orderer processfunc Main() {
fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

// "version" command
if fullCmd == version.FullCommand() {
fmt.Println(metadata.GetVersionInfo())
return
}

conf := config.Load()
initializeLoggingLevel(conf)
initializeLocalMsp(conf)

Start(fullCmd, conf)}


包括配置初始化过程和核心启动过程两个部分:

config.Load():从本地配置文件和环境变量中读取配置信息,构建配置树结构。

initializeLoggingLevel(conf):配置日志级别。

initializeLocalMsp(conf):配置 MSP 结构。

Start():完成启动后的核心工作。


整体过程

核心启动过程都在 
orderer/common/server
包中的 Start() 方法,如下图所示。



Start() 方法会初始化 gRPC 服务需要的结构,然后启动服务。

核心代码如下所示。
func Start(cmd string, conf *config.TopLevel) {
logger.Debugf("Start()")
signer := localmsp.NewSigner()
manager := initializeMultichannelRegistrar(conf, signer)
server := NewServer(manager, signer, &conf.Debug)

switch cmd {
case start.FullCommand(): // "start" command
logger.Infof("Starting %s", metadata.GetVersionInfo())
initializeProfilingService(conf)
grpcServer := initializeGrpcServer(conf)
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
logger.Info("Beginning to serve requests")
grpcServer.Start()
case benchmark.FullCommand(): // "benchmark" command
logger.Info("Starting orderer in benchmark mode")
benchmarkServer := performance.GetBenchmarkServer()
benchmarkServer.RegisterService(server)
benchmarkServer.Start()
}}


包括两大部分:

gRPC 服务结构初始化;

gRPC 服务启动。


gRPC 服务结构初始化

包括创建新的 MSP 签名结构,初始化 Registrar 结构来管理各个账本结构,启动共识过程,以及创建 gRPC 服务端结构。

核心过程包括:
signer := localmsp.NewSigner() // 初始化签名结构manager := initializeMultichannelRegistrar(conf, signer) // 初始化账本管理器(Registrar)结构


其中,
initializeMultichannelRegistrar(conf, signer)
 方法最为关键,核心代码如下:
func initializeMultichannelRegistrar(conf *config.TopLevel, signer crypto.LocalSigner) *multichannel.Registrar {
// 创建账本操作的工厂结构
lf, _ := createLedgerFactory(conf)

// 如果是新启动情况,创建系统通道的账本结构
if len(lf.ChainIDs()) == 0 {
logger.Debugf("There is no chain, hence we must be in bootstrapping")
initializeBootstrapChannel(conf, lf)
} else {
logger.Info("Not bootstrapping because of existing chains")
}
//初始化共识插件
consenters := make(map[string]consensus.Consenter)
consenters["solo"] = solo.New()
consenters["kafka"] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version, conf.Kafka.Verbose)

// 创建各个账本的管理器(Registrar)结构,并启动共识过程
return multichannel.NewRegistrar(lf, consenters, signer)}


利用传入的配置信息和签名信息完成如下步骤:

创建账本操作的工厂结构;

如果是新启动情况,利用给定的系统初始区块文件初始化系统通道的相关结构;

完成共识插件(包括 
solo
 和 
kafka
 两种)的初始化;

multichannel.NewRegistrar(lf, consenters, signer)


方法会扫描本地账本数据(此时至少已存在系统通道),创建 Registrar 结构,并为每个账本都启动共识(如 Kafka 排序)过程。

说明:Registrar 结构(位于 
orderer.common.multichannel
 包)是 Orderer 组件中最核心的结构,管理了 Orderer
中所有的账本、共识插件等数据结构。


创建 Registrar 结构并启动共识过程

NewRegistrar(lf, consenters, signer)
 方法位于 
orderer.common.multichannel
 包,负责初始化链支持、消息处理器等重要数据结构,并为各个账本启动共识过程。

核心代码如下:
existingChains := ledgerFactory.ChainIDs()for _, chainID := range existingChains {
if _, ok := ledgerResources.ConsortiumsConfig(); ok { // 如果是系统账本
chain := newChainSupport(r, ledgerResources, consenters, signer)
chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain))
r.chains[chainID] = chain
r.systemChannelID = chainID
r.systemChannel = chain
defer chain.start() // 启动共识过程
else // 如果是应用账本
chain := newChainSupport(r, ledgerResources, consenters, signer)
r.chains[chainID] = chain
chain.start()  // 启动共识过程
}


chain.start()
 方法负责启动共识过程,以 Kafka 共识插件为例,最终调用到 
orderer.consensus.kafka
 包中的 
startThread()
 方法。

startThread()
 方法将为指定的账本结构配置共识服务,并将其启动,核心代码包括:
// 创建 Producer 结构chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)// 发送 CONNECT 消息sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel)// 创建处理对应 Kafka topic 的 Consumer 结构chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)// 配置从指定 partition 读取消息的 PartitionConsumer 结构chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)// 从该链对应的分区读取消息,并进行处理过程chain.processMessagesToBlocks()


主要包括如下步骤:

创建到 Kafka 集群的 Producer 结构并发送 CONNECT 消息;

为对应的 topic 创建 Consumer 结构,并配置从指定分区读取消息的 PartitionConsumer 结构;

启动链对应的 Kafka 分区中消息的循环处理过程。
processMessagesToBlocks()
 方法不断从分区中 Consume 消息并进行处理,同时定时发送 TimeToCut 消息。处理消息类型包括 Connect 消息(Producer 启动后发出)、TimeToCut 消息和 Regular 消息(普通的交易)。分别调用对应方法进行处理。


gRPC 服务启动

初始化 gRPC 服务结构,完成绑定并启动监听。
// 初始化 gRPC 服务端结构server := NewServer(manager, signer, &conf.Debug)// 创建 gRPC 服务连接grpcServer := initializeGrpcServer(conf)// 绑定 gRPC 服务并启动ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)grpcServer.Start()


其中,
NewServer(manager, signer, &conf.Debug)
 方法(位于 
orderer.common.server
 包)最为核心,将
gRPC 相关的服务结构进行初始化,并绑定到 gRPC 请求上。分别响应 Deliver() 和 Broadcast() 两个 gRPC 调用。
// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Readerfunc NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug) ab.AtomicBroadcastServer {
s := &server{
dh:    deliver.NewHandlerImpl(deliverSupport{Registrar: r}),
bh:    broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}),
debug: debug,
}
return s}


===========================

《区块链原理、设计与应用》一书已经正式出版,以超级账本项目为例,介绍了区块链和分布式账本技术的底层原理、设计架构、应用实践的大量细节,欢迎大家阅读指正。

===== 关于 TechFirst 公众号 =====

专注金融科技、人工智能、云计算、大数据相关领域的热门技术与前瞻方向。

发送关键词(如区块链、云计算、大数据、容器),获取热门点评与技术干货。

欢迎投稿!

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息