您的位置:首页 > 其它

RocketMQ双Master模式部署

2017-02-20 09:39 148 查看
一、RocketMQ是一个分布式的、队列模型的消息中间件,具有以下特点:

1、能够保证严格的消息顺序

2、提供丰富的消息拉去模式

3、高效的订阅者水平扩展能力

4、实时的消息订阅机制

5、亿级的消息堆积能力

6、Metaq1.x和2.x。3.x以后改名RocketMQ

选用理由:

  强调集群无单点,可扩展,任意一点高可用,水平可扩展。 

       海量消息堆积能力,消息堆积后,写入低延迟。 

       支持上万个队列 

       消息失败重试机制 

       消息可查询 

二、RocketMQ集群方式 

  常见的几种Broker 集群部署方式,这里的Slave 不可写,但可读,类似于Mysql 主备方式。 

2.1.  单个 Master 

     这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。 

   2.2.  多 Master 模式 

      一个集群无 Slave,全是Master,例如2 个Master 或者3 个Master 

优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢) 。性能最高。 

raid0 就是 把多个(最少2)硬盘合并1个逻辑盘使用,数据读写各硬盘同时操作,不同硬盘写入不同数据,速度快

raid1就是同时对2个硬盘读写同样的数据,强调数据的安全性。

raid10就是raid1+raid0,适合速度要求高的,又要完全容错。最少需要4块硬盘(做raid10要先做raid1,再把数个raid1做成raid0,这样比先做raid0再做raid1有跟高的可靠性)

缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订 

阅,消息实时性会受到受到影响。 

  ### 先启动NameServer 

  ### 在机器A,启动第一个Master 

  ### 在机器B,启动第二个Master 

3、RocketMQ部署【双Master方式】

3.1. 服务器环境

序号                IP                         用户名                              角色                                              模式

1       192.168.100.145            root               nameServer1,brokerServer1                   Master1

2       192.168.100.146            root                 nameServer2,brokerServer2                 Master2

3.2. Hosts添加信息

           IP                                    NAME

192.168.100.145             rocketmq-nameserver1

192.168.100.145         rocketmq-master1

192.168.100.146            rocketmq-nameserver2

192.168.100.146              rocketmq-master2

# vi /etc/hosts



3.3. 上传解压【两台机器】

# 上传alibaba-rocketmq-3.2.6.tar.gz文件至/usr/local
# tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local
# mv alibaba-rocketmq-3.2.6 rocketmq
ll /usr/local


3.4. 创建存储路径【两台机器】

# mkdir /usr/local/rocketmq/store
# mkdir /usr/local/rocketmq/store/commitlog
# mkdir /usr/local/rocketmq/store/consumequeue
# mkdir /usr/local/rocketmq/store/index


3.5. RocketMQ配置文件【两台机器】--该文件有点类似于redis里面的redis.conf

# vim /usr/local/rocketmq/conf/2m-noslave/broker-a.properties
# vim /usr/local/rocketmq/conf/2m-noslave/broker-b.properties

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128


3.6. 修改日志配置文件【两台机器】

# mkdir -p /usr/local/rocketmq/logs
# cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g'
*.xml
sed命令:比如:sed 's/stringa1/stringa2/g' example.txt 将example.txt文件中的 "string1" 替换成 "string2" 

上面的sed命令表示:cd到 /usr/local/rocketmq/conf目录下,将所有.xml问将中的${user.home}修改为/usr/local/rocketmq

3.7. 修改启动脚本参数【两台机器】
# vim /usr/local/rocketmq/bin/runbroker.sh
#============================================================
==================
# 开发环境JVM Configuration
#============================================================
==================
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"
# vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -
XX:PermSize=128m -XX:MaxPermSize=320m"

注意:rocketMQ最小堆内存是1G。如果设置成512M就会报错。

3.8. 启动NameServer【两台机器】--先启动NameServer然后再启动Broker

# cd /usr/local/rocketmq/bin
# nohup sh mqnamesrv &

nohup sh mqnamesrv &表示,当前用户退出了该服务还是启动着的

如下图所示



通过tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log命令查看日志



3.9. 启动BrokerServer A【192.168.100.145】
# cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

通过nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &  命令启动Broker

也是通过nohup守护启动 并且指定配置文件a.properties 错误的时候输出到



3.10. 启动BrokerServer B【192.168.100.146】
# cd /usr/local/rocketmq/bin
# nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
# netstat -ntlp
# jps
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
# tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

通过命令tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log可以查看日志



上面表示我们的rocketMQ环境已经搭建好了。由于RocketMQ没有像AcvtiveMQ自带管控台,所以我们需要在tomcat中部署rocketmq-console.war





启动tomcat:

/usr/local/tomcat/apache-tomcat-7.0.29/bin/startup.sh


http://192.168.100.145:8080/rocketmq-console


最简单的hello world程序(可以从github直接down下来源码)

package quickstart;

/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
* Producer,发送消息
*
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");

producer.start();

for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}

producer.shutdown();
}
}
我发了1000条消息到MQ的2个节点中。



消费者:

package quickstart;

/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
* Consumer,订阅消息
*/
public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}

消费了五百条数据:



pom.xml的jar包配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>recketmqAPI</groupId>
<artifactId>recketmqAPI</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>recketmqAPI</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.25.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>3.2.6</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>3.2.6</version>
</dependency>

</dependencies>
</project>

3.12. 数据清理

# cd /usr/local/rocketmq/bin

# sh mqshutdown broker

# sh mqshutdown namesrv

# --等待停止

# rm -rf /usr/local/rocketmq/store

# mkdir /usr/local/rocketmq/store

# mkdir /usr/local/rocketmq/store/commitlog

# mkdir /usr/local/rocketmq/store/consumequeue

# mkdir /usr/local/rocketmq/store/index

# --按照上面步骤重启NameServer与BrokerServer
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐