您的位置:首页 > 其它

RocketMQ集群搭建

2019-02-23 23:53 369 查看

RocketMQ集群搭建

  • 1.集群架构
  • 2.集群环境配置
  • 3.集群配置文件
  • 4.修改启动脚本参数
  • 5.详细搭建步骤
  • 6.Rocketmq-Console搭建步骤
  • 7.停止集群注意点
  • 8.坑汇总:
  • 9.参考
  • 前言

    整个RocketMQ消息系统主要由如下4个部分组成:

    从中间件服务角度来看整个RocketMQ消息系统(服务端)主要分为:NameSrv和Broker两个部分。

    NameSrv:在RocketMQ分布式消息系统中,NameSrv主要提供两个功能:

    提供服务发现和注册,这里主要是管理Broker,NameSrv接受来自Broker的注册,并通过心跳机制来检测Broker服务的健康性;

    提供路由功能,集群(这里是指以集群方式部署的NameSrv)中的每个NameSrv都保存了Broker集群(这里是指以集群方式部署的Broker)中整个的路由信息和队列信息。这里需要注意,在NameSrv集群中,每个NameSrv都是相互独立的,所以每个Broker需要连接所有的NameSrv,每创建一个新的topic都要同步到所有的NameSrv上。

    Broker:主要是负责消息的存储、传递、查询以及高可用(HA)保证等。其由如下几个子模块(源码总体上也是这么拆分的)构成:

    remoting,是Br 4000 oker的服务入口,负责客户端的接入(Producer和Consumer)和请求处理。

    client,管理客户端和维护消费者对于Topic的订阅。

    store,提供针对存储和消息查询的简单的API(数据存储在物理磁盘)。

    HA, 提供数据在主从节点间同步的功能特性。

    Index,通过特定的key构建消息索引,并提供快速的索引查询服务。

    而从客户端的角度看主要有:Producer、Consumer两个部分。

    Producer:消息的生产者,由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。

    Consumer:消息的消费者,也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制,满足大多数消费场景。

    来总结下,整个RocketMQ消息集群就是由NameSrv/Broker、Producer/Consumer组成的。为了让大家更清晰的理解它们之间的关系,我们以一条完整的信息流转为例,来看看RocketMQ消息系统是如何运转的,如下图所示:

    RocketMQ集群模式

    RocketMQ集群部署有多种模式,对于NameSrv来说可以同时部署多个节点,并且这些节点间也不需要有任何的信息同步,这里因为每个NameSrv节点都会存储全量路由信息,在NameSrv集群模式下,每个Broker都需要同时向集群中的每个NameSrv节点发送注册信息,所以这里对于NameSrv的集群部署来说并不需要做什么额外的设置。

    而对于Broker集群来说就有多种模式了,这里我先给大家介绍下这几种模式,然后我们再来看看生产级的部署方式是什么:

    1)、单个Master模式

    一个Broker作为主服务,不设置任何Slave,很显然这种方式风险比较大,存在单节点故障会导致整个基于消息的服务挂掉,所以生产环境不可能采用这种模式。

    2)、多Master模式

    这种模式的Broker集群,全是Master,没有Slave节点。这种方式的优缺点如下:

    优点:配置会比较简单一些,如果单个Master挂掉或重启维护的话对应用是没有什么影响的。如果磁盘配置为RAID10(服务器的磁盘阵列模式,遗忘的同学可以自己查下资料)的话,即使在机器宕机不可恢复的情况下,由于RAID10磁盘本身的可靠性,消息也不会丢失(异步刷盘丢失少量消息,同步刷盘一条不丢),这种Broker的集群模式性能相对来说是最高的。

    缺点:就是在单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前是不可以进行消息订阅的,这对消息的实时性会有一些影响。

    3)、多Master多Slave模式(异步复制)

    在这种模式下Broker集群存在多个Master节点,并且每个Master节点都会对应一个Slave节点,有多对Master-Slave,HA(高可用)之间采用异步复制的方式进行信息同步,在这种方式下主从之间会有短暂的毫秒级的消息延迟。

    优点:在这种模式下即便磁盘损坏了,消息丢失的情况也非常少,因为主从之间有信息备份;并且,在这种模式下消息的实时性也不会受影响,因为Master宕机后Slave可以自动切换为Master模式,这样Consumer仍然可以通过Slave进行消息消费,而这个过程对应用来说则是完全透明的,并不需要人工干预;另外,这种模式的性能与多Master模式几乎差不多。

    缺点:如果Master宕机,并且在磁盘损坏的情况下,会丢失少量的消息。

    4)、多Master多Slave模式(同步复制)

    这种模式与3)差不多,只是HA采用的是同步双写的方式,即主备都写成功后,才会向应用返回成功。

    优点:在这种模式下数据与服务都不存在单点的情况,在Master宕机的情况下,消息也没有延迟,服务的可用性以及数据的可用性都非常高。

    缺点:性能相比于异步复制来说略低一些(大约10%);另外一个缺点就是相比于异步复制,目前Slave备机还暂时不能实现自动切换为Master,可能后续的版本会支持Master-Slave的自动切换功能。

    生产级RocketMQ集群

    综合考虑以上集群模式的优缺点,在实际生产环境中目前基于RocketMQ消息集群的部署方式基本都是采用多Master多Slave(异步复制)这种模式,
    看到这里相信大家应该对RocketMQ有一个大致的了解了,那么下面我们就具体看看,如何搭建一套生产级的RocketMQ消息集群系统吧!

    1.集群架构

    2.集群环境配置

    OS: Red Hat 4.8.5-16 Memory: 8G 2台

    JDK 1.8.0_151-b12

    3.集群配置文件

    [broker-a-m] broker-a.properties

    brokerClusterName=mos-test-rocketmq-cluster
    brokerName=broker-a
    brokerId=0
    namesrvAddr=xx.xxx.xx.01:9876;xx.xxx.xx.02:9876
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #存储路径
    storePathRootDir=/opt/rocketmq/store/rootdir-a-m
    #commitLog 存储路径
    storePathCommitLog=/opt/rocketmq/store/commitlog-a-m
    #消费队列存储路径存储路径
    storePathConsumeQueue=/opt/rocketmq/store/consumequeue-a-m
    #消息索引存储路径
    storePathIndex=/opt/rocketmq/store/index-a-m
    #checkpoint 文件存储路径
    storeCheckpoint=/opt/rocketmq/store/checkpoint-a-m
    #abort 文件存储路径
    abortFile=/opt/rocketmq/store/abort-a-m
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH

    [broker-b-s] broker-b-s.properties

    brokerClusterName=mos-test-rocketmq-cluster
    brokerName=broker-b
    brokerId=1
    namesrvAddr=xx.xxx.xx.01:9876;xx.xxx.xx.02:9876
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10921
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #存储路径
    storePathRootDir=/opt/rocketmq/store/rootdir-b-s
    #commitLog 存储路径
    storePathCommitLog=/opt/rocketmq/store/commitlog-b-s
    #消费队列存储路径存储路径
    storePathConsumeQueue=/opt/rocketmq/store/consumequeue-b-s
    #消息索引存储路径
    storePathIndex=/opt/rocketmq/store/index-b-s
    #checkpoint 文件存储路径
    storeCheckpoint=/opt/rocketmq/store/checkpoint-b-s
    #abort 文件存储路径
    abortFile=/opt/rocketmq/store/abort-b-s
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH

    [broker-b-m] broker-b.properties

    brokerClusterName=mos-test-rocketmq-cluster
    brokerName=broker-b
    brokerId=0
    namesrvAddr=xx.xxx.xx.01:9876;xx.xxx.xx.02:9876
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #存储路径
    storePathRootDir=/opt/rocketmq/store/rootdir-b-m
    #commitLog 存储路径
    storePathCommitLog=/opt/rocketmq/store/commitlog-b-m
    #消费队列存储路径存储路径
    storePathConsumeQueue=/opt/rocketmq/store/consumequeue-b-m
    #消息索引存储路径
    storePathIndex=/opt/rocketmq/store/index-b-m
    #checkpoint 文件存储路径
    storeCheckpoint=/opt/rocketmq/store/checkpoint-b-m
    #abort 文件存储路径
    abortFile=/opt/rocketmq/store/abort-b-m
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH

    [broker-a-s] broker-a-s.properties

    brokerClusterName=mos-test-rocketmq-cluster
    brokerName=broker-a
    brokerId=1
    namesrvAddr=xx.xxx.xx.01:9876;xx.xxx.xx.02:9876
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10921
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=48
    #存储路径
    storePathRootDir=/opt/rocketmq/store/rootdir-a-s
    #commitLog 存储路径
    storePathCommitLog=/opt/rocketmq/store/commitlog-a-s
    #消费队列存储路径存储路径
    storePathConsumeQueue=/opt/rocketmq/store/consumequeue-a-s
    #消息索引存储路径
    storePathIndex=/opt/rocketmq/store/index-a-s
    #checkpoint 文件存储路径
    storeCheckpoint=/opt/rocketmq/store/checkpoint-a-s
    #abort 文件存储路径
    abortFile=/opt/rocketmq/store/abort-a-s
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH

    4.修改启动脚本参数

    runserver.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    runbroker.sh

    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

    5.详细搭建步骤

    //下载zip包
    https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
    
    //切换目录
    cd /opt
    
    //上传rocketmq-all-4.3.2-bin-release.zip
    rz
    
    //解压
    unzip rocketmq-all-4.3.2-bin-release.zip
    
    //重命名文件夹
    mv rocketmq-all-4.3.2-bin-release rocketmq
    
    //创建存储路径,以 broker-a机器为例
    mkdir -p ./store/{rootdir-a-m,commitlog-a-m, checkpoint-a-m,abort-a-m, consumequeue-a-m, index-a-m,rootdir-b-s,commitlog-b-s, checkpoint-b-s, abort-b-s, consumequeue-b-s, index-b-s}
    
    //修改配置文件前先备份
    cp -r conf/ conf.bak/
    
    //修改日志目录
    cd /opt/ rocketmq/conf
    sed -i 's#${user.home}#/opt/rocketmq#g' *.xml
    
    //修改nameServerJVM启动参数
    //因为受限于系统资源,因此JVM参数调小
    vim runserver.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    vim runbroker.sh
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
    //分别启动nameServer
    nohup sh mqnamesrv > /opt/rocketmq/logs/mqnamesrv.log 2>&1 &
    
    //启动broker-a-m
    nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-a.properties  > /opt/rocketmq/logs/mqbroker-a-m.log 2>&1 &
    
    //启动broker-b-m
    nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-b.properties > /opt/rocketmq/logs/mqbroker-b-m.log 2>&1 &
    
    //启动broker-b-s
    nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-b-s.properties  > /opt/rocketmq/logs/mqbroker-b-s.log 2>&1 &
    
    //启动broker-a-s
    nohup sh mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-a-s.properties > /opt/rocketmq/logs/mqbroker-a-s.log 2>&1 &
    
    //查看启动状态
    mqadmin clusterList --namesrvAddr=xx.xxx.xx.01:9876
    

    6.Rocketmq-Console搭建步骤

    rocketmq提供多种管理方式,命令行和界面等,
    apache提供一个开源的扩展项目: https://github.com/apache/rocketmq-externals 里面包含一个子项目rocketmq-console,配置下,打个包就可以用了

    //启动rocketmq-console
    //namesrv-addr参数需要在启动时指定,启动后在界面上添加无效,谨防入坑!!!
    nohup java -jar rocketmq-console-ng-1.0.0.jar --server_port=8081 --data-path=/usr/local/rocketmq-console/data --namesrv-addr='xx.xxx.xx.01:9876;xx.xxx.xx.02:9876' &
    
    //打开rocketmq-console
    http://xx.xxx.xx.xx:8081/

    7.停止集群注意点

    Broker 重启对客户端的影响
    Broker 重启可能会导致正在发往这台机器的的消息发送失败,RocketMQ提供了一种优雅关闭Broker的方法,通过执行以下命令会清除Broker的写权限,过40s后,所有客户端都会更新Broker路由信息,此时再关闭Broker就不会发生发送消息失败的情况,因为所有消息都发往了其他 Broker。

    //清楚broker的写权限
    sh mqadmin wipeWritePerm -b brokerName -n namesrvAddr
    //先停止broker
    sh bin/mqshutdown broker
    //最后停止namesrv
    sh bin/mqshutdown namesrv

    8.坑汇总:

    1.同一个consumerGroup不要订阅不同的topic,broker向consumer推消息,consumer可能无法消费到,
    2.事务消息推送,transactionListenerGroup和consumerGroup不可相同

    9.参考

    rocketmq快速入门:https://rocketmq.apache.org/docs/quick-start/
    rocketmq与其它消息中间件对比:https://rocketmq.apache.org/docs/motivation/
    rocketmq核心概念:https://rocketmq.apache.org/docs/core-concept/
    rocketmq管理命令:https://rocketmq.apache.org/docs/cli-admin-tool/
    rocketmq集群:https://rocketmq.apache.org/docs/cluster-deployment/
    集群搭建:https://www.jianshu.com/p/23e04d8178b8
    集群搭建:https://blog.csdn.net/weixin_40533111/article/details/84451219

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: