您的位置:首页 > 其它

RocketMQ源码深度解析三之Broker篇

2017-08-07 16:50 351 查看
(一)Broker的初始化

在初始化过程中,会调用 BrokerController 对象的 initialize 方法进行初始化工作,大致逻辑如下:

(1)加载 topics.json、 consumerOffset.json、 subscriptionGroup.json 文件,分别将各文件的数据存入 TopicConfigManager、 ConsumerOffsetManager、SubscriptionGroupManager 对象中;

下面给出示例

//加载topics.json
{
"dataVersion":{
"counter":2,
"timestatmp":1393729865073
},
"topicConfigTable":{
//根据 consumer 的 group 生成的重试 topic
"%RETRY% group_name":{
"perm":6,
"readQueueNums":1,
"topicFilterType":"SINGLE_TAG",
"topicName":"%RETRY% group_name",
"writeQueueNums":1
},
"TopicTest":{
"perm":6, // 100 读权限 , 10 写权限 6 是 110 读写权限
"readQueueNums":8,
"topicFilterType":"SINGLE_TAG",
"topicName":"TopicTest",
"writeQueueNums":8
}
}
}


//加载消费进度偏移量consumerOffset.json
{
"offsetTable":{
"%RETRY% group_name@ group_name":{
0:0 //重试队列消费进度为零
},
"TopicTest@ group_name":{
0:23,1:23,2:22,3:22,4:21,5:18,6:18,7:18
//分组名 group_name 消费 topic 为 TopicTest 的进度为:
// 队列 queue=0 消费进度 23
// 队列 queue=2 消费进度为 22 等等…
}
}
}


//加载消费者订阅关系
{
"dataVersion":{
"counter":1,
"timestatmp":1393641744664
},
" group_name":{
"brokerId":0, //0 代表这台 broker 机器为 master,若要设为 slave 值大于 0
"consumeBroadcastEnable":true,
"consumeEnable":true,
"consumeFromMinEnable":true,
"groupName":" group_name",
"retryMaxTimes":5,
"retryQueueNums":1,
"whichBrokerWhenConsumeSlowly":1
}
}
}


(2)初始化 DefaultMessageStore 对象,该对象是应用层访问存储层的访问类;

(3)调用 DefaultMessageStore.load 加载数据

3.1)调用 ScheduleMessageService.load 方法,初始化延迟级别列表

3.2)调用 CommitLog.load 方法,在此方法中调用 MapedFileQueue.load 法,将HOME/store/commitlog目录下的所有文件加载到MapedFileQueue的List变量中3.3)调用DefaultMessageStore.loadConsumeQueue方法加载consumequeue文件数据到DefaultMessageStore.consumeQueueTable集合中3.4)调用TransactionStateService.load()方法加载tranStateTable文件和tranRedoLog文件3.5)初始化StoreCheckPoint对象,加载HOME/store/checkpoint 文件,该

文件记录三个字段值,分别是物理队列消息时间戳、逻辑队列消息时间戳、索引队列消息时间戳

3.6)调用 IndexService.load 方法加载$HOME/store/index 目录下的文件。

对该目录下的每个文件初始化一个 IndexFile 对象。

3.7)恢复内存数据。

4、初始化 Netty 服务端 NettyRemotingServer 对象

5、初始化发送消息线程池( sendMessageExecutor)、拉取消息线程池(pullMessageExecutor)、管理 Broker 线程池( adminBrokerExecutor)、客户端管理线程池( clientManageExecutor)

6、注册事件处理器

7、 启动定时任务。

8、若该 Broker 为主用,则启动定时任务打印主用 Broker 和备用 Broker 在Commitlog 上的写入位置相差多少个字节

9、若该 Broker 为备用,设置同步 Config 文件的定时任务,每隔 60 秒调用 SlaveSynchronize.syncAll()方法向主用 Broker 请求一次 config 类文件的同步。

(二)启动过程

1、调用 DefaultMessageStore.start 方法启动 DefaultMessageStore 对象中的一些服务线程。

2、 启动 Broker 的 Netty 服务端 NettyRemotingServer。监听消费者或生产者发起的请求信息并处理;

3、启动 BrokerOuterAPI 中的 NettyRemotingClient,即建立与 NameServer的链接,用于自身 Broker 与其他模块的 RPC 功能调用;

4、 启动拉消息管理服务 PullRequestHoldService,当拉取消息时未发现消息,则初始化 PullRequeset 对象放入该服务线程的 pullRequestTable 列表中,由 PullRequestHoldService 每隔 1 秒钟就检查一遍每个 PullRequeset 对象要读取的数据位置在 consumequeue 中是否已经有数据了,若有则交由PullManageProcessor 处理。

5、 启动 ClientHousekeepingService 服务,在启动过程中设置定时任务

6、启动 FilterServerManager,每隔 30 秒定期一次检查 Filter Server 个数,若没有达到 16 个则创建

7、首先调用 BrokerController.registerBrokerAll 方法立即向 NameServer注册 Broker;然后设置定时任务,每隔 30 秒调用一次该方法向 NameServer 注册;

8、启动每隔 5 秒对未使用的 topic 数据进行清理的定时任务。该定时任务调用 DefaultMessageStore.cleanUnusedTopic(Settopics)方法

(三)向NameServer注册Broker

调用 BrokerController.registerBrokerAll 方法立即向 NameServer 注册Broker。大致步骤如下:

(1)将 TopicConfigManager.topicConfigTable 变量序列化成TopicConfigSerializeWrapper 对象;

(2)通过BrokerOuterAPI.registerBroker向NameServer注册

(3)根据 updateMasterHAServerAddrPeriodically 标注位判断是否需要更新主用Broker地址

(4)用 NameServer 返回的 MasterAddr 值更新 SlaveSynchronize.masterAddr值,用于主备 Broker 同步 Config 文件使用;

(5)根据 Name Server 上的配置参数 ORDER_TOPIC_CONFIG 的值来更新Broker端的 TopicConfigManager.topicConfigTable 变量值

(四)处理Producer发送的消息

如果收到请求码为SEND_MESSAGE/SEND_MESSAGE_V2 的消息后转由 SendMessageProcessor 处理。大致过程如下:

1、解码收到的消息,并构建 SendMessageRequestHeader 对象;

2、若 SendMessageProcessor 处理器设置了发送消息的钩子,则调用该钩子类的

sendMessageBefore 方法,执行发送前的处理方法,在发送消息结果之后调用

sendMessageAfter 方法。 该钩子类是为了便于业务层面的扩展而设计的

3、 调用 SendMessageProcessor.sendMessage方法处理的消息并返回处理结果

PutMessageResult 对象;

4、 根据上一步返回的 PutMessageResult 对象中的status,设置相应的消息代码

(五)处理心跳消息

接受到客户端发送的 HEART_BEAT 请求码之后,由ClientManageProcessor.processRequest方法处理该请求。心跳消息调用

heartBeat(ChannelHandlerContext ctx, RemotingCommand request)方法处理。大致逻辑如下:

1、 解码接受消息,生成 HeartbeatData 对象;

2、根据链接的 Channel、 ClientID 等信息初始化 ClientChannelInfo 对象

3、若 HeartbeatData 对象中的 ConsumerData 集合有数据,则进行 Consumer注册

4、若 HeartbeatData 对象中的 ProducerData 集合有数据,则对每个ProducerData进行 Producer 注册

其实都是根据收到的请求码,然后执行相应的验证和处理,其他的方法就不详细说了。有兴趣的可以自己去看,遇到不懂的欢迎和我交流。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rocketmq 源码