您的位置:首页 > 其它

RocketMQ 源码分析

2015-08-01 22:20 337 查看

RocketMQ 源码分析

RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异)。接下来记录下自己阅读源码的一些探索。

RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务;生产者和消费者是消息的源头和归宿。




在知道各个角色的基本位置后,就该让程序跑起来,这样才能看的更真切,RocketMQ依赖很少,可以很容易的在本地部署,首先要设置环境变量ROCKETMQ_HOME,这是RocketMQ需要的工作目录。nameserver 首先启动,这样才会监听(监听端口为9876)来自Broker的连接,Broker启动的时候指定nameserver,否则会到某个服务器寻找可用的nameserver列表,然后可以运行rocketmq-example中的例子进行简单的测试。

Name server短小精悍,协调全局

从代码可以看到主要是初始化服务端通信层和线程执行组件, 然后启动ServerBootstrap启动。通过jstack命令你可以看到启动了哪些线程。

Producer发送消息是如何得知发到哪个broker的 ?

每个应用在收发消息之前,一般会调用一次producer.start()/consumer.start(),幕后的工作就是:创建需要的实例对象,如MQClientInstance;设置定时任务,如从Nameserver中定时更新本地的Topic route info,发送心跳信息到所有的broker,动态调整线程池的大小,等等;把当前producer加入到指定的组中。客户端会缓存路由信息TopicPublishInfo, 同时定期从NameServer取Topic路由信息,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer。Producer在发送消息的时候会去查询本地的topicPublishInfoTable(一个HashMap),如果没有命中的话就会与NameServer沟通得到路由信息,RequestCode=GET_ROUTEINTO_BY_TOPIC 如果nameserver中也木有查询到,那么将会发送一个default的topic进行路由查询。具体过程如下图所示:



Producer发送消息的具体过程,以同步模式为例。

得到了通信地址,发送过程就显而易见了。可以看到在选择消息队列进行发送时采用随机方式,同时和上一次发送的broker保持不同,防止热点。



Broker是如何接收来自Producer的消息呢?

每个producer咋发送消息的时候都和对应的Broker建立了长连接,此时broker已经准备好接收Message,具体过程如下。接收到消息后,会先写入Commit Log文件(顺序写,写满了会新建一个新的文件),然后更新Consume queue文件(存储如何由topic定位到具体的消息)。



RocketMQ 存储特点

RocketMQ的消息采用顺序写到commitlog文件,然后利用consume queue文件作为索引,如图。RocketMQ采用零拷贝mmap+write的方式来回应Consumer的请求,RocketMQ宣称大部分请求都会在Page Cache层得到满足,所以消息过多不会因为磁盘读使得性能下降,这里自己的理解是,在64bit机器下,虚存地址空间(vm_area_struct)不是问题,所以相关的文件都会被映射到内存中(有定期删除文件的操作),即使此刻不在内存,操作系统也会因为缺页异常进行换入,虽然地址空间不是问题,但是一个进程映射文件的个数是有限的,所以可能在这里发生OOM。




传统读写和两种零拷贝的简单对比如下,其实就是CPU和IO的权衡。




通过Broker中的存储目录也能看到上述表述的体现:



顺序消息是如何保证的?

需要业务层自己觉得哪些消息应该顺序保证,然后发送的时候通过规则映射到同一个队列,因为没有谁比业务自己更加知道关于消息顺序的特点。这样的顺序是相对顺序,局部顺序,因为发送方只保证把这些消息顺序的发送到broker上的同一队列,但是不保证其他Producer也会发送到那个队列,所以需要Consumer在拉到消息后做一些过滤。

RocketMQ 刷盘实现

刷盘的最终实现都是使用NIO中的MappedByteBuffer.force()将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。异步而言,只是唤醒对应的线程,不保证执行的时机,流程如下。



消息过滤

类似于重复数据删除技术,可以在源端做,也可以在目的端实现,就是网络和存储的权衡,如果在Broker端做消息过滤就需要逐一比对consume queue的tagsCode字段(hashcode),如果符合则传输给消费者,因为是hashcode,所以存在误判,需要在Consumer接收到消息后进行字符串级别的过滤,确保准确性。

总结:阅读这个代码也花了好几天时间,虽然在分布式可靠性方面还未仔细研究,主要看了关键的设计思想和流程,有很多东西要不断的沉淀和消化,特别是多线程同步,异步等。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: