您的位置:首页 > 其它

RocketMQ3.4.6 安装与测试环境搭建

2016-08-18 13:47 453 查看
第一步:先安装jdk, maven等工具

vim /etc/profile 添加

JAVA_HOME=/usr/java/jdk1.8.0_65
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
M2_HOME=/usr/local/apache-maven-3.3.9
PATH=$M2_HOME/bin:$JAVA_HOME/bin:$PATH
export JAVA_HOME
export CLASSPATH
export M2_HOME
export PATH

source /etc/profile

测试: mvn -v ; mvn help:system

第二步:安装RocketMQ
https://github.com/alibaba/RocketMQ最新版本RocketMQ-3.4.6.tar.gz
tar -xvzf RocketMQ-3.4.6.tar.gz

cat install.sh

执行sh install.sh



可以看到安装成功了。

cd /root/RocketMQ-3.4.6/devenv/bin

chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv

nohup ./mqnamesrv >/var/log/mqname.log &

export NAMESRV_ADDR=192.168.26.164:9876

nohup ./mqbroker >/var/log/mqbroker.log &

另外启动方式:

nohup ./mqnamesrv >/var/log/mqname.log 2>&1 &
nohup ./mqbroker -n "192.168.26.164:9876" -c ../conf/2m-noslave/broker-b.properties >/var/log/mqbroker.log 2>&1 &

nohup ./mqnamesrv >/var/log/mqname.log 2>&1 &
nohup ./mqbroker -n "192.168.26.164:9876" > /var/log/mqbrober.log 2>&1 &

第三步:测试RocketMQ环境

新建一个maven的java项目。

在pom.xml中添加

<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.5</version>
</dependency>

编写Productor.java

package com.test.rocketmqtest;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Productor {

public static void main(String[] args) {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setVipChannelEnabled(false);
producer.setNamesrvAddr("192.168.26.164:9876");

try {
producer.start();

Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes());

SendResult result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes());

result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

msg = new Message("PullTopic", "pull", "1", "Just for test.".getBytes());

result = producer.send(msg);
System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}

}

编写Consumer.java

package com.test.rocketmqtest;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {

public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr("192.168.26.164:9876");
try {
// 订阅PushTopic下Tag为push的消息
consumer.subscribe("PushTopic", "push");
// 程序第一次启动从消息队列头取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext Context) {
Message msg = list.get(0);
System.out.println(msg.toString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}

}

结果如图:



运行Productor



运行Consumer

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: