您的位置:首页 > 其它

RocketMQ单机部署及使用

2017-05-07 13:36 411 查看

1.开发环境:

  

   操作系统:CentOS7  IP:192.168.1.212

   JVM:jdk1.8.0_111

   其他:maven3.5.0,Git

2.安装步骤

   

    git下载rocketMq安装包

> git clone -b develop https://github.com/apache/incubator-rocketmq.git[/code] 
   下载后可以看到有incubator-rocketmq,进该文件夹中

> cd incubator-rocketmq

  
  使用mvn命令打包安装软件

mvn -Prelease-all -DskipTests clean install -U


   第一次安装下载jar比较慢,看到如下结果就成功了,



   进入目标编译后的Apache-rocketmq文件夹

> cd distribution/target/apache-rocketmq


   进入目标编译后的Apache-rocketmq文件夹

> cd distribution/target/apache-rocketmq

  
   启动name   server

nohup sh bin/mqnamesrv &


    
   查看是否启动成功   

tail -f ~/logs/rocketmqlogs/namesrv.log


若前面启动提示: nohup: ignoring input and appending output to ‘nohup.out’,则直接查看当前nohup.out即可。如下提示启动成功



     
   启动broker,并查看日志

>nohup sh bin/mqbroker -n localhost:9876 &

>tail -f ~/logs/rocketmqlogs/broker.log




   简易生产消费实例:

public static void sendMsg(){

/*
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.1.212:9876");
try {
producer.start();
/*
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,

* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,

* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
for(int i=0;i<200;i++){
Message msg = new Message(
"rocketTest",                   // topic
"TagA",                         // tag
"OrderID00"+i,                  // key
("Hello MetaQ"+i).getBytes());  // body
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
//logger.info("sendResult:{}", sendResult);
}
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
/*
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
producer.shutdown();
}

public static void receiveMsg(){

// 获取消息生产者
//DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("192.168.1.212:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 订阅主体
try {

consumer.subscribe("rocketTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

/**
* * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
public ConsumeConcurrentlyStatus consumeMessage(
List msgs, ConsumeConcurrentlyContext context) {

logger.info("currentThreadName:{} and Receive New Messages:{}",Thread.currentThread().getName(),msgs);

MessageExt msg = msgs.get(0);

if (msg.getTopic().equals("rocketTest")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
logger.info("MsgBody:{}",new String(msg.getBody()));
System.out.println(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagC")) {
// 执行TagC的消费
} else if (msg.getTags() != null
&& msg.getTags().equals("TagD")) {
// 执行TagD的消费
}
} else if (msg.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可

*/
consumer.start();

logger.info("Consumer Started.");
} catch (MQClientException e) {
e.printStackTrace();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RocketMQ