
Installing RocketMQ and Testing the Configuration

2016-06-19 18:19

Installation

wget https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz

cd alibaba-rocketmq/bin



Startup

nohup sh mqnamesrv -n 10.105.23.114:9876 &

nohup sh mqbroker -n 10.105.23.114:9876 &
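
Before moving on to the client code, it can help to confirm that both processes are actually listening. The following is a minimal sketch using only the JDK; PortCheck and isReachable are hypothetical names made up for illustration (not part of RocketMQ), and it assumes the name server listens on 9876 and the broker on 10911, the ports that appear in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration only; not part of RocketMQ.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolvable host: treat all as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the server address as the first argument,
        // otherwise fall back to the address used in the commands above.
        String host = args.length > 0 ? args[0] : "10.105.23.114";
        System.out.println("name server 9876: " + isReachable(host, 9876, 2000));
        System.out.println("broker 10911: " + isReachable(host, 10911, 2000));
    }
}
```

Running it once from the server and once from the client machine shows quickly whether a port is closed or simply not reachable from outside.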

Java test

Set up the build with Maven

<!-- http://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.3</version>
</dependency>

package rocketmq;

import java.util.Date;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        // Address of the name server as seen from the client
        producer.setNamesrvAddr("182.254.145.66:9876");
        producer.setInstanceName("rmq-instance");
        producer.start();
        try {
            // Send three test messages
            for (int i = 0; i < 3; i++) {
                Message msg = new Message("TopicA-test", // topic
                        "TagA", // tag
                        (new Date() + "Hello RocketMQ ,QuickStart" + i).getBytes() // body
                );
                SendResult sendResult = producer.send(msg);
                // Print the result so each send can be checked
                System.out.println(sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the producer's resources whether or not sending succeeded
            producer.shutdown();
        }
    }
}

  

  

  

  

package rocketmq;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");

        // Same name server address as the producer
        consumer.setNamesrvAddr("182.254.145.66:9876");
        consumer.setInstanceName("rmq-instance");
        // Receive only messages on TopicA-test tagged TagA
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // Print the body of every message in the batch
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
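
One detail in the two classes above: the producer calls getBytes() and the consumer calls new String(byte[]) without naming a charset, so both fall back to the platform default. That is harmless for the ASCII test message, but if producer and consumer run on machines with different defaults, non-ASCII text could come out garbled. A minimal sketch of pinning the charset on both sides follows; BodyCodec is a hypothetical helper name, not a RocketMQ class.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of RocketMQ.
class BodyCodec {

    /** Encodes a message body with an explicit charset (producer side). */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body with the same explicit charset (consumer side). */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u4f60\u597d is a two-character Chinese greeting, written as escapes
        // so the source file itself stays ASCII.
        String original = "Hello RocketMQ \u4f60\u597d";
        byte[] body = encode(original);
        System.out.println("round trip ok: " + decode(body).equals(original));
    }
}
```

In the producer this would amount to passing encode(text) as the message body, and in the consumer to printing decode(msg.getBody()).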

Running the consumer produced this error:

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <10.105.23.114:10911> failed

nohup.out contained:

The broker[localhost, 10.105.23.114:10911] boot success. and name server is 182.254.145.65:9876

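Sigh, it looks like the public vs. private IP problem again: the broker advertises its private address 10.105.23.114:10911, which a client outside that network cannot connect to.
I ran into a similar problem last time when installing Tair; see "Installing Tair on CentOS 7 and Testing the Configuration".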
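
A quick way to tell which kind of address the broker is advertising is to ask the JDK whether it falls in a private range. This is only a sketch: AddressCheck and isPrivate are hypothetical names, and the two addresses are the ones that appear in this post.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper for illustration only; not part of RocketMQ.
class AddressCheck {

    /** True if the IP literal lies in a private range (10/8, 172.16/12, 192.168/16). */
    static boolean isPrivate(String ip) {
        try {
            // For an IP literal, getByName only parses the text; no DNS lookup is made.
            return InetAddress.getByName(ip).isSiteLocalAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not a valid address: " + ip, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isPrivate("10.105.23.114"));   // prints true
        System.out.println(isPrivate("182.254.145.66"));  // prints false
    }
}
```

An address for which this returns true can only be reached from inside the same private network, which matches the connect failure seen from the external client.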

After a lot of searching, I finally found the following approach in the official user guide:



My modified broker.p (the broker configuration file, which mqbroker loads via its -c option):

namesrvAddr=127.0.0.1:9876
brokerIP1=182.254.145.66
brokerName=localhost
brokerClusterName=DefaultCluster
brokerId=0
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
storePathRootDir=/root/store
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=1000
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=72
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
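
The two settings that matter for the connection problem are namesrvAddr and brokerIP1. As a rough sanity check before restarting the broker, the file can be loaded with java.util.Properties and inspected. This is a sketch under assumptions: BrokerConfigCheck and its check method are hypothetical names, brokerIP1 is assumed to be an IP literal, and the rule "brokerIP1 must not be a private or loopback address" reflects this post's setup (clients connecting from outside), not a general RocketMQ requirement.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of RocketMQ.
class BrokerConfigCheck {

    /** Returns a list of problems found in the given broker config text. */
    static List<String> check(String configText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new IllegalArgumentException("cannot read config", e);
        }
        List<String> problems = new ArrayList<>();
        // trim() matters: Properties keeps trailing whitespace in values.
        String brokerIp = props.getProperty("brokerIP1", "").trim();
        if (brokerIp.isEmpty()) {
            problems.add("brokerIP1 is not set");
        } else if (isPrivateOrLoopback(brokerIp)) {
            problems.add("brokerIP1=" + brokerIp + " is not reachable by external clients");
        }
        if (props.getProperty("namesrvAddr", "").trim().isEmpty()) {
            problems.add("namesrvAddr is not set");
        }
        return problems;
    }

    /** Assumes an IP literal, so no DNS lookup takes place. */
    static boolean isPrivateOrLoopback(String ip) {
        try {
            InetAddress addr = InetAddress.getByName(ip);
            return addr.isSiteLocalAddress() || addr.isLoopbackAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not a valid address: " + ip, e);
        }
    }

    public static void main(String[] args) {
        String fixed = "namesrvAddr=127.0.0.1:9876\n" + "brokerIP1=182.254.145.66\n";
        String broken = "namesrvAddr=127.0.0.1:9876\n" + "brokerIP1=10.105.23.114\n";
        System.out.println("fixed config problems: " + check(fixed));
        System.out.println("broken config problems: " + check(broken));
    }
}
```

To check a real file, its contents would be read into a string first (for example with java.nio.file.Files) and passed to check.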

OK!
What does this show? That first-hand sources matter.

