您的位置:首页 > 其它

JMS ActiveMQ简单介绍+简单实例

2014-01-05 17:54 295 查看
1. JMS基本概念

JMS(Java Message Service) 即Java消息服务。它提供标准的产生、发送、接收消息的接口简化
企业
应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。P2P 模型规定了一个消息只能有一个接收者;Pub/Sub 模型允许一个消息可以有多个接收者。

对于点到点模型,消息生产者产生一个消息后,把这个消息发送到一个Queue(队列)中,然后消息接收者再从这个Queue中读取,一旦这个消息被一个接收者读取之后,它就在这个Queue中消失了,所以一个消息只能被一个接收者消费。

与点到点模型不同,发布/订阅模型中,消息生产者产生一个消息后,把这个消息发送到一个Topic中,这个Topic可以同时有多个接收者在监听,当一个消息到达这个Topic之后,所有消息接收者都会收到这个消息。

2.编程的结构

2.1消息产生者向JMS发送消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)使用消息生产者MessageSender发送消息
2.2消息消费者从JMS接受消息的步骤
(1)创建连接使用的工厂类JMS ConnectionFactory
(2)使用管理对象JMS ConnectionFactory建立连接Connection
(3)使用连接Connection 建立会话Session
(4)使用会话Session和管理对象Destination创建消息消费者MessageReceiver
(5)使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver
消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

3.ActiveMQ的下载

下载地址:http://activemq.apache.org/download.html
解压缩到本地,然后启动/bin/activemq.bat

并且有的Eclipse没有自带jms.jar,需要去下载

4.发送端代码:

package com.xkey.JMS;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsSender {

private ConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageProducer producer = null;

JmsSender(){

}

public void init(){
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");

try{

connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE.booleanValue(),
Session.AUTO_ACKNOWLEDGE);
//Queue
destination = session.createQueue("xkey");
producer = session.createProducer(destination);
//Topic
/**
* Topic topic = session.createTopic("xkey.Topic");
* producer = session.createProducer(topic);
*/
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session,producer);
session.commit();

}catch(Exception e){
e.printStackTrace();
}finally{
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

private void sendMessage(Session session,MessageProducer producer) throws JMSException{
for (int i = 1; i <= 5; i ++) {
TextMessage message = session.createTextMessage("First ActiveMQ Test:::: " + i);
// 发送消息
System.out.println("Sender:" + "First ActiveMQ Test::: " + i);
producer.send(message);
}
}

/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
JmsSender jms = new JmsSender();
jms.init();
}

}


5.接收端代码

package com.xkey.JMS;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsReceiver {

private ConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private Destination destination = null;

public JmsReceiver(){

}

public void init(){
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
try{
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.TRUE.booleanValue(),
Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("xkey");
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener(){

@Override
public void onMessage(Message msg) {
// TODO Auto-generated method stub
TextMessage message = (TextMessage)msg;
try{
System.out.println("Receiver " + message.getText());
}catch(Exception e){
e.printStackTrace();
}
}

});
/**while (true) {
TextMessage message = (TextMessage) consumer.receive(1000);
if (null != message) {
System.out.println("Receiver " + message.getText());
} else {
break;
}
}  */
}catch(Exception e){
e.printStackTrace();
}finally{
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
JmsReceiver jms = new JmsReceiver();
jms.init();
}

}


6、JMS+ActiveMQ+Spring+TomCat可以用来实现企业级的消息队列,这样能使服务请求端和服务操作端实现低耦合。不知道怎么实现分布式,也就是多个请求,每个请求用不同的服务器去相响应。

Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)

连接: http://blog.csdn.net/acceptedxukai/article/details/7775746
在Tomcat 6.0下用JNDI连接IBM MQ 6.0的配置方法(一)

连接: http://blog.sina.com.cn/s/blog_60dadc490100ecy6.html http://www.javawind.net/help/html/spring_ref_2.0/html/jms.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: