您的位置:首页 > 其它

ActiveMQ 入门及实例

2015-10-20 14:57 190 查看
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 

ActiveMQ特性列表 

1. 多种语言和协议编写客户端。语言: Java、 C、 C++、 C#、 Ruby、 Perl、 Python、 PHP。应用协议: OpenWire、Stomp REST、WS Notification、XMPP、AMQP ;
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化、XA消息、事务);
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性;
4. 通过了常见J2EE服务器(如 Geronimo、JBoss 、 GlassFish、WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上;
5. 支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA;
6. 支持通过JDBC和journal提供高速的消息持久化;
7. 从设计上保证了高性能的集群、客户端-服务器、点对点;
8. 支持Ajax;
9. 支持与Axis的整合;
10. 可以很容易得调用内嵌JMS provider,进行测试;

JMS基本概念 

JMS(Java Message Service) 即Java消息服务。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P 模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。 
对于点到点模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,然后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取之后,它就在这个Queue中消失了,所以一个消息只能被一个接收者消费。
与点到点模型不同,发布/订阅模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic可以同时有多个接收者在监听,当一个消息到达这个Topic之后,所有消息接收者都会收到这个消息。

基于JMS编程思路 

消息产生者向JMS发送消息的步骤
(1) 创建连接使用的工厂类JMS ConnectionFactory;
(2) 使用管理对象JMS ConnectionFactory建立连接Connection;
(3) 使用连接Connection 建立会话Session;
(4) 使用会话Session和管理对象Destination创建消息生产者MessageSender;
(5) 使用消息生产者MessageSender发送消息。
消息消费者从JMS接受消息的步骤 
(1) 创建连接使用的工厂类JMS ConnectionFactory;
(2) 使用管理对象JMS ConnectionFactory建立连接Connection;
(3) 使用连接Connection 建立会话Session;
(4) 使用会话Session和管理对象Destination创建消息消费者MessageReceiver;
(5) 使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver;
消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

下载安装ActiveMQ 

去官方网站下载:http://activemq.apache.org/
解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。(注意:要求先安装配置好本机的JVM环境)
启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

ActiveMQ JMS代码实现 

MessageReceiver

package jms.activemq.myexample;
 
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class MessageReceiver implements Runnable {
    private String url;
    private String user;
    private String password;
    private final String QUEUE;
 
    public MessageReceiver(String queue, String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.QUEUE = queue;
    }
 
    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        Session session = null;
        Destination receiveQueue;
        try {
            Connection connection = connectionFactory.createConnection();
 
            session = connection
                    .createSession(true, Session.SESSION_TRANSACTED);
            receiveQueue = session.createQueue(QUEUE);
            MessageConsumer consumer = session.createConsumer(receiveQueue);
 
            connection.start();
 
            while (true) {
                Message message = consumer.receive();
 
                if (message instanceof TextMessage) {
                    TextMessage receiveMessage = (TextMessage) message;
                    System.out.println("我是Receiver,收到消息如下: \r\n"
                            + receiveMessage.getText());
                } else {
                    session.commit();
                    break;
                }
 
            }
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
    public String getUrl() {
        return url;
    }
 
    public void setUrl(String url) {
        this.url = url;
    }
 
    public String getUser() {
        return user;
    }
 
    public void setUser(String user) {
        this.user = user;
    }
 
    public String getPassword() {
        return password;
    }
 
    public void setPassword(String password) {
        this.password = password;
    } 
 
}


MessageSender

package jms.activemq.myexample;
 
import java.util.Date;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
  
public class MessageSender implements Runnable {
     
    private String url;
    private String user;
    private String password;
    private final String QUEUE;
 
    public MessageSender(String queue, String url, String user, String password) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.QUEUE = queue;
    }
 
    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        Session session = null;
        Destination sendQueue;
        Connection connection = null;
 
        int messageCount = 0;
        try {
            connection = connectionFactory.createConnection();
 
            connection.start();
 
            while (true) {
                session = connection.createSession(true,
                        Session.SESSION_TRANSACTED);
 
                sendQueue = session.createQueue(QUEUE);
                MessageProducer sender = session.createProducer(sendQueue);
                TextMessage outMessage = session.createTextMessage();
                outMessage.setText(new Date() + "现在发送是第" + messageCount + "条消息");
 
                sender.send(outMessage);
 
                session.commit();
 
                sender.close();
 
                if ((++messageCount) == 10) {
                    // 发够十条消息退出
                    break;
                }
                Thread.sleep(1000);
            }
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
    public String getUrl() {
        return url;
    }
 
    public void setUrl(String url) {
        this.url = url;
    }
 
    public String getUser() {
        return user;
    }
 
    public void setUser(String user) {
        this.user = user;
    }
 
    public String getPassword() {
        return password;
    }
 
    public void setPassword(String password) {
        this.password = password;
    }
 
}


MyActiveMQDemo

package jms.activemq.myexample;
 
public class MyActiveMQDemo {
    public static void main(String[] args) {
        String url = "tcp://localhost:61616";
        String user = null;
        String password = null;
        String query = "MyQueue";         
         
        new Thread(new MessageSender(query,url,user,password), "Test-Sender").start();
        new Thread(new MessageReceiver(query,url,user,password), "Test-Receiver").start();
    }
}

可以把上面获得连接的通用部分提取出来,构造一个专门的类来管理


import java.io.IOException;  
import java.io.InputStream;  
import java.util.Properties;  
  
import javax.jms.Connection;  
import javax.jms.JMSException;  
import javax.jms.MessageConsumer;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
  
import org.apache.activemq.ActiveMQConnectionFactory;  
  
/** 
 * 管理服务器连接 
 *  
 */  
  
public class ConnectionManager {  
  
    private String url;                 //远程服务器地址   
    private String queue;               //消息队列名  
    private String username;                //用于连接服务器的用户名  
    private String password;                //用于连接服务器的密码  
    private final String URL = "url";           //properties文件中url的key  
    private final String QUEUE = "queue";       //properties文件中query的key  
    private final String USERNAME = "username"; //properties文件中username的key  
    private final String PASSWORD = "password"; //properties文件中password的key  
    private final String FILEPATH = "/connection.properties";  
      
    /** 
     * 初始化连接字符串 
     * @return 是否初始化成功 
     */  
    public Boolean init() {  
        Properties properties = new Properties();  
        InputStream inputStream = Object.class.getResourceAsStream(FILEPATH);  
        if (inputStream != null) {  
            try {  
                properties.load(inputStream);  
                url = properties.getProperty(URL);  
                queue = properties.getProperty(QUEUE);  
                username = properties.getProperty(USERNAME);  
                password = properties.getProperty(PASSWORD);  
                inputStream.close();  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
            return true;  
        } else {  
        }  
        return false;  
    }  
      
    /** 
     * 与远程服务器取得连接 
     * @param url           服务器连接地址 
     * @param username      用户名 
     * @param password      密码 
     * @return              连接对象 
     * @throws JMSException JMS异常S 
     */  
    public Connection getConnection() {  
        Connection connection = null;  
        if (init()) {  
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);  
            try {  
                connection = factory.createConnection(username, password);  
            } catch (JMSException e) {  
                e.printStackTrace();  
            }  
        }  
        return connection;  
    }  
      
    /** 
     * 关闭连接 
     * @param connection    与远程服务器连接 
     * @throws JMSException JMS异常 
     */  
    public void closeConnection(Connection connection, Session session,  
            MessageProducer producer, MessageConsumer consumer) {  
        try {  
            if (producer != null) {  
                producer.close();  
            }  
            if (consumer != null) {  
                consumer.close();  
            }  
            if (session != null) {  
                session.close();  
            }  
            if (connection != null) {  
                connection.close();  
            }  
        } catch (JMSException e) {  
            e.printStackTrace();  
        }  
    }  
  
    /** 
     * 取得队列名 
     * @return 队列名 
     */  
    public String getQueue() {  
        init();  
        return queue;  
    }  
  
}  


Spring +ActiveMQ 的JMS代码实现 


所需jar 

activemq-all-5.6.0.jar
commons-logging-1.1.1.jar
log4j-1.2.17.jar
org.springframework.asm-3.1.0.RELEASE.jar
org.springframework.beans-3.1.0.RELEASE.jar
org.springframework.context-3.1.0.RELEASE.jar
org.springframework.core-3.1.0.RELEASE.jar
org.springframework.expression-3.1.0.RELEASE.jar
org.springframework.jms-3.1.0.RELEASE.jar
org.springframework.transaction-3.1.0.RELEASE.jar

spring配置文件applicationContext.xml 



这时主要是配置activemq服务信息与实现springjms的对应接口 


消息产生者 

Java代码 

package test; 

import javax.jms.JMSException; 

import javax.jms.Message; 

import javax.jms.Session; 

 

import org.springframework.jms.core.MessageCreator; 

 

/**

 * 消息产生者

 * User

 * Time: 12-6-14 上午11:31

 */ 

public class MyMessageCreator implements MessageCreator { 

    public int n = 0; 

    private static String str1 = "这个是第 "; 

    private static String str2 = " 个测试消息!"; 

    private String str = ""; 

    @Override 

    public Message createMessage(Session paramSession) throws JMSException { 

        System.out.println("MyMessageCreator  n=" + n); 

        if (n == 9) { 

            //在这个例子中表示第9次调用时,发送结束消息 

            return paramSession.createTextMessage("end"); 

        } 

        str = str1 + n + str2; 

        return paramSession.createTextMessage(str); 

    } 

发送消息方 

Java代码 

package test; 

import javax.jms.Destination; 

 

import org.springframework.context.ApplicationContext; 

import org.springframework.context.support.ClassPathXmlApplicationContext; 

import org.springframework.jms.core.JmsTemplate; 

/**

 * 发送消息方

 * User

 * Time: 12-6-14 上午11:29

 */

public class MessageSender extends Thread {

    public static void main(String args[]) throws Exception {

        String[] configLocations = new String[] {"test/applicationContext.xml"};

        ApplicationContext context = new ClassPathXmlApplicationContext(configLocations);

        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");

        Destination destination = (Destination) context.getBean("destination");

        for (int i = 1; i < 100; i++) {

            System.out.println("发送 i=" + i);

            //消息产生者

            MyMessageCreator myMessageCreator = new MyMessageCreator();

            myMessageCreator.n = i;

            jmsTemplate.send(destination, myMessageCreator);

            sleep(10000);//10秒后发送下一条消息

        }

    }

}


消息接收方 

Java代码 

package test; 

import javax.jms.Destination; 

import javax.jms.TextMessage; 

 

import org.springframework.context.ApplicationContext; 

import org.springframework.context.support.ClassPathXmlApplicationContext; 

import org.springframework.jms.core.JmsTemplate; 

/**

 * 消息接收方

 * User

 * Time: 12-6-14 上午11:32

 */ 

public class MessageReciver{ 

    public static void main(String args[]) throws Exception { 

        String[] configLocations = new String[] {"test/applicationContext.xml"}; 

        ApplicationContext context = new ClassPathXmlApplicationContext(configLocations); 

 

        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 

        Destination destination = (Destination) context.getBean("destination"); 

 

        TextMessage msg = null; 

        //是否继续接收消息 

        boolean isContinue = true; 

        while (isContinue) { 

            msg = (TextMessage) jmsTemplate.receive(destination); 

            System.out.println("收到消息 :" + msg.getText()); 

            if (msg.getText().equals("end")) { 

                isContinue = false; 

                System.out.println("收到退出消息,程序要退出!"); 

            } 

        } 

        System.out.println("程序退出了!"); 

    } 

}


来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/9399028/viewspace-1815168/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/9399028/viewspace-1815168/

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: