RocketMQ双Master模式部署
2017-02-20 09:39
148 查看
一、RocketMQ是一个分布式的、队列模型的消息中间件,具有以下特点:
1、能够保证严格的消息顺序
2、提供丰富的消息拉去模式
3、高效的订阅者水平扩展能力
4、实时的消息订阅机制
5、亿级的消息堆积能力
6、Metaq1.x和2.x。3.x以后改名RocketMQ
选用理由:
强调集群无单点,可扩展,任意一点高可用,水平可扩展。
海量消息堆积能力,消息堆积后,写入低延迟。
支持上万个队列
消息失败重试机制
消息可查询
二、RocketMQ集群方式
常见的几种Broker 集群部署方式,这里的Slave 不可写,但可读,类似于Mysql 主备方式。
2.1. 单个 Master
这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
2.2. 多 Master 模式
一个集群无 Slave,全是Master,例如2 个Master 或者3 个Master
优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢) 。性能最高。
raid0 就是 把多个(最少2)硬盘合并1个逻辑盘使用,数据读写各硬盘同时操作,不同硬盘写入不同数据,速度快
raid1就是同时对2个硬盘读写同样的数据,强调数据的安全性。
raid10就是raid1+raid0,适合速度要求高的,又要完全容错。最少需要4块硬盘(做raid10要先做raid1,再把数个raid1做成raid0,这样比先做raid0再做raid1有跟高的可靠性)
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订
阅,消息实时性会受到受到影响。
### 先启动NameServer
### 在机器A,启动第一个Master
### 在机器B,启动第二个Master
3、RocketMQ部署【双Master方式】
3.1. 服务器环境
序号 IP 用户名 角色 模式
1 192.168.100.145 root nameServer1,brokerServer1 Master1
2 192.168.100.146 root nameServer2,brokerServer2 Master2
3.2. Hosts添加信息
IP NAME
192.168.100.145 rocketmq-nameserver1
192.168.100.145 rocketmq-master1
192.168.100.146 rocketmq-nameserver2
192.168.100.146 rocketmq-master2
3.3. 上传解压【两台机器】
3.4. 创建存储路径【两台机器】
3.5. RocketMQ配置文件【两台机器】--该文件有点类似于redis里面的redis.conf
3.6. 修改日志配置文件【两台机器】
上面的sed命令表示:cd到 /usr/local/rocketmq/conf目录下,将所有.xml问将中的${user.home}修改为/usr/local/rocketmq
3.7. 修改启动脚本参数【两台机器】
注意:rocketMQ最小堆内存是1G。如果设置成512M就会报错。
3.8. 启动NameServer【两台机器】--先启动NameServer然后再启动Broker
nohup sh mqnamesrv &表示,当前用户退出了该服务还是启动着的
如下图所示
通过tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log命令查看日志
3.9. 启动BrokerServer A【192.168.100.145】
通过nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 & 命令启动Broker
也是通过nohup守护启动 并且指定配置文件a.properties 错误的时候输出到
3.10. 启动BrokerServer B【192.168.100.146】
通过命令tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log可以查看日志
上面表示我们的rocketMQ环境已经搭建好了。由于RocketMQ没有像AcvtiveMQ自带管控台,所以我们需要在tomcat中部署rocketmq-console.war
启动tomcat:
/usr/local/tomcat/apache-tomcat-7.0.29/bin/startup.sh
http://192.168.100.145:8080/rocketmq-console
最简单的hello world程序(可以从github直接down下来源码)
消费者:
消费了五百条数据:
pom.xml的jar包配置
3.12. 数据清理
# cd /usr/local/rocketmq/bin
# sh mqshutdown broker
# sh mqshutdown namesrv
# --等待停止
# rm -rf /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
# --按照上面步骤重启NameServer与BrokerServer
1、能够保证严格的消息顺序
2、提供丰富的消息拉去模式
3、高效的订阅者水平扩展能力
4、实时的消息订阅机制
5、亿级的消息堆积能力
6、Metaq1.x和2.x。3.x以后改名RocketMQ
选用理由:
强调集群无单点,可扩展,任意一点高可用,水平可扩展。
海量消息堆积能力,消息堆积后,写入低延迟。
支持上万个队列
消息失败重试机制
消息可查询
二、RocketMQ集群方式
常见的几种Broker 集群部署方式,这里的Slave 不可写,但可读,类似于Mysql 主备方式。
2.1. 单个 Master
这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
2.2. 多 Master 模式
一个集群无 Slave,全是Master,例如2 个Master 或者3 个Master
优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢) 。性能最高。
raid0 就是 把多个(最少2)硬盘合并1个逻辑盘使用,数据读写各硬盘同时操作,不同硬盘写入不同数据,速度快
raid1就是同时对2个硬盘读写同样的数据,强调数据的安全性。
raid10就是raid1+raid0,适合速度要求高的,又要完全容错。最少需要4块硬盘(做raid10要先做raid1,再把数个raid1做成raid0,这样比先做raid0再做raid1有跟高的可靠性)
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订
阅,消息实时性会受到受到影响。
### 先启动NameServer
### 在机器A,启动第一个Master
### 在机器B,启动第二个Master
3、RocketMQ部署【双Master方式】
3.1. 服务器环境
序号 IP 用户名 角色 模式
1 192.168.100.145 root nameServer1,brokerServer1 Master1
2 192.168.100.146 root nameServer2,brokerServer2 Master2
3.2. Hosts添加信息
IP NAME
192.168.100.145 rocketmq-nameserver1
192.168.100.145 rocketmq-master1
192.168.100.146 rocketmq-nameserver2
192.168.100.146 rocketmq-master2
# vi /etc/hosts
3.3. 上传解压【两台机器】
# 上传alibaba-rocketmq-3.2.6.tar.gz文件至/usr/local # tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local # mv alibaba-rocketmq-3.2.6 rocketmq ll /usr/local
3.4. 创建存储路径【两台机器】
# mkdir /usr/local/rocketmq/store # mkdir /usr/local/rocketmq/store/commitlog # mkdir /usr/local/rocketmq/store/consumequeue # mkdir /usr/local/rocketmq/store/index
3.5. RocketMQ配置文件【两台机器】--该文件有点类似于redis里面的redis.conf
# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties # vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties
#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a|broker-b #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
3.6. 修改日志配置文件【两台机器】
# mkdir -p /usr/local/rocketmq/logs # cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xmlsed命令:比如:sed 's/stringa1/stringa2/g' example.txt 将example.txt文件中的 "string1" 替换成 "string2"
上面的sed命令表示:cd到 /usr/local/rocketmq/conf目录下,将所有.xml问将中的${user.home}修改为/usr/local/rocketmq
3.7. 修改启动脚本参数【两台机器】
# vim /usr/local/rocketmq/bin/runbroker.sh
#============================================================ ================== # 开发环境JVM Configuration #============================================================ ================== JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m - XX:PermSize=128m -XX:MaxPermSize=320m"
# vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m - XX:PermSize=128m -XX:MaxPermSize=320m"
注意:rocketMQ最小堆内存是1G。如果设置成512M就会报错。
3.8. 启动NameServer【两台机器】--先启动NameServer然后再启动Broker
# cd /usr/local/rocketmq/bin # nohup sh mqnamesrv &
nohup sh mqnamesrv &表示,当前用户退出了该服务还是启动着的
如下图所示
通过tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log命令查看日志
3.9. 启动BrokerServer A【192.168.100.145】
# cd /usr/local/rocketmq/bin # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 & # netstat -ntlp # jps # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
通过nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 & 命令启动Broker
也是通过nohup守护启动 并且指定配置文件a.properties 错误的时候输出到
3.10. 启动BrokerServer B【192.168.100.146】
# cd /usr/local/rocketmq/bin # nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 & # netstat -ntlp # jps # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log # tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
通过命令tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log可以查看日志
上面表示我们的rocketMQ环境已经搭建好了。由于RocketMQ没有像AcvtiveMQ自带管控台,所以我们需要在tomcat中部署rocketmq-console.war
启动tomcat:
/usr/local/tomcat/apache-tomcat-7.0.29/bin/startup.sh
http://192.168.100.145:8080/rocketmq-console
最简单的hello world程序(可以从github直接down下来源码)
package quickstart; /** * Copyright (C) 2010-2013 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * Producer,发送消息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }我发了1000条消息到MQ的2个节点中。
消费者:
package quickstart; /** * Copyright (C) 2010-2013 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,订阅消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
消费了五百条数据:
pom.xml的jar包配置
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>recketmqAPI</groupId> <artifactId>recketmqAPI</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>recketmqAPI</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.25.Final</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>3.2.6</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>3.2.6</version> </dependency> </dependencies> </project>
3.12. 数据清理
# cd /usr/local/rocketmq/bin
# sh mqshutdown broker
# sh mqshutdown namesrv
# --等待停止
# rm -rf /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index
# --按照上面步骤重启NameServer与BrokerServer
相关文章推荐
- 用三层架构与设计模式思想部署企业级数据库业务系统开发(转)
- 用三层架构与设计模式思想部署企业级数据库业务系统开发
- SCCM 2007 & R2部署之连接远程SQL Server模式
- 《企业集成模式.设计、构建及部署消息传递解决方案》学习笔记
- Biztalk 2009在Windows 2008 R2环境中的High Availability(Cluster群集)部署(下)--AA模式
- 实战: SOLR的分布式部署(复制模式 CollectionDistribute)部署流程详解 (二)
- Application Virtualization 4.5 部署之四独立模式
- core模式下部署域控制器[为企业部署Windows Server 2008系列六] 推荐
- 《企业集成模式.设计、构建及部署消息传递解决方案》学习笔记
- 服务场模式部署WSS3中配置搜索服务
- 关于MOSS在工作组模式下的场级部署方式探索
- SCCM 2007 & R2部署之连接远程SQL Server模式 推荐
- 用三层架构与设计模式思想部署企业级数据库业务系统开发
- Configuration Manager 纯模式所需的 PKI 证书的分步部署示例
- 用三层架构与设计模式思想部署企业级数据库业务系统开发
- 水晶报表问题汇总(水晶报表的使用与查询条件生成报表、注册码、打印问题、模式使用示例、C#.Net的WinForm中的使用、程序发布与部署)
- 让开发自动化: 部署自动化模式
- 完整安装模式下配置windows server 2008[为企业部署Windows Server 2008系列二]
- Biztalk 2009在Windows 2008 R2环境中的High Availability(Cluster群集)部署(上)--AP模式
- Biztalk 2009在Windows 2008 R2环境中的High Availability(Cluster群集)部署(下)--AA模式