您的位置:首页 > 运维架构 > Linux

rocketmq linux-单个配置流程

2017-02-13 10:46 330 查看
一。下载rocketmq源码:https://github.com/alibaba/RocketMQ/releases

二。由于需要编译,linux环境需要安装maven和jdk

三。解压rocketmq,执行./install.sh

四。环境变量:在终端中输入以下命令:vi /etc/profile ,在文件的末尾中添加如下两句话:export rocketmq=/usr/local/RocketMQ-3.5.8  export PATH=$PATH:$rocketmq/bin。接下来我们使配置的换将变量生效:source /etc/profile.

五。修改nameserv内存大小 /usr/local/RocketMQ-3.5.8/target/alibaba-rocketmq-broker/alibaba-rocketmq/bin runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m -XX:PermSize=32m -XX:MaxPermSize=64m"

      修改broker内存大小 /usr/local/RocketMQ-3.5.8/target/alibaba-rocketmq-broker/alibaba-rocketmq/bin runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=32m -XX:MaxPermSize=64m"

六。启动

    进入以下目录:/usr/local/RocketMQ-3.5.8/target/alibaba-rocketmq-broker/alibaba-rocketmq/bin

    启动nameserv :nohup sh mqnamesrv > /usr/local/RocketMQ-3.5.8/logs/namesrv.log 2>&1 &

    启动broker:nohup sh mqbroker -nlocalhost:9876 autoCreateTopicEnable=true > /usr/local/RocketMQ-3.5.8/logs/broker.log 2>&1 &

   如果broker和nameserv在不同服务器,localhost写对应nameserv的ip

启动报错时候:

注释runserver.sh和runbroker.sh以下,并添加java_home地址

#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
#[ ! -e "$JAVA_HOME/bi
4000
n/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME="/usr/local/jdk8"

export JAVA="$JAVA_HOME/bin/java"

export BASE_DIR=$(dirname $0)/..

export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

六。java实例

pom文件中添加rocketmq依赖

<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.8</version>
</dependency>

customer.java

public class ConsumerTest01 {
/** 

     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br> 

     * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br> 

     */  

    public static void main(String[] args) {  

  

        /** 

         * 注意:ConsumerGroupName需要由应用来保证唯一 

         */  

        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");  

        //pushConsumer.setNamesrvAddr("192.168.180.1:9876");  

        pushConsumer.setNamesrvAddr("120.77.61.47:9876");  

        pushConsumer.setInstanceName("Consumer");  

        try {  

            /** 

             * 订阅指定topic下tags分别等于TagA或TagC或TagD 

             * 两个参数:第一个参数是topic第二个参数是tags 

             */  

            pushConsumer.subscribe("TopicTest1", "TagA || TagC || TagD");  

            /** 

            * 订阅指定topic下所有消息<br> 

            * 注意:一个consumer对象可以订阅多个topic 

            */  

            //pushConsumer.subscribe("TopicTest2", "*");  

            pushConsumer.registerMessageListener(new MessageListenerConcurrently() {  

                @Override  

                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  

                                                                ConsumeConcurrentlyContext consumeConcurrentlyContext) {  

                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());  

                    MessageExt messageExt = msgs.get(0);  

                    if("TopicTest1".equals(messageExt.getTopic())){  

                        // 执行TopicTest1的消费逻辑  

                        if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {  

                            // 执行TagA的消费  

                            System.out.println(new String(messageExt.getBody()));  

                        }else if(messageExt.getTags() != null && messageExt.getTags().equals("TagB")){  

                            System.out.println(new String(messageExt.getBody()));  

                        }else if(messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {  

                            System.out.println(new String(messageExt.getBody()));  

                        }  

                    }else if("TopicTest2".equals(messageExt.getTopic())){  

                        System.out.println(new String(messageExt.getBody()));  

                    }  

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  

                }  

            });  

        } catch (MQClientException e) {  

            e.printStackTrace();  

        }  

        /** 

         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> 

         */  

        try {  

            pushConsumer.start();  

        } catch (MQClientException e) {  

            e.printStackTrace();  

        }  

        System.out.println("Consumer Started.");  

    }  

}

producer.java

public class ProducerTest01 {
public static void main(String[] args) {  
 
       /** 
        * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
        * 注意:ProducerGroupName需要由应用来保证唯一<br> 
        * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, 
        * 因为服务器会回查这个Group下的任意一个Producer 
        */  
       DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  
       //producer.setNamesrvAddr("192.168.180.1:9876");  
       producer.setNamesrvAddr("120.77.61.47:9876");  
       producer.setInstanceName("Producer");  
       /** 
        * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
        * 注意:切记不可以在每次发送消息时,都调用start方法 
        */  
       try {  
           producer.start();  
       } catch (MQClientException e) {  
           e.printStackTrace();  
       }  
       for (int i = 0; i < 10; i++) {  
           try {  
               /** 
                * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 
                * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br> 
                * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br> 
                * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 
                */  
               {  
                   Message msg = new Message("TopicTest1",// topic  
                           "TagA",// tag  
                           "OrderID001",// key  
                           ("fuck U").getBytes());// body  
                   SendResult sendResult = producer.send(msg);  
                   System.out.println(sendResult);  
               }  
 
               {  
                   Message msg = new Message("TopicTest2",  
                           "TagB",  
                           "OrderID001",  
                           ("Hello MetaQ TagB".getBytes()));  
 
                   SendResult sendResult = producer.send(msg);  
                   System.out.println(sendResult);  
               }  
 
               {  
                   Message msg = new Message("TopicTest3",  
                           "TagC",  
                           "OrderID001",  
                           ("Hello MetaQ TagC").getBytes());  
 
                   SendResult sendResult = producer.send(msg);  
 
                   System.out.println(sendResult);  
               }  
 
               TimeUnit.MILLISECONDS.sleep(1000);  
 
           } catch (MQClientException e) {  
               e.printStackTrace();  
           } catch (InterruptedException e) {  
               e.printStackTrace();  
           } catch (RemotingException e) {  
               e.printStackTrace();  
           } catch (MQBrokerException e) {  
               e.printStackTrace();  
           }  
       }  
       /** 
        * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 
        * 注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法 
        */  
       producer.shutdown();  
   }  

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: