您的位置:首页 > 运维架构 > Linux

Linux安装Rocketmq(无忧爬坑指南^_^)

2017-04-17 00:00 423 查看

rocketmq安装前请先确定Linux的jdk和maven配置正确无误。

本人配置地址> https://my.oschina.net/gentlelions/blog/880655

一:RocketMQ简介

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

1.能够保证严格的消息顺序

2.提供丰富的消息拉取模式

3.高效的订阅者水平扩展能力

4.实时的消息订阅机制

5.亿级消息堆积能力

二:安装RocketMQ

下载地址为:> https://github.com/alibaba/RocketMQ/releases
或者 wget https://github.com/alibaba/RocketMQ/releases/alibaba/RocketMQ/archive/v3.5.8.tar.gz

请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。

下载完成后解压放到 /usr/local/java/ 目录下
进入bin目录,执行

install.sh
或者
install.bat

进行编译。
在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/java/rocketmq中。
如果你不想编译的话,可以从这里下载编译之后的rocketmq[rocketmq-3.5.8]。

配置环境变量

gedit /etc/profile

在末尾加上:

#rocketmq
export rocketmq=/usr/local/java/alibaba-rocketmq3.5.8
export PATH=$PATH:$rocketmq/bin

更新配置:

source /etc/profile

三:启动RocketMQ

修改RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略):

cd /usr/local/java/alibaba-rocketmq/bin

根据个人情况修改如下配置:

vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"

vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"

启动mqnameserver

nohup sh mqnamesrv > /usr/local/java/alibaba-rocketmq3.5.8/logs/namesrv.log 2>&1 &

[b]启动mqbroker[/b]

nohup sh mqbroker -n 192.168.27.129:9876 autoCreateTopicEnable=true > /usr/local/java/alibaba-rocketmq3.5.8/logs/broker.log 2>&1 &

关闭命令:

mqshutdown broker | namesrv

RocketMQ相关命令查阅地址: > http://blog.csdn.net/zhu_tianwei/article/details/40951301

通过 ps aux | grep java命令来查看启动的情况

使用telnet测试端口开启情况:

telnet 192.168.27.129 9876

如果端口9876连接失败,防火墙开启端口命令:

firewall-cmd --zone=public --add-port=9876/tcp --permanent

[b]注意[/b]:
当防火墙开启的时候,通过程序启动producer会报client Exception异常,如果引用的是3.2.6版本会出现xxxx:10909failed字样,需要开启10911端口。3.5.8版本则出现xxxx:10911failed字样,需要开启10911端口,具体为什么我不清楚。如果关闭防火墙则不用开启此端口。
还有就是工程引用的jar包最好和Linux下的包相同,减少不必要的麻烦。(由于3.5.0以上版本不能自动创建topic,即使在启动broker的时候设置了autoCreateTopicEnable=true也是不行的。需要在程序内使用producer.setCreateTopicKey("xxtopic")(未经本人检验) 。

firewall-cmd --reload

防火墙操作相关命令请进firewall-cmd命令 查看。

四:编写测试案例

引用的jar包:

<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.2.6</version>
<type>pom</type>
</dependency>

public class ProducerTest {
public static void main(String[] args) throws MQClientException,
InterruptedException {
/**
* ProducerGroupName保证唯一
*/
final DefaultMQProducer producer = new DefaultMQProducer(
"ProducerGroupName");
//nameserver服务,多个以;分开
producer.setNamesrvAddr("192.168.27.129:9876");
producer.setInstanceName("Producer");
//      producer.setCreateTopicKey("TopicTest1");
//      producer.setVipChannelEnabled(false);

/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();

/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
for (int i = 0; i < 10; i++) {
try {
{      //通过topic订阅消息,tag过滤消息
Message msg = new Message("TopicTest1",// topic
"TagA",// tag 消息标签,只支持设置一个Tag(服务端消息过滤使用)
"OrderID001",// key 消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用)
("Hello MetaQA").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}

{
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQB").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}

{
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQC").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
TimeUnit.MILLISECONDS.sleep(1000);
}

/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
System.exit(0);
}
}

public class ConsumerTest {
/**
* 当前例子是Consumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际Consumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
*
* @throws MQClientException
*/
public static void main(String[] args) throws MQClientException {
/**
* 注意:ConsumerGroupName需要由应用来保证唯一 ,最好使用服务的包名区分同一服务,一类Consumer集合的名称,这类Consumer通常消费一类消息,且消费逻辑一致
* PushConsumer:Consumer的一种,应用通常向Consumer注册一个Listener监听器,Consumer收到消息立刻回调Listener监听器
* PullConsumer:Consumer的一种,应用通常主动调用Consumer的拉取消息方法从Broker拉消息,主动权由应用控制
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ConsumerGroupName");
// //nameserver服务
consumer.setNamesrvAddr("192.168.27.129:9876");
consumer.setInstanceName("Consumber");
//      consumer.setVipChannelEnabled(false);
//设置批量消费个数
//consumer.setConsumeMessageBatchMaxSize(10);

/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest2", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
///接收消息个数msgs.size()
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
System.out.println(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagC")) {
// 执行TagC的消费
System.out.println(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagD")) {
// 执行TagD的消费
System.out.println(new String(msg.getBody()));
}
} else if (msg.getTopic().equals("TopicTest2")) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("ConsumerStarted.");
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Linux RocketMQ