您的位置:首页 > 其它

ActiveMQ消息发送和接收

2015-02-02 16:17 204 查看
JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

在 Java 里有 JMS 的多个实现,ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

JMS 定义了两种消息传递方式:Queue(点对点);Topic(发布/订阅)。

ConnectionFactory 是连接工厂,负责创建Connection。Connection 负责创建 Session。Destination 是消息的目的地。

Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。

ActiveMQ的官方网址:http://activemq.apache.org。在此可以下载ActiveMQ的最新版本和阅读相关文档。

下面是使用ActiveMQ发送和接收消息的Java实现:

1、消息发送者

package com.jmsd;

import javax.jms.BytesMessage;

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.MessageProducer;

import javax.jms.ObjectMessage;

import javax.jms.Session;

import javax.jms.StreamMessage;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Sends demo JMS messages of several body types to the ActiveMQ queue
 * {@code ActiveMQ.Demo}.
 *
 * <p>The connection, session and producer are created lazily on the first
 * call to {@link #sendMessage(String)} and reused by later calls until
 * {@link #close()} is invoked. Instances are not designed for concurrent use.
 *
 * @author xajava
 * @version created 2012-10-24 13:22:40
 */
public class JmsSender {

    // Broker credentials and URL: the defaults published by ActiveMQConnection.
    private static final String USER = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the queue the messages are sent to.
    private static final String SUBJECT = "ActiveMQ.Demo";

    private Destination destination = null;

    private Connection conn = null;

    private Session session = null;

    private MessageProducer producer = null;

    /**
     * Creates the connection, session, destination and producer if they do not
     * exist yet.
     *
     * <p>Calling this again while a producer is available is a no-op, so
     * repeated sends do not open (and leak) a new connection each time.
     *
     * @throws JMSException if the broker cannot be reached or a JMS object
     *                      cannot be created
     */
    private void initialize() throws JMSException, Exception {
        if (producer != null) {
            return;
        }
        // Release whatever a previously failed initialisation left behind.
        close();

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
        conn = connectionFactory.createConnection();

        // Non-transacted session; messages are acknowledged automatically.
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Message destination (queue; swap for createTopic to publish to a topic).
        destination = session.createQueue(SUBJECT);
        // destination = session.createTopic(SUBJECT);

        // Publish the producer only once it is fully configured, so a failure
        // in between cannot leave a half-configured producer in the field.
        MessageProducer configured = session.createProducer(destination);
        // Messages are not persisted by the broker.
        configured.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer = configured;
    }

    /**
     * Sends one demo message whose body type is selected by {@code msgType}.
     *
     * <p>Recognised values are {@code "text"}, {@code "map"}, {@code "stream"},
     * {@code "object"} and {@code "bytes"}. Any other value (including
     * {@code null}) sends nothing; the connection is still opened and started.
     *
     * @param msgType the kind of message to send
     * @throws JMSException if connecting to the broker or sending fails
     */
    public void sendMessage(String msgType) throws JMSException, Exception {
        initialize();

        // Start the connection to the JMS provider (broker).
        conn.start();

        if ("text".equals(msgType)) {
            sendText();
        } else if ("map".equals(msgType)) {
            sendMap();
        } else if ("stream".equals(msgType)) {
            sendStream();
        } else if ("object".equals(msgType)) {
            sendObject();
        } else if ("bytes".equals(msgType)) {
            sendBytes();
        }
    }

    /** Sends a {@link TextMessage} with a fixed demo text. */
    private void sendText() throws JMSException {
        String textMsg = "ActiveMQ Text Message!";
        TextMessage msg = session.createTextMessage();
        msg.setText(textMsg);
        producer.send(msg);
    }

    /** Sends a {@link MapMessage} with one boolean, short, long and string entry. */
    private void sendMap() throws JMSException {
        MapMessage msg = session.createMapMessage();
        msg.setBoolean("boolean", true);
        msg.setShort("short", (short) 0);
        msg.setLong("long", 123456);
        msg.setString("MapMessage", "ActiveMQ Map Message!");
        producer.send(msg);
    }

    /** Sends a {@link StreamMessage}; the reader must read string, boolean, long in this order. */
    private void sendStream() throws JMSException {
        String streamValue = "ActiveMQ stream Message!";
        StreamMessage msg = session.createStreamMessage();
        msg.writeString(streamValue);
        msg.writeBoolean(false);
        msg.writeLong(1234567890);
        producer.send(msg);
    }

    /** Sends an {@link ObjectMessage} carrying a {@code JmsObjectMessageBean}. */
    private void sendObject() throws JMSException {
        JmsObjectMessageBean jmsObject = new JmsObjectMessageBean("ActiveMQ Object Message", 18, false);
        ObjectMessage msg = session.createObjectMessage();
        msg.setObject(jmsObject);
        producer.send(msg);
    }

    /** Sends a {@link BytesMessage} containing the bytes of a short demo string. */
    private void sendBytes() throws JMSException {
        String byteValue = "字节消息";
        BytesMessage msg = session.createBytesMessage();
        // NOTE(review): encoded with the platform default charset, which is also
        // what JmsReceiver uses to decode; switch both sides together if an
        // explicit charset is introduced.
        msg.writeBytes(byteValue.getBytes());
        producer.send(msg);
    }

    /**
     * Closes the producer, session and connection, in that order.
     *
     * <p>Every resource is attempted even if an earlier one fails to close, and
     * the fields are cleared so that the sender can be initialised again by a
     * later {@link #sendMessage(String)}. Calling this on an unopened or
     * already closed sender does nothing.
     *
     * @throws JMSException if closing any of the resources fails
     */
    public void close() throws JMSException {
        try {
            if (producer != null) {
                producer.close();
            }
        } finally {
            producer = null;
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                session = null;
                destination = null;
                try {
                    if (conn != null) {
                        conn.close();
                    }
                } finally {
                    conn = null;
                }
            }
        }
    }

}

2、消息接收者(消费者)

package com.jmsd;

import java.util.Enumeration;

import javax.jms.BytesMessage;

import javax.jms.Connection;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MapMessage;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.ObjectMessage;

import javax.jms.Session;

import javax.jms.StreamMessage;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Receives a single demo message from the ActiveMQ queue {@code ActiveMQ.Demo}
 * and prints its content to standard output.
 *
 * <p>The instance registers itself as the {@link MessageListener} of its own
 * consumer; {@link #receiveMessage()} blocks the calling thread until
 * {@link #onMessage(Message)} has handled one message.
 *
 * @author xajava
 * @version created 2012-10-24 14:06:48
 */
public class JmsReceiver implements MessageListener {

    // Broker credentials and URL: the defaults published by ActiveMQConnection.
    private static final String USER = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Name of the queue the messages are read from.
    private static final String SUBJECT = "ActiveMQ.Demo";

    // How often receiveMessage() re-checks whether a message has been handled.
    private static final long POLL_INTERVAL_MILLIS = 5000L;

    private Destination dest = null;

    private Connection conn = null;

    private Session session = null;

    private MessageConsumer consumer = null;

    // Written by onMessage() and polled by receiveMessage(), which are expected
    // to run on different threads; volatile makes the update visible.
    private volatile boolean stop = false;

    /**
     * Creates the connection, session, destination and consumer.
     *
     * @throws JMSException if the broker cannot be reached or a JMS object
     *                      cannot be created
     */
    private void initialize() throws JMSException, Exception {
        // The connection factory creates connections to the broker.
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);

        conn = connectionFactory.createConnection();

        // Non-transacted session; messages are acknowledged automatically.
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // The destination names where messages are consumed from.
        dest = session.createQueue(SUBJECT);
        // dest = session.createTopic(SUBJECT);

        // The session creates the consumer that reads from the destination.
        consumer = session.createConsumer(dest);
    }

    /**
     * Connects to the broker and blocks until one message has been handled by
     * {@link #onMessage(Message)}, then closes all JMS resources.
     *
     * <p>Resources are released here, on the calling thread, even when
     * connecting fails or the wait is interrupted.
     *
     * @throws JMSException if connecting, registering the listener or closing fails
     * @throws Exception    if the waiting thread is interrupted
     */
    public void receiveMessage() throws JMSException, Exception {
        stop = false;
        try {
            initialize();
            // Register the listener before delivery is started.
            consumer.setMessageListener(this);
            conn.start();

            // Wait until onMessage() reports that a message was handled.
            while (!stop) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        } finally {
            close();
        }
    }

    /**
     * Prints the content of the received message according to its body type.
     *
     * <p>Whether or not handling succeeds, the consumer is closed so that no
     * further message is delivered, and {@link #receiveMessage()} is released.
     * The session and connection are deliberately NOT closed here: the JMS API
     * documentation warns that a listener must not close its own session or
     * connection. They are closed by {@link #receiveMessage()} instead.
     *
     * @param msg the message delivered by the JMS provider
     */
    @Override
    public void onMessage(Message msg) {
        try {
            if (msg instanceof TextMessage) {
                System.out.println("------Received TextMessage------");
                System.out.println(((TextMessage) msg).getText());
            } else if (msg instanceof MapMessage) {
                printMap((MapMessage) msg);
            } else if (msg instanceof StreamMessage) {
                printStream((StreamMessage) msg);
            } else if (msg instanceof ObjectMessage) {
                printObject((ObjectMessage) msg);
            } else if (msg instanceof BytesMessage) {
                System.out.println("------Received BytesMessage------");
                System.out.println(readBody((BytesMessage) msg));
            } else {
                System.out.println(msg);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            try {
                MessageConsumer current = consumer;
                if (current != null) {
                    current.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                // Always release the waiting thread, even if handling failed;
                // otherwise receiveMessage() would loop forever.
                stop = true;
            }
        }
    }

    /** Prints the four known entries of the map message, then every value by iterating its names. */
    private static void printMap(MapMessage message) throws JMSException {
        System.out.println("------Received MapMessage------");
        System.out.println(message.getLong("long"));
        System.out.println(message.getBoolean("boolean"));
        System.out.println(message.getShort("short"));
        System.out.println(message.getString("MapMessage"));

        System.out.println("------Received MapMessage for while------");
        Enumeration<?> names = message.getMapNames();
        while (names.hasMoreElements()) {
            Object name = names.nextElement();
            System.out.println(message.getObject(name.toString()));
        }
    }

    /** Prints the stream message's string, boolean and long, read in that order. */
    private static void printStream(StreamMessage message) throws JMSException {
        System.out.println("------Received StreamMessage------");
        System.out.println(message.readString());
        System.out.println(message.readBoolean());
        System.out.println(message.readLong());
    }

    /** Prints the fields of the {@code JmsObjectMessageBean} carried by the object message. */
    private static void printObject(ObjectMessage message) throws JMSException {
        System.out.println("------Received ObjectMessage------");
        JmsObjectMessageBean jmsObject = (JmsObjectMessageBean) message.getObject();
        System.out.println(jmsObject.getUserName() + "__" + jmsObject.getAge() + "__" + jmsObject.isFlag());
    }

    /**
     * Reads the whole body of a bytes message and decodes it as text.
     *
     * <p>All bytes are collected before decoding so that a multi-byte character
     * that straddles two 1024-byte reads is not corrupted.
     *
     * @param message the message to read
     * @return the body decoded with the platform default charset
     * @throws JMSException if the body cannot be read
     */
    private static String readBody(BytesMessage message) throws JMSException {
        java.io.ByteArrayOutputStream body = new java.io.ByteArrayOutputStream();
        byte[] chunk = new byte[1024];
        int length;
        while ((length = message.readBytes(chunk)) != -1) {
            body.write(chunk, 0, length);
        }
        // NOTE(review): platform default charset, matching what JmsSender uses
        // to encode; switch both sides together if an explicit charset is introduced.
        return new String(body.toByteArray());
    }

    /**
     * Closes the consumer, session and connection, in that order.
     *
     * <p>Every resource is attempted even if an earlier one fails to close, and
     * the fields are cleared afterwards, so calling this again only prints the
     * log line.
     *
     * @throws JMSException if closing any of the resources fails
     */
    public void close() throws JMSException {
        System.out.println("Consumer:->Closing connection");
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            consumer = null;
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                session = null;
                dest = null;
                try {
                    if (conn != null) {
                        conn.close();
                    }
                } finally {
                    conn = null;
                }
            }
        }
    }

}

3、对象消息

package com.jmsd;

import java.io.Serializable;

/**
 * Sample payload for a JMS object message: a serializable bean holding a user
 * name, an age and a flag.
 *
 * @author xajava
 * @version created 2012-10-24 13:56:07
 */
public class JmsObjectMessageBean implements Serializable {

    private static final long serialVersionUID = 2620024932905963095L;

    private String userName;

    private int age;

    private boolean flag;

    /**
     * Creates a bean with the given values.
     *
     * <p>The fields are assigned directly rather than through the public
     * setters, so a subclass that overrides a setter is never invoked while the
     * object is still being constructed.
     *
     * @param userName the user name; stored as given (may be {@code null})
     * @param age      the age
     * @param flag     the flag value
     */
    public JmsObjectMessageBean(String userName, int age, boolean flag) {
        this.userName = userName;
        this.age = age;
        this.flag = flag;
    }

    /** @return the user name */
    public String getUserName() {
        return userName;
    }

    /** @param userName the new user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the age */
    public int getAge() {
        return age;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the flag value */
    public boolean isFlag() {
        return flag;
    }

    /** @param flag the new flag value */
    public void setFlag(boolean flag) {
        this.flag = flag;
    }

}

4、测试类

package com.jmsd;

import javax.jms.JMSException;

/**
 * Demo entry point: sends one bytes message with {@code JmsSender} and then
 * receives it with {@code JmsReceiver}.
 *
 * @author xajava
 * @version created 2012-10-22 16:33:17
 */
public class Test {

    /**
     * Runs the send-then-receive round trip.
     *
     * <p>Each side is closed in a {@code finally} block so that its JMS
     * resources are released even when sending or receiving throws.
     *
     * @param args unused
     * @throws JMSException if a JMS operation fails
     * @throws Exception    if sending or receiving fails for another reason
     */
    public static void main(String[] args) throws JMSException, Exception {
        JmsSender sender = new JmsSender();
        JmsReceiver receiver = new JmsReceiver();

        try {
            sender.sendMessage("bytes");
        } finally {
            sender.close();
        }

        try {
            receiver.receiveMessage();
        } finally {
            receiver.close();
        }
    }

}

package com.jmsd;

import javax.jms.JMSException;

/**
 * Demo entry point: sends one bytes message with {@code JmsSender} and then
 * receives it with {@code JmsReceiver}.
 *
 * @author xajava
 * @version created 2012-10-22 16:33:17
 */
public class Test {

    /**
     * Runs the send-then-receive round trip.
     *
     * <p>Each side is closed in a {@code finally} block so that its JMS
     * resources are released even when sending or receiving throws.
     *
     * @param args unused
     * @throws JMSException if a JMS operation fails
     * @throws Exception    if sending or receiving fails for another reason
     */
    public static void main(String[] args) throws JMSException, Exception {
        JmsSender sender = new JmsSender();
        JmsReceiver receiver = new JmsReceiver();

        try {
            sender.sendMessage("bytes");
        } finally {
            sender.close();
        }

        try {
            receiver.receiveMessage();
        } finally {
            receiver.close();
        }
    }

}

package com.jmsd;

import javax.jms.JMSException;

/**
 * Demo entry point: sends one bytes message with {@code JmsSender} and then
 * receives it with {@code JmsReceiver}.
 *
 * @author xajava
 * @version created 2012-10-22 16:33:17
 */
public class Test {

    /**
     * Runs the send-then-receive round trip.
     *
     * <p>Each side is closed in a {@code finally} block so that its JMS
     * resources are released even when sending or receiving throws.
     *
     * @param args unused
     * @throws JMSException if a JMS operation fails
     * @throws Exception    if sending or receiving fails for another reason
     */
    public static void main(String[] args) throws JMSException, Exception {
        JmsSender sender = new JmsSender();
        JmsReceiver receiver = new JmsReceiver();

        try {
            sender.sendMessage("bytes");
        } finally {
            sender.close();
        }

        try {
            receiver.receiveMessage();
        } finally {
            receiver.close();
        }
    }

}

---------------------------------------------------------------------------------
http://www.blogjava.net/xajava/archive/2012/10/24/390165.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: