您的位置:首页 > 其它

RocketMQ源码分析之基础知识介绍(一)

2018-07-31 01:09 495 查看
版权声明:本博客为技术原创, 未经允许,谢绝转载 !!! https://blog.csdn.net/yewandemty/article/details/81294883

RocketMQ之基础知识讲解

一、RocketMQ知识介绍

1.1 前言

  • RocketMQ是一个队列模型的消息中间件,具有高性能、高可用、高实时、分布式特点。
  • 对列集合成为Topic, 如果Consumer做广播消费,则一个Consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均这个Topic对应的队列集合。
  • 能够保证严格的消息顺序, 但是会失去FailOver特性,可以通过同步双写模式解决,但是服务切换任然会造成几分钟服务不可用。
  • 可以提供多种消息拉取方式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅能力
  • 亿级消息堆积能力,RocketMQ消息是持久化的,内部通过将几天前的消息删除,来满足亿级消息的处理能力。
  • 较少的依赖
  • RocketMQ主要引用了JMS规范和Corbar Notification规范, 但是RocketMQ并不遵循任何规范,它参考了各种规范和同类的设计思想 。

  • RocketMq经历了三个主要版本的迭代,Metaq 1.x , MetaQ 2.x , RocketMQ 3.x , 在 Metaq 1.x 和 2.x版本中,用zookeeper作为注册中心,以满足高可用性能,但是nameSrv不仅可以满足高可用,相较于zookeeper更加的轻量级而且性能更好

1.2 专业术语

  • Producer
    消息生产者,负责产生消息,一般由业务系统负责产生消息
  • Consumer
    消息消费者,负责消费消息,一般是后台系统负责异步消费
  • Push Consumer
    Consumer的一种, 应用通常向Consumer注册一个Listener接口, 一旦收到消息, Consumer对象立即回掉Listener接口方法 。
  • Pull Consumer
    Cosumer的一种, 应用通常主动调用Consumer拉取消息方法从Broker拉取消息, 主动权由应用控制
  • Producer Group
    一类Producer的集合名称,这类Producer通常发送一类消息, 且发送逻辑一样
  • Consumer Group
    一类Consumer的集合名称, 这类Consumer通常消费一类消息,且消费逻辑一样
  • Broker
    消息中转角色, 负责存储消息,转发消息,一般也成为Server, 在JMS规范中成为Provider
  • 广播消费
    一条消息被多个Consumer消费, 即使这些Consumer属于同一个Consumer Group , 消息也会被Consumer Group中的每一个Consumer都消费一次,广播消费中的Consumer Group 概念可以认为在消息划分方面没有意义。
    在Corba Notification规范中,消息方式都属于广播消费。
    在JMS规范中,相当于JMS publish/subscribe 模式
  • 集群消费
    一个Consumer Group中的Consumer实例平均分摊消费消息,其中Consumer Group有三个实例 ,那么每个实例只需要消费其中3条消息。
    在JMS规范中,point-to-point与之类似,但是RocketMQ的集群消费能力 >= PTP模型,因为单个Consumer Group内的消费者类似于PTP,但是一个Topic/Queue可以被多个Consumer Group消费。
  • 普通顺序消息
    正常情况下,可以保证完全的顺序消费,但是一旦发生通信异常,Broker重启,由于队列总数发生变化,哈希 取模后定位的队列会变化,产生短暂的消息顺序不一致。如果能够忍受消息的短暂乱序可以使用普通顺序方式
  • 严格顺序消息
    无论正常情况还是异常情况,都可以保证完全的顺序消费,但是牺牲了分布式Failover特性,即只要Broker集群中有一台机器不可用,则整个集群不可用,服务可用性大大降低,如果服务器部署为同步双写模式,此缺陷可通过备机切换为主机避免,不过仍然存在几分钟的服务不可用(依赖同步双写,主备自动切换,主备切换功能目前还没有实现)
  • Message Queue
    在RocketMQ中,所有的消息队列都是持久化,长度无限制的数据结构,所谓长度无限制是指队列中的每个存储单元都是定长的,访问其中的存储单元通过offset来访问,offset为java long类型,64位,理论上100年不会溢出,另外队列只保存最近几天的数据,之前的数据会按照过期时间来删除。

1.3 RocketMQ解决的问题

  • Publish/Subscribe(发布订阅) 支持
    消息中间件最基本的功能
  • Message Priority(消息优先级) 不支持
    消息优先级指的是在同一个队列中每条消息有不同的优先级,按照消息优先级进行投递。RocketMQ所有消息都是持久化的,同一个队列消息如果按照优先级进行投递,需要进行排序,开销会很大。
    替代方案, 设置多个队列, 分别表示不同优先级的队列
    1) 达到优先级目的,但不是严格意义的优先级。用多个Topic分别表示不同的优先级, 需要对业务优先级的额精确的做出妥协 。
    2) 严格意义的优先级,优先级用整数表示。这种方式不能通过不同的Topic来解决,如果通过MQ来解决这个问题,会对性能有很大影响, 需要确定是否确实需要这种严格的优先级, 如果通过多个Topic来表示优先级会有什么影响 。

    Message Order(消息有序) 支持
    消息有序指的是一类消息消费时,能按照发送的顺序来消费,比如果同一个订单的不同阶段可以保证完全有序 。
  • Message Filter(消息过滤) 支持
    1) Broker端消息过滤。按照Consumer端的要求进行过滤,减少了网络无用消息的过滤,但是增加了Broker端的负担,实现相对复杂 。
    2) Consumer端消息过滤。可以由应用完全自定义实现,可能会造成很多无用的消息传输到Consumer端。

  • Message Persistense(消息持久化) 支持
    1) 持久化到数据库, 比如Mysql
    2) 持久化到KV存储, 例如levelDB
    3) 文件记录是持久化
    4) 对内存数据库做一个持久化镜像,比如beanstalkd, Visinotify
    1),2),3) 具有将内存队列Buffer进行扩展的能力 , 4) 是内存的镜像,作用是当Broker挂掉或重启后仍能将之前的数据回复出来。

  • Message Reliability(消息可靠性) 支持
    影响消息可靠性的几种情况,
    1) RocketMQ可以保证消息不丢失或丢失少量数据。
    Broker正常关闭, Broker异常crash, OS Crash, 机器断电 但是可以立即回复供电情况
    2) 单点故障,一旦发生消息全部丢失。
    机器无法开机, 磁盘设备损坏
    解决方式: 通过异步复制, 可保证数据99%消息不丢失; 同步双写模式可以完全避免地点故障,但是会影响性能,适合对消息要求极高的场合 。

  • Low Latency Messaging(低延迟消息) 支持
    在消息不堆积情况下,消息到达Broker后,能立即到达Consumer。RockerMQ使用长轮询Pull方式,可以保证消息非常实时 。

  • At least Once(消息必须投递一次) 支持
    RocketMQ Consumer先Pull消息到本地,消费完后,才向客户端ack, 如果没有消费一定不会ack

  • Exactly Only Once 不支持
    1) 发送消息阶段, 不允许发送重复消息
    2) 接收消息阶段, 不允许消费重复消息
    为了支持高性能, RocketMQ不支持这种特性,在分布式环境下,不可避免的会产生巨大的性能开销,可以在业务上进行去重,保证消费消息的幂等性。

  • 消息堆积的处理方式
    Broker的buffer通常指Broker中一个内存Buffer的大小,这类Buffer通常大小有限。RocketMQ没有内存Buffer的概念, 数据是持久化到磁盘,定期进行数据清理。RocketMQ的内存Buffer抽象成一个无限的长度的队列,理论不管来多少数据都可以容纳的下,Broker会定期删除过期的数据,以保证长度无限。

  • 回溯消费 支持
    回溯是指已经消费Consumer已经消费成功,由于业务需求需要重新消费,RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯也可以向后回溯。

  • 消息堆积
    消息中间件主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这要求消息中间件具有一定的消息堆积能力。处理方式
    1) 消息堆积在内存Buffer, 可以按照一定的策略丢弃消息,消息堆积后,性能不会下降太大,内存中存储数据的多少对影响对外提供访问的处理能里影响有限。
    1.1) 拒绝新来的消息,
    1.2) 按照特定策略丢弃已有信息
    AnyOrder, FifoOrder, LifoOrder, PriorityOrder, DeadlineOrder
    2) 消息堆积到持久化系统中,例如DB,KV系统,文件记录形式。

    堆积能里评估标准:
    1) 消息的堆积能力
    2) 消息堆积后,发消息的吞吐量是否收到影响
    3) 消息堆积后,发消息的吞吐量大小,是否会受到堆积的影响
    4) 消息堆积后,访问磁盘的消息时,吞吐量的大小

  • 定时消息
    定时消息是指消息发到Broker后,不能被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
    Broker支持定时消息,定时不支持任意时间精度,只支持特定的level,比如5s, 10s , 1mins
  • 1.4 RocketMQ网络部署图

    • Name Server是一个几乎无状态几点, 可集群部署,几点之间无任何消息同步 。
    • Broker分为Master和Slave, 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master, Master与Slave之间通过执行相同的Broker Name 不同的BrokerId来定义,brokerId = 0 表示Master节点, brokerId > 0 表示slave节点 , 每个broker与每个Name Server建立长连接,定时注册Topic信息到Name Server
    • Producer 与Name Server 集群中的某个节点建立长连接(随机选择) , 定期从Namer Server获取Topic的路由信息, 并向提供topic服务的Master建立长连接,且定时向Master发送心跳,Producer是无状态的, 可集群部署
    • Consumer与Name Server集群中的某个节点建立长连接(随机选择),定期从Name Server 获取Topic的路由信息, 并向提供服务的Master、slave建立长连接,且定时向Master、Slave发送心跳,Consumer既可以向Master订阅信息, 也可以向Slave订阅信息,具体的订阅规则由Broker的配置决定。

    1.5 RocketMQ逻辑部署结构

    • Producer Group

      用来表示发送消息用 , 一个Producer Group 包含多个Producer实例
    • Producer实例可以是多台机器,也可以是一台机器的多个线程,或者一个线程的多个Producer对像
    • 标识一类Producer
    • 发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回掉Producer Group 内的任意一台机器来确认事务状态
  • Consumer Group

      用来表示一个消费者应用,一个Consumer Group 下包含多个Consumer实例
    • Consumer实例,可以是多台机器、一台机器的多个线程、一个线程的多个Consumer对象
    • 一个Consmer Group下多个Consumer以均摊的方式消费消息, 如果广播的消费方式,那么每个Consumer消费所有的消息

    1.6 RocketMQ存储特点 mmap + write方式

    • 使用mmap + write 方式
      优点:即使频繁调用,小块文件传输效率也很高
    • 缺点: 不能很好的利用DMA方式, 回避sendfile多消耗CPU,内存安全性控制复杂,需要比秒JVM Crash
  • 使用sendfile方式

      优点: 可以使用DMA方式,消耗CPU比较少, 大块文件效率高,无内存安全问题
    • 缺点:小块文件传输效率较mmap方式低,只能是BIO方式传输,不能使用NIO。
  • 数据存储结构
    图片没有导入成功, 后期导入可以导入再添加该部分信息

  • RocketMQ关键特性

      所有数据单独存储到一个Commit Log的位置信息,完全顺序写,随机读
    • 对最终用户展现的队列实际只存储消息在Commit Log的位置消息,并且串行方式刷盘
    • 优点
      队列轻量化,单个队列数量非常少。
    • 对磁盘的访问串行化,避免磁盘竞争,不会因为队列增加导致IOWAIT增高
  • 缺点
      写虽然完全顺序写,但是读却变成了完全的随机读。
    • 读一条信息, 会先读Consumer queue , 再读Commit Log , 增加了开销。
    • 要保证Commit Log 与 Consume Queue完全的一致 , 增加了编程的复杂度
  • 规避方式
      随机读。尽可能让读命中PAGE CACHE,减少IO操作,多以内存越大越好。
    • 如果系统中堆积的消息过多,读数据要访问磁盘会不会由于随机读导致系统性能急剧下降。 不会急剧下降
      访问PAGECACHE , 系统会提前预读出更多的数据,在下次读时, 可能命中内存 。
    • 随机访问Commit Log 磁盘数据,设置系统IO调度算法位NOOP , 会在一定程度上将完全的随机读取变成顺序跳跃方式 , 这种跳跃方式较随机方式性能要快 。
  • 由于Consume Queue存储数据量极少,而且是顺序读,在PAGE CACHE预读作用下,即使存在堆积,Consume Queue的读性能几乎和内存一致 。
  • Commit Log中存储了所有的元信息, 包含消息体 ,所以只要Commit Log 的消息在,Consume Queue即使数据丢失,任然可以回复出来 。
  • 数据刷盘
      异步刷盘
    • 同步刷盘

    1.7RocketMQ客户端

    • 客户端寻址方式

      代码中指定

      producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
      consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
    • Java启动参数中指定Name Server地址

      -Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
    • 环境变量指定Name Server地址

      export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
    • HTTP静态服务器资源(推荐) 客户端部署简单, Name Server集群可热部署
      客户端启动后, 会定时访问一个静态HTTP服务器,比如: http://jmenv.tbsite.net:8080/rocketmq/nsaddr
      访问后,这个地址会返回 192.168.0.1:9876;192.168.0.2:9876
      可以通过修改/etc/host 增加如下配置 10.232.191.1 jmenv.tbsite.net
  • 客户端的公共配置

  • 参数名 默认值 说明
    namesrvAddr Name Server地址列表,多个Name Server用分号隔开
    clientIP 本机IP 客户端本机IP, 多网卡的情况下,某些机器会发生无法识别客户端IP地址的情况,需要应用在代码强制指定
    instanceName DEFAULT 客户端实例名称,客户端创建多个Peoducer、Consumer实际是使用一个内部实例(这个实例包含网络连接、线程资源等)
    clientCallbackExcutorThreads 4 通信层异步回调线程数
    poolNameServerInteval 30000 沦轮询Name Server 间隔时间,单位ms
    heartbeatInteval 30000 向Broker发送心跳间隔时间 ,单位ms
    persistConsumerOffsetInteval 5000 持久化Consumer消费进度间隔时间,得ms
    • Producer 配置
    参数名 默认值 说明
    producerGroup DEFAULT_PRODUCER producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将他们归为一组
    createTopicKey TBW102 在发送消息时自动创建服务器不存在的topic,需要指定Key
    defaultTopicQueueNums 4 默认创建的队列数
    sendMsgTimeOut 10000 发送消息超时时间,单位毫秒(ms)
    compressMsgBodyOverHowmuch 4096 消息body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
    retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult, 当是发送sendStatus != OK, 是否重试发送
    maxMessageSize 131072 客户端限制的消息大小,超过报错,同时服务端也会限制
    transactionListener 事务消息回查监听器, 如果发送事务消息,必须设置
    checkThreadPoolMinSize 1 Broker回查Producer事务状态时,线程池最小数目
    checkThreadPoolMaxSize 1 Broker回查Producer事务状态时, 线程池最大数目
    checkRequestHoldMax 2000 Broker回查Producer事务状态时, Producer本地缓冲请求队列大小
    • PushComsumer 配置
    参数名 默认值 说明
    consumerGroup DEFAULT_GROUP Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一样,则应该将他们归为一组
    messageModel CLUSTERING 消息模型 支持 1. 集群消费 2. 广播消费
    consumerFromWhere CONSUMER_FROM_LAST_OFFSET Consumer启动后默认从什么位置开始消费
    allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略
    subscription {} 订阅关系
    messageListener 消息监听器
    offsetStore 消费进度存储
    consumerThreadMin 10 消费线程池最小数目
    consumerThreadMax 20 消费线程池最大数目
    consumerConcurrentlyMaxSpan 2000 单队列并行消费允许的最大跨度
    pullThresholdForQueue 1000 拉消息本地队列缓存消息最大数
    pullInteval 0 拉消息间隔,由于是长轮询,所以为0 ,但是应用为了控流,也可以设置大于0的值,单位毫秒(ms)
    consumerMessageBatchMaxSize 1 批量消费,一次消费多少条消息
    ppullBatchSize 32 批量拉消息, 一次最多拉多少条
    • PullConsumer配置
    参数名 默认值 说明
    producerGroup DEFAULT_PRODUCER producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将他们归为一组
    brokerSuspendMaxTimeMills 20000 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒(ms)
    cosumerTimeoutMillsWhenSuspend 300000 长轮询, Consumer拉消息请求在Broker挂起超过指定时间, 客户端认为超时,单位毫秒(ms)
    consumerPullTimeoutMills 10000 非长轮询,拉消息超时时间,单位毫秒(ms)
    messageModel BROADCASTING 消息模型,支持以下两种, 1. 集群消费 2.广播消费
    messageQueueListener 监听队列变化
    offsetStore 消息进度存储
    registerTopics [] 注册的Topic集合
    allocateMessageQueueStrangely AllocateMessageQueueAveragely Rebalance算法实现策略
    • Producer数据结构

      Producer数据结构

    • Consumer数据结构
      在Producer端,使用org.apache.rocketmq.common.message.Message这个数据结构,由于Broker会为Message增加数据结构,所以消息到达Consumer后, 会在Message基础上增加多个字段,Consumer看到的是org.apache.rocketmq.common.message.MessageExt 这个数据结构,MessageExt继承自Message 。

    注: 后期有什么新的内容会不定期的更新

    阅读更多
    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: 
    相关文章推荐