您的位置:首页 > 其它

使用ActiveMQ 发送/接收消息

2013-06-22 22:19 344 查看
本篇主要讲解在未使用其他框架(Spring)整合情况下,独立基于ActiveMQ,使用JMS规范进行消息通信。

     

     一.JMS回顾

       因为ActiveMQ是一个JMS Provider的实现,因此在开始实作前,有必要复习下JMS的基础知识

    Java Message Service (JMS)是sun提出来的为J2EE提供企业消息处理的一套规范,JMS目前有2套规范还在使用JMS 1.0.2b和1.1. 1.1已经成为主流的JMS Provider事实上的标准了.

      *1.1主要在session上面有一些重要改变,比如支持建立同一session上的transaction,让他支持同时发送P2P(Queue)消息和接受

Topic消息。

       

       在JMS中间主要定义了2种消息模式Point-to-Point (点对点),Publich/Subscribe Model (发布/订阅者),

    其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式。

     

     下面是JMS规范基本的接口和实现

     JMS Common Interfacse PTP-Specific Interface   Pub/Sub-specific interfaces

     ConnectionFactory     QueueConnectionFactory   TopicConnectionFactory

     Connection            QueueConnection          TopicConnection

     Destination           Queue                    Topic

     Session               QueueSession             TopiSession

     MessageProducer       QueueSender              TopicPublisher

     MessageConsumer       QueueReceiver/QueueBrwer TopicSubscriber

     二.使用Queue

         下面以ActiveMQ example的代码为主进行说明

         

        1. 使用ActiveMQ的Connection,ConnectionFactory 建立连接,注意这里没有用到pool

       

java 代码

import org.apache.activemq.ActiveMQConnection   

import org.apache.activemq.ActiveMQConnectionFactory   

        //建立Connection

java 代码

protected Connection createConnection() throws JMSException, Exception {
  

     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);   

     Connection connection = connectionFactory.createConnection();   

     if (durable && clientID!=null) {
  

         connection.setClientID(clientID);   

     }   

     connection.start();   

     return connection;   

    }  

        //建立Session

  

java 代码

protected Session createSession(Connection connection) throws Exception {
  

         Session session = connection.createSession(transacted, ackMode);   

         return session;   

        }   

        2。发送消息的代码

 //建立QueueSession

 

java 代码

protected MessageProducer createProducer(Session session) throws JMSException {
  

        Destincation destination = session.createQueue("queue.hello");   

        MessageProducer producer = session.createProducer(destination);   

        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   

           

        if( timeToLive!=0 )   

            producer.setTimeToLive(timeToLive);   

        return producer;   

        }   

         //使用Producer发送消息到Queue

    

java 代码

producer.send(message);   

       

        3。接受消息,在JMS规范里面,你可以使用

  

java 代码

QueueReceiver/QueueBrowser直接接受消息,但是更多的情况下我们采用消息通知方式,即实现MessageListener接口   

 public void onMessage(Message message) {
  

 //process message   

 }   

          

 //set MessageListner ,receive message   

 Destincation destination = session.createQueue("queue.hello");   

 consumer = session.createConsumer(destination);   

 consumer.setMessageListener(this);   

       

        以上就是使用jms queue发送接受消息的基本方式

  

     三 Topic

        1. 建立连接

   

java 代码

protected Connection createConnection() throws JMSException, Exception {   
  

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);      

        Connection connection = connectionFactory.createConnection();      

        //如果你要使用DurableSubScription 方式,你必须为connection设置一个ClientID      

        if (durable && clientID!=null) {   
  

            connection.setClientID(clientID);      

        }      

        connection.start();      

        return connection;      

       }      

       2. 建立Session

java 代码

protected Session createSession(Connection connection) throws Exception {   
  

        Session session = connection.createSession(transacted, ackMode);      

        return session;      

        }    

 3.创建Producer 发送消息到Topic   

       

java 代码

//create topic on  session   

       topic = session.createTopic("topic.hello");   

 producer = session.createProducer(topic);   

       //send message    

       producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);   

 producer.send(message);   

 4.创建Consumer接受消息(基本上和Queue相同)

java 代码

Destincation destination  = session.createTopic("topic.hello");      

MessageConsumer consumer = session.createConsumer(destination);      

consumer.setMessageListener(this);      

           

     //如果你使用的是Durable Subscription方式,你必须在建立connection的时候      

     //设置ClientID,而且建立comsumer的时候使用createDurableSubscriber方法,为他指定一个consumerName。      

 //connection.setClientID(clientId);      

 //consumer = session.createDurableSubscriber((Topic) destination, consumerName);   

       

 四:连接ActiveMQ的方式

        ActiveMQConnectionFactory 提供了多种连接到Broker的方式activemq.apache.org/uri-protocols.html

 常见的有

 vm://host:port     //vm 

 tcp://host:port    //tcp

 ssl://host:port    //SSL

 stomp://host:port  //stomp协议可以跨语言,目前有很多种stomp client 库(java,c#,c/c++,ruby,python...);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  JMS ActiveMQ