您的位置:首页 > 运维架构 > Linux

Linux搭建RocketMQ集群

2020-01-11 18:21 447 查看
  1. 安装配置jdk
    必须要有jdk环境,jdk必须要64位,1.7以上

  2. 下载安装包
    地址:http://rocketmq.apache.org/release_notes/release-notes-4.5.2/

  3. 解压文件到/usr/local

  4. 进入conf文件
    可以看到有3个文件夹

    2m-2s-async:多Master多Slave模式,异步复制
    2m-2s-sync:多Master多Slave模式,同步双写
    2m-noslave:多Master模式
    这里的配置选择2m-noslave模式,我只配置了两台服务器
    服务器ip分别为:172.16.120.143,172.16.120.144

  5. 创建存储路径
    mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store
    mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store/commitlog
    mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store/consumequeue
    mkdir /usr/local/rocketmq-all-4.5.2-bin-release/store/index

  6. 修改配置文件

    broker-a.properties配置

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#brokerClusterName=DefaultCluster
#brokerName=broker-a
#brokerId=0
#deleteWhen=04
#fileReservedTime=48
#brokerRole=ASYNC_MASTER
#flushDiskType=ASYNC_FLUSH

# 配置参考官方链接:http://rocketmq.apache.org/docs/rmq-deployment/
# 所属集群名字
brokerClusterName=rocketmq-cluster

# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a

# 0 表示 Master,>0 表示 Slave
brokerId=0

# 删除文件时间点,默认凌晨4点。24小时制,单位小时
deleteWhen=04

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

# 文件保留时间,默认 72 小时。根据业务情况调整
fileReservedTime=168

# Broker 对外服务的监听端口
listenPort=10911

# nameServer地址,分号分割,这里写自己需要做集群的两台服务器的地址
namesrvAddr=172.16.120.143:9876;172.16.120.144:9876

# Details:Should be configured if having multiple addresses; Default value:InetAddress for network interface
# 本机ip地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况
下可以人工配置。
brokerIP1=172.16.120.144

#存储路径
storePathRootDir==/usr/local/rocketmq-all-4.5.2-bin-release/store

# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-all-4.5.2-bin-release/store/commitlog

# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketmq-all-4.5.2-bin-release/store/consumequeue

#消息索引存储路径
storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index

#消息索引存储路径
storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index

# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

#限制的消息大小
maxMessageSize=65536

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

broker-b.properties配置

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#brokerClusterName=DefaultCluster
#brokerName=broker-a
#brokerId=0
#deleteWhen=04
#fileReservedTime=48
#brokerRole=ASYNC_MASTER
#flushDiskType=ASYNC_FLUSH

# 配置参考官方链接:http://rocketmq.apache.org/docs/rmq-deployment/
# 所属集群名字
brokerClusterName=rocketmq-cluster

# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b

# 0 表示 Master,>0 表示 Slave
brokerId=0

# 删除文件时间点,默认凌晨4点。24小时制,单位小时
deleteWhen=04

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

# 文件保留时间,默认 72 小时。根据业务情况调整
fileReservedTime=168

# Broker 对外服务的监听端口
listenPort=10911

# nameServer地址,分号分割
namesrvAddr=172.16.120.143:9876;172.16.120.144:9876

# Details:Should be configured if having multiple addresses; Default value:InetAddress for network interface
# 本机ip地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况
下可以人工配置。
brokerIP1=172.16.120.144

#存储路径
storePathRootDir==/usr/local/rocketmq-all-4.5.2-bin-release/store

# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-all-4.5.2-bin-release/store/commitlog

# 消费队列存储路径存储路径
storePathConsumerQueue=/usr/local/rocketmq-all-4.5.2-bin-release/store/consumequeue

#消息索引存储路径
storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index

#消息索引存储路径
storePathIndex=/usr/local/rocketmq-all-4.5.2-bin-release/store/index

# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

#限制的消息大小
maxMessageSize=65536

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
  1. 修改日志配置文件(两台服务器)
    mkdir -p /usr/local/rocketmq-all-4.5.2-bin-release/logs
    cd /usr/local/rocketmq-all-4.5.2-bin-release/conf && sed -i ‘s#${user.home}#/usr/local/rocketmq-all-4.5.2-bin-release#g’ *.xml

  2. 修改占用内存大小(两台服务器)
    vim /usr/local/rocketmq-all-4.5.2-bin-release/bin/runbroker.sh

  3. 启动NameServer(两台服务器)
    cd /usr/local/rocketmq-all-4.5.2-bin-release/bin
    nohup sh mqnamesrv &

  4. 启动172.16.120.143的BrokerServer
    cd /usr/local/rocketmq-all-4.5.2-bin-release/bin
    nohup sh mqbroker -c /usr/local/rocketmq-all-4.5.2-bin-release/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &

  5. 启动172.16.120.144的BrokerServer
    cd /usr/local/rocketmq-all-4.5.2-bin-release/bin
    nohup sh mqbroker -c /usr/local/rocketmq-all-4.5.2-bin-release/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &

  6. 查看是否启动成功
    输入jps命令,能看到namesrv和broker证明启动成功

  7. 关闭rocketmq
    cd /usr/local/rocketmq-all-4.5.2-bin-release/bin/
    关闭namesrv服务:sh mqshutdown namesrv
    关闭broker服务:sh mqshutdown broker

简单测试

  • 加入依赖

    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
    </dependency>
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.51</version>
    </dependency>
  • 创建生产者

    public class Producer {
    
    public static void main(String[] args) throws MQClientException {
    //1、分组
    DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    //2、服务器集群地址
    producer.setNamesrvAddr("172.16.120.143:9876;172.16.120.144:9876");
    producer.setInstanceName("producer");
    producer.start();
    try {
    for (int i=0;i<10;i++){
    Thread.sleep(1000);
    Message message = new Message("my_topic","tagA",("mytopic"+i).getBytes());
    SendResult result = producer.send(message);
    System.out.println(result.toString());
    }
    }catch (Exception e){
    e.printStackTrace();
    }
    producer.shutdown();
    }
    }
  • 创建消费者

    public class Consumer {
    
    public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.120.143:9876;172.16.120.144:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("my_topic","tagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    for (MessageExt messageExt:list){
    String msgId = messageExt.getMsgId();
    System.out.println("msgId:"+msgId+",body:"+new String(messageExt.getBody()));
    }
    
    //消费状态:1、消费成功;2、消费失败
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    consumer.start();
    System.out.println("Consumer Started");
    
    }
    }

遇到的问题:
启动测试代码的时候报错:org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, my_topic

  • 无法创建topic,Producer没有正确连接到Name Server
  • 我这里是因为没有配置jdk的环境变量,正确配置环境变量且关闭防火墙
  • 在bin目录下执行命令sh mqadmin clusterList -n localhost:9876 如果看到

    证明证明已经连接到nameserver上,再次启动则没有报错了
  • 点赞
  • 收藏
  • 分享
  • 文章举报
粉蒸妹 发布了16 篇原创文章 · 获赞 0 · 访问量 326 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: