Linux安装Rocketmq(无忧爬坑指南^_^)
2017-04-17 00:00
423 查看
rocketmq安装前请先确定Linux的jdk和maven配置正确无误。
本人配置地址> https://my.oschina.net/gentlelions/blog/880655一:RocketMQ简介
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:1.能够保证严格的消息顺序
2.提供丰富的消息拉取模式
3.高效的订阅者水平扩展能力
4.实时的消息订阅机制
5.亿级消息堆积能力
二:安装RocketMQ
下载地址为:> https://github.com/alibaba/RocketMQ/releases或者 wget https://github.com/alibaba/RocketMQ/releases/alibaba/RocketMQ/archive/v3.5.8.tar.gz
请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。
下载完成后解压放到 /usr/local/java/ 目录下
进入bin目录,执行
install.sh 或者 install.bat
进行编译。
在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/java/rocketmq中。
如果你不想编译的话,可以从这里下载编译之后的rocketmq[rocketmq-3.5.8]。
配置环境变量
gedit /etc/profile
在末尾加上:
#rocketmq export rocketmq=/usr/local/java/alibaba-rocketmq3.5.8 export PATH=$PATH:$rocketmq/bin
更新配置:
source /etc/profile
三:启动RocketMQ
修改RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略):cd /usr/local/java/alibaba-rocketmq/bin
根据个人情况修改如下配置:
vim runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
vim runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
启动mqnameserver
nohup sh mqnamesrv > /usr/local/java/alibaba-rocketmq3.5.8/logs/namesrv.log 2>&1 &
[b]启动mqbroker[/b]
nohup sh mqbroker -n 192.168.27.129:9876 autoCreateTopicEnable=true > /usr/local/java/alibaba-rocketmq3.5.8/logs/broker.log 2>&1 &
关闭命令:
mqshutdown broker | namesrv
RocketMQ相关命令查阅地址: > http://blog.csdn.net/zhu_tianwei/article/details/40951301
通过 ps aux | grep java命令来查看启动的情况
使用telnet测试端口开启情况:
telnet 192.168.27.129 9876
如果端口9876连接失败,防火墙开启端口命令:
firewall-cmd --zone=public --add-port=9876/tcp --permanent
[b]注意[/b]:
当防火墙开启的时候,通过程序启动producer会报client Exception异常,如果引用的是3.2.6版本会出现xxxx:10909failed字样,需要开启10911端口。3.5.8版本则出现xxxx:10911failed字样,需要开启10911端口,具体为什么我不清楚。如果关闭防火墙则不用开启此端口。
还有就是工程引用的jar包最好和Linux下的包相同,减少不必要的麻烦。(由于3.5.0以上版本不能自动创建topic,即使在启动broker的时候设置了autoCreateTopicEnable=true也是不行的。需要在程序内使用producer.setCreateTopicKey("xxtopic")(未经本人检验) 。
firewall-cmd --reload
防火墙操作相关命令请进firewall-cmd命令 查看。
四:编写测试案例
引用的jar包:<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.2.6</version> <type>pom</type> </dependency>
public class ProducerTest { public static void main(String[] args) throws MQClientException, InterruptedException { /** * ProducerGroupName保证唯一 */ final DefaultMQProducer producer = new DefaultMQProducer( "ProducerGroupName"); //nameserver服务,多个以;分开 producer.setNamesrvAddr("192.168.27.129:9876"); producer.setInstanceName("Producer"); // producer.setCreateTopicKey("TopicTest1"); // producer.setVipChannelEnabled(false); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); /** * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br> * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br> * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 */ for (int i = 0; i < 10; i++) { try { { //通过topic订阅消息,tag过滤消息 Message msg = new Message("TopicTest1",// topic "TagA",// tag 消息标签,只支持设置一个Tag(服务端消息过滤使用) "OrderID001",// key 消息关键词,多个Key用KEY_SEPARATOR隔开(查询消息使用) ("Hello MetaQA").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2",// topic "TagB",// tag "OrderID0034",// key ("Hello MetaQB").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3",// topic "TagC",// tag "OrderID061",// key ("Hello MetaQC").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { producer.shutdown(); } })); System.exit(0); } }
public class ConsumerTest { /** * 当前例子是Consumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br> * 但是实际Consumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br> * * @throws MQClientException */ public static void main(String[] args) throws MQClientException { /** * 注意:ConsumerGroupName需要由应用来保证唯一 ,最好使用服务的包名区分同一服务,一类Consumer集合的名称,这类Consumer通常消费一类消息,且消费逻辑一致 * PushConsumer:Consumer的一种,应用通常向Consumer注册一个Listener监听器,Consumer收到消息立刻回调Listener监听器 * PullConsumer:Consumer的一种,应用通常主动调用Consumer的拉取消息方法从Broker拉消息,主动权由应用控制 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ConsumerGroupName"); // //nameserver服务 consumer.setNamesrvAddr("192.168.27.129:9876"); consumer.setInstanceName("Consumber"); // consumer.setVipChannelEnabled(false); //设置批量消费个数 //consumer.setConsumeMessageBatchMaxSize(10); /** * 订阅指定topic下tags分别等于TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 订阅指定topic下所有消息<br> * 注意:一个consumer对象可以订阅多个topic */ consumer.subscribe("TopicTest2", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { ///接收消息个数msgs.size() System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 执行TopicTest1的消费逻辑 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 执行TagA的消费 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 执行TagC的消费 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 执行TagD的消费 System.out.println(new String(msg.getBody())); } } else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> */ consumer.start(); System.out.println("ConsumerStarted."); } }
相关文章推荐
- 新版mysql+apache+php Linux安装指南
- 新版mysql+apache+php Linux安装指南
- linux入门教程(2)---安装指南
- linux安装mysql指南
- 新版mysql+apache+php Linux安装指南
- Zeus+php+Zend Optimizer for Linux安装指南
- LINUX下MYSQL完全安装使用指南
- 新版mysql+apache+phpLinux安装指南
- Linux 下安装subversion 详细指南
- Gforge for Postgresql 安装指南(RH Linux 4.4 32bits)
- mrtg的安装指南(linux平台下)
- 最新版 linux Fedora 下apache2.2 +mysql5.1.5 +php5.12 +GD +mod_limitipconn... 安装指南
- 新版mysql+apache+php Linux安装指南
- mysql+apache+php Linux安装指南
- Linux下常用软件安装指南
- Oracle92 for Linux安装指南
- linux入门教程 第2章 安装指南
- linux下Mplayer安装与设置指南(以及如何加载显示中文字幕)